From: Haomai Wang Date: Mon, 4 Jan 2016 09:21:40 +0000 (+0800) Subject: bluestore: rename BlockDevice to KernelDevice matching NVMEDevice X-Git-Tag: v10.0.4~81^2~53 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=35576154aece55bb28320bfe4ea4a1cac6cc8aec;p=ceph.git bluestore: rename BlockDevice to KernelDevice matching NVMEDevice Signed-off-by: Haomai Wang --- diff --git a/src/os/bluestore/BlockDevice.cc b/src/os/bluestore/BlockDevice.cc deleted file mode 100644 index 230ea455f4d9..000000000000 --- a/src/os/bluestore/BlockDevice.cc +++ /dev/null @@ -1,550 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include -#include -#include -#include -#include -#include - -#include "BlockDevice.h" -#include "include/types.h" -#include "include/compat.h" -#include "common/errno.h" -#include "common/debug.h" -#include "common/blkdev.h" - -#define dout_subsys ceph_subsys_bdev -#undef dout_prefix -#define dout_prefix *_dout << "bdev " - - -void IOContext::aio_wait() -{ - Mutex::Locker l(lock); - // see _aio_thread for waker logic - num_waiting.inc(); - while (num_running.read() > 0 || num_reading.read() > 0) { - dout(10) << __func__ << " " << this - << " waiting for " << num_running.read() << " aios and/or " - << num_reading.read() << " readers to complete" << dendl; - cond.Wait(lock); - } - num_waiting.dec(); - dout(20) << __func__ << " " << this << " done" << dendl; -} - -// ---------------- -#undef dout_prefix -#define dout_prefix *_dout << "bdev(" << path << ") " - -BlockDevice::BlockDevice(aio_callback_t cb, void *cbpriv) - : fd_direct(-1), - fd_buffered(-1), - size(0), block_size(0), - fs(NULL), aio(false), dio(false), - debug_lock("BlockDevice::debug_lock"), - ioc_reap_lock("BlockDevice::ioc_reap_lock"), - flush_lock("BlockDevice::flush_lock"), - aio_queue(g_conf->bdev_aio_max_queue_depth), - aio_callback(cb), - aio_callback_priv(cbpriv), - aio_stop(false), - aio_thread(this) -{ - zeros = buffer::create_page_aligned(1048576); - zeros.zero(); -} - -int BlockDevice::_lock() -{ - struct flock l; - memset(&l, 0, sizeof(l)); - l.l_type = F_WRLCK; - l.l_whence = SEEK_SET; - l.l_start = 0; - l.l_len = 0; - int r = ::fcntl(fd_direct, F_SETLK, &l); - if (r < 0) - return -errno; - return 0; -} - -int BlockDevice::open(string p) -{ - path = p; - int r = 0; - dout(1) << __func__ << " path " << path << dendl; - - fd_direct = ::open(path.c_str(), O_RDWR | O_DIRECT); - if (fd_direct < 0) { - int r = -errno; - derr << __func__ << " open got: " << cpp_strerror(r) << dendl; - return r; - } - fd_buffered = ::open(path.c_str(), O_RDWR); - if (fd_buffered < 0) { - r = -errno; - derr << __func__ << " open got: " << cpp_strerror(r) << dendl; - goto out_direct; - } - dio = true; - aio = g_conf->bdev_aio; - if (!aio) { - assert(0 == "non-aio not supported"); - } - - // disable readahead as it will wreak havoc on our mix of - // directio/aio and buffered io. - r = posix_fadvise(fd_buffered, 0, 0, POSIX_FADV_RANDOM); - if (r < 0) { - r = -errno; - derr << __func__ << " open got: " << cpp_strerror(r) << dendl; - goto out_fail; - } - - r = _lock(); - if (r < 0) { - derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) - << dendl; - goto out_fail; - } - - struct stat st; - r = ::fstat(fd_direct, &st); - if (r < 0) { - r = -errno; - derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; - goto out_fail; - } - if (S_ISBLK(st.st_mode)) { - int64_t s; - r = get_block_device_size(fd_direct, &s); - if (r < 0) { - goto out_fail; - } - size = s; - } else { - size = st.st_size; - } - block_size = st.st_blksize; - - fs = FS::create_by_fd(fd_direct); - assert(fs); - - r = _aio_start(); - assert(r == 0); - - dout(1) << __func__ - << " size " << size - << " (" << pretty_si_t(size) << "B)" - << " block_size " << block_size - << " (" << pretty_si_t(block_size) << "B)" - << dendl; - return 0; - - out_fail: - VOID_TEMP_FAILURE_RETRY(::close(fd_buffered)); - fd_buffered = -1; - out_direct: - VOID_TEMP_FAILURE_RETRY(::close(fd_direct)); - fd_direct = -1; - return r; -} - -void BlockDevice::close() -{ - dout(1) << __func__ << dendl; - _aio_stop(); - - assert(fs); - delete fs; - fs = NULL; - - assert(fd_direct >= 0); - VOID_TEMP_FAILURE_RETRY(::close(fd_direct)); - fd_direct = -1; - - assert(fd_buffered >= 0); - VOID_TEMP_FAILURE_RETRY(::close(fd_buffered)); - fd_buffered = -1; - - path.clear(); -} - -int BlockDevice::flush() -{ - // serialize flushers, so that we can avoid weird io_since_flush - // races (w/ multipler flushers). - Mutex::Locker l(flush_lock); - if (io_since_flush.read() == 0) { - dout(10) << __func__ << " no-op (no ios since last flush)" << dendl; - return 0; - } - dout(10) << __func__ << " start" << dendl; - io_since_flush.set(0); - if (g_conf->bdev_inject_crash) { - // sleep for a moment to give other threads a chance to submit or - // wait on io that races with a flush. - derr << __func__ << " injecting crash. first we sleep..." << dendl; - sleep(3); - derr << __func__ << " and now we die" << dendl; - assert(0 == "bdev_inject_crash"); - } - utime_t start = ceph_clock_now(NULL); - int r = ::fdatasync(fd_direct); - utime_t end = ceph_clock_now(NULL); - utime_t dur = end - start; - if (r < 0) { - r = -errno; - derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl; - } - dout(5) << __func__ << " in " << dur << dendl;; - return r; -} - -int BlockDevice::_aio_start() -{ - if (g_conf->bdev_aio) { - dout(10) << __func__ << dendl; - int r = aio_queue.init(); - if (r < 0) { - derr << __func__ << " failed: " << cpp_strerror(r) << dendl; - return r; - } - aio_thread.create("bstore_aio"); - } - return 0; -} - -void BlockDevice::_aio_stop() -{ - if (g_conf->bdev_aio) { - dout(10) << __func__ << dendl; - aio_stop = true; - aio_thread.join(); - aio_stop = false; - aio_queue.shutdown(); - } -} - -void BlockDevice::_aio_thread() -{ - dout(10) << __func__ << " start" << dendl; - while (!aio_stop) { - dout(40) << __func__ << " polling" << dendl; - int max = 16; - FS::aio_t *aio[max]; - int r = aio_queue.get_next_completed(g_conf->bdev_aio_poll_ms, - aio, max); - if (r < 0) { - derr << __func__ << " got " << cpp_strerror(r) << dendl; - } - if (r > 0) { - dout(30) << __func__ << " got " << r << " completed aios" << dendl; - for (int i = 0; i < r; ++i) { - IOContext *ioc = static_cast(aio[i]->priv); - _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); - int left = ioc->num_running.dec(); - int r = aio[i]->get_return_value(); - dout(10) << __func__ << " finished aio " << aio[i] << " r " << r - << " ioc " << ioc - << " with " << left << " aios left" << dendl; - assert(r >= 0); - if (left == 0) { - // check waiting count before doing callback (which may - // destroy this ioc). - if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } - if (ioc->priv) { - aio_callback(aio_callback_priv, ioc->priv); - } - } - } - } - if (ioc_reap_count.read()) { - Mutex::Locker l(ioc_reap_lock); - for (auto p : ioc_reap_queue) { - dout(20) << __func__ << " reap ioc " << p << dendl; - delete p; - } - ioc_reap_queue.clear(); - ioc_reap_count.dec(); - } - } - dout(10) << __func__ << " end" << dendl; -} - -void BlockDevice::_aio_log_start( - IOContext *ioc, - uint64_t offset, - uint64_t length) -{ - dout(20) << __func__ << " " << offset << "~" << length << dendl; - if (g_conf->bdev_debug_inflight_ios) { - Mutex::Locker l(debug_lock); - if (debug_inflight.intersects(offset, length)) { - derr << __func__ << " inflight overlap of " - << offset << "~" << length - << " with " << debug_inflight << dendl; - assert(0); - } - debug_inflight.insert(offset, length); - } -} - -void BlockDevice::_aio_log_finish( - IOContext *ioc, - uint64_t offset, - uint64_t length) -{ - dout(20) << __func__ << " " << aio << " " << offset << "~" << length << dendl; - if (g_conf->bdev_debug_inflight_ios) { - Mutex::Locker l(debug_lock); - debug_inflight.erase(offset, length); - } -} - -void BlockDevice::aio_submit(IOContext *ioc) -{ - dout(20) << __func__ << " ioc " << ioc - << " pending " << ioc->num_pending.read() - << " running " << ioc->num_running.read() - << dendl; - // move these aside, and get our end iterator position now, as the - // aios might complete as soon as they are submitted and queue more - // wal aio's. - list::iterator e = ioc->running_aios.begin(); - ioc->running_aios.splice(e, ioc->pending_aios); - list::iterator p = ioc->running_aios.begin(); - - int pending = ioc->num_pending.read(); - ioc->num_running.add(pending); - ioc->num_pending.sub(pending); - assert(ioc->num_pending.read() == 0); // we should be only thread doing this - - bool done = false; - while (!done) { - FS::aio_t& aio = *p; - aio.priv = static_cast(ioc); - dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd - << " " << aio.offset << "~" << aio.length << dendl; - for (vector::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q) - dout(30) << __func__ << " iov " << (void*)q->iov_base - << " len " << q->iov_len << dendl; - - // be careful: as soon as we submit aio we race with completion. - // since we are holding a ref take care not to dereference txc at - // all after that point. - list::iterator cur = p; - ++p; - done = (p == e); - - // do not dereference txc (or it's contents) after we submit (if - // done == true and we don't loop) - int retries = 0; - int r = aio_queue.submit(*cur, &retries); - if (retries) - derr << __func__ << " retries " << retries << dendl; - if (r) { - derr << " aio submit got " << cpp_strerror(r) << dendl; - assert(r == 0); - } - } -} - -int BlockDevice::aio_write( - uint64_t off, - bufferlist &bl, - IOContext *ioc, - bool buffered) -{ - uint64_t len = bl.length(); - dout(20) << __func__ << " " << off << "~" << len << dendl; - assert(off % block_size == 0); - assert(len % block_size == 0); - assert(len > 0); - assert(off < size); - assert(off + len <= size); - - if (!bl.is_n_page_sized() || !bl.is_page_aligned()) { - dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl; - bl.rebuild(); - } - - dout(40) << "data: "; - bl.hexdump(*_dout); - *_dout << dendl; - - _aio_log_start(ioc, off, bl.length()); - -#ifdef HAVE_LIBAIO - if (aio && dio && !buffered) { - ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); - ioc->num_pending.inc(); - FS::aio_t& aio = ioc->pending_aios.back(); - if (g_conf->bdev_inject_crash && - rand() % g_conf->bdev_inject_crash == 0) { - derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len - << dendl; - // generate a real io so that aio_wait behaves properly, but make it - // a read instead of write, and toss the result. - aio.pread(off, len); - } else { - bl.prepare_iov(&aio.iov); - for (unsigned i=0; ibdev_inject_crash && - rand() % g_conf->bdev_inject_crash == 0) { - derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len - << dendl; - return 0; - } - vector iov; - bl.prepare_iov(&iov); - int r = ::pwritev(buffered ? fd_buffered : fd_direct, - &iov[0], iov.size(), off); - if (r < 0) { - derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl; - return r; - } - if (buffered) { - // initiate IO (but do not wait) - ::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE); - } - } - - io_since_flush.set(1); - return 0; -} - -int BlockDevice::aio_zero( - uint64_t off, - uint64_t len, - IOContext *ioc) -{ - dout(5) << __func__ << " " << off << "~" << len << dendl; - assert(off % block_size == 0); - assert(len % block_size == 0); - assert(len > 0); - assert(off < size); - assert(off + len <= size); - - bufferlist bl; - while (len > 0) { - bufferlist t; - t.append(zeros, 0, MIN(zeros.length(), len)); - len -= t.length(); - bl.claim_append(t); - } - bufferlist foo; - // note: this works with aio only becaues the actual buffer is - // this->zeros, which is page-aligned and never freed. - return aio_write(off, bl, ioc, false); -} - -int BlockDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, - IOContext *ioc, - bool buffered) -{ - dout(5) << __func__ << " " << off << "~" << len << dendl; - assert(off % block_size == 0); - assert(len % block_size == 0); - assert(len > 0); - assert(off < size); - assert(off + len <= size); - - _aio_log_start(ioc, off, len); - ioc->num_reading.inc();; - - bufferptr p = buffer::create_page_aligned(len); - int r = ::pread(buffered ? fd_buffered : fd_direct, - p.c_str(), len, off); - if (r < 0) { - r = -errno; - goto out; - } - pbl->clear(); - pbl->push_back(p); - - dout(40) << "data: "; - pbl->hexdump(*_dout); - *_dout << dendl; - - out: - _aio_log_finish(ioc, off, len); - ioc->num_reading.dec(); - if (ioc->num_waiting.read()) { - dout(20) << __func__ << " waking waiter" << dendl; - Mutex::Locker l(ioc->lock); - ioc->cond.Signal(); - } - return r < 0 ? r : 0; -} - -int BlockDevice::read_buffered(uint64_t off, uint64_t len, char *buf) -{ - dout(5) << __func__ << " " << off << "~" << len << dendl; - assert(len > 0); - assert(off < size); - assert(off + len <= size); - - int r = 0; - char *t = buf; - uint64_t left = len; - while (left > 0) { - r = ::pread(fd_buffered, t, left, off); - if (r < 0) { - r = -errno; - goto out; - } - off += r; - t += r; - left -= r; - } - - dout(40) << __func__ << " data: "; - bufferlist bl; - bl.append(buf, len); - bl.hexdump(*_dout); - *_dout << dendl; - - out: - return r < 0 ? r : 0; -} - -int BlockDevice::invalidate_cache(uint64_t off, uint64_t len) -{ - dout(5) << __func__ << " " << off << "~" << len << dendl; - assert(off % block_size == 0); - assert(len % block_size == 0); - int r = posix_fadvise(fd_buffered, off, len, POSIX_FADV_DONTNEED); - if (r < 0) { - r = -errno; - derr << __func__ << " " << off << "~" << len << " error: " - << cpp_strerror(r) << dendl; - } - return r; -} - -void BlockDevice::queue_reap_ioc(IOContext *ioc) -{ - Mutex::Locker l(ioc_reap_lock); - if (ioc_reap_count.read() == 0) - ioc_reap_count.inc(); - ioc_reap_queue.push_back(ioc); -} diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h deleted file mode 100644 index 77bb8eb49b1a..000000000000 --- a/src/os/bluestore/BlockDevice.h +++ /dev/null @@ -1,120 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_OS_BLUESTORE_BLOCKDEVICE -#define CEPH_OS_BLUESTORE_BLOCKDEVICE - -#include "os/fs/FS.h" -#include "include/interval_set.h" - -/// track in-flight io -struct IOContext { - void *priv; - - Mutex lock; - Cond cond; - //interval_set blocks; ///< blocks with aio in flight - - list pending_aios; ///< not yet submitted - list running_aios; ///< submitting or submitted - atomic_t num_pending; - atomic_t num_running; - atomic_t num_reading; - atomic_t num_waiting; - - IOContext(void *p) - : priv(p), - lock("IOContext::lock") - {} - - // no copying - IOContext(const IOContext& other); - IOContext &operator=(const IOContext& other); - - bool has_aios() { - Mutex::Locker l(lock); - return num_pending.read() + num_running.read(); - } - - void aio_wait(); -}; - -class BlockDevice { -public: - typedef void (*aio_callback_t)(void *handle, void *aio); - -private: - int fd_direct, fd_buffered; - uint64_t size; - uint64_t block_size; - string path; - FS *fs; - bool aio, dio; - bufferptr zeros; - - Mutex debug_lock; - interval_set debug_inflight; - - Mutex ioc_reap_lock; - vector ioc_reap_queue; - atomic_t ioc_reap_count; - - Mutex flush_lock; - atomic_t io_since_flush; - - FS::aio_queue_t aio_queue; - aio_callback_t aio_callback; - void *aio_callback_priv; - bool aio_stop; - - struct AioCompletionThread : public Thread { - BlockDevice *bdev; - AioCompletionThread(BlockDevice *b) : bdev(b) {} - void *entry() { - bdev->_aio_thread(); - return NULL; - } - } aio_thread; - - void _aio_thread(); - int _aio_start(); - void _aio_stop(); - - void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length); - void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length); - - int _lock(); - -public: - BlockDevice(aio_callback_t cb, void *cbpriv); - - void aio_submit(IOContext *ioc); - - uint64_t get_size() const { - return size; - } - uint64_t get_block_size() const { - return block_size; - } - - int read(uint64_t off, uint64_t len, bufferlist *pbl, - IOContext *ioc, - bool buffered); - int read_buffered(uint64_t off, uint64_t len, char *buf); - - int aio_write(uint64_t off, bufferlist& bl, - IOContext *ioc, - bool buffered); - int aio_zero(uint64_t off, uint64_t len, - IOContext *ioc); - int flush(); - - void queue_reap_ioc(IOContext *ioc); - - // for managing buffered readers/writers - int invalidate_cache(uint64_t off, uint64_t len); - int open(string path); - void close(); -}; - -#endif diff --git a/src/os/bluestore/KernelDevice.cc b/src/os/bluestore/KernelDevice.cc new file mode 100644 index 000000000000..230ea455f4d9 --- /dev/null +++ b/src/os/bluestore/KernelDevice.cc @@ -0,0 +1,550 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include +#include +#include +#include + +#include "BlockDevice.h" +#include "include/types.h" +#include "include/compat.h" +#include "common/errno.h" +#include "common/debug.h" +#include "common/blkdev.h" + +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev " + + +void IOContext::aio_wait() +{ + Mutex::Locker l(lock); + // see _aio_thread for waker logic + num_waiting.inc(); + while (num_running.read() > 0 || num_reading.read() > 0) { + dout(10) << __func__ << " " << this + << " waiting for " << num_running.read() << " aios and/or " + << num_reading.read() << " readers to complete" << dendl; + cond.Wait(lock); + } + num_waiting.dec(); + dout(20) << __func__ << " " << this << " done" << dendl; +} + +// ---------------- +#undef dout_prefix +#define dout_prefix *_dout << "bdev(" << path << ") " + +BlockDevice::BlockDevice(aio_callback_t cb, void *cbpriv) + : fd_direct(-1), + fd_buffered(-1), + size(0), block_size(0), + fs(NULL), aio(false), dio(false), + debug_lock("BlockDevice::debug_lock"), + ioc_reap_lock("BlockDevice::ioc_reap_lock"), + flush_lock("BlockDevice::flush_lock"), + aio_queue(g_conf->bdev_aio_max_queue_depth), + aio_callback(cb), + aio_callback_priv(cbpriv), + aio_stop(false), + aio_thread(this) +{ + zeros = buffer::create_page_aligned(1048576); + zeros.zero(); +} + +int BlockDevice::_lock() +{ + struct flock l; + memset(&l, 0, sizeof(l)); + l.l_type = F_WRLCK; + l.l_whence = SEEK_SET; + l.l_start = 0; + l.l_len = 0; + int r = ::fcntl(fd_direct, F_SETLK, &l); + if (r < 0) + return -errno; + return 0; +} + +int BlockDevice::open(string p) +{ + path = p; + int r = 0; + dout(1) << __func__ << " path " << path << dendl; + + fd_direct = ::open(path.c_str(), O_RDWR | O_DIRECT); + if (fd_direct < 0) { + int r = -errno; + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + return r; + } + fd_buffered = ::open(path.c_str(), O_RDWR); + if (fd_buffered < 0) { + r = -errno; + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + goto out_direct; + } + dio = true; + aio = g_conf->bdev_aio; + if (!aio) { + assert(0 == "non-aio not supported"); + } + + // disable readahead as it will wreak havoc on our mix of + // directio/aio and buffered io. + r = posix_fadvise(fd_buffered, 0, 0, POSIX_FADV_RANDOM); + if (r < 0) { + r = -errno; + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + + r = _lock(); + if (r < 0) { + derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) + << dendl; + goto out_fail; + } + + struct stat st; + r = ::fstat(fd_direct, &st); + if (r < 0) { + r = -errno; + derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; + goto out_fail; + } + if (S_ISBLK(st.st_mode)) { + int64_t s; + r = get_block_device_size(fd_direct, &s); + if (r < 0) { + goto out_fail; + } + size = s; + } else { + size = st.st_size; + } + block_size = st.st_blksize; + + fs = FS::create_by_fd(fd_direct); + assert(fs); + + r = _aio_start(); + assert(r == 0); + + dout(1) << __func__ + << " size " << size + << " (" << pretty_si_t(size) << "B)" + << " block_size " << block_size + << " (" << pretty_si_t(block_size) << "B)" + << dendl; + return 0; + + out_fail: + VOID_TEMP_FAILURE_RETRY(::close(fd_buffered)); + fd_buffered = -1; + out_direct: + VOID_TEMP_FAILURE_RETRY(::close(fd_direct)); + fd_direct = -1; + return r; +} + +void BlockDevice::close() +{ + dout(1) << __func__ << dendl; + _aio_stop(); + + assert(fs); + delete fs; + fs = NULL; + + assert(fd_direct >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_direct)); + fd_direct = -1; + + assert(fd_buffered >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_buffered)); + fd_buffered = -1; + + path.clear(); +} + +int BlockDevice::flush() +{ + // serialize flushers, so that we can avoid weird io_since_flush + // races (w/ multipler flushers). + Mutex::Locker l(flush_lock); + if (io_since_flush.read() == 0) { + dout(10) << __func__ << " no-op (no ios since last flush)" << dendl; + return 0; + } + dout(10) << __func__ << " start" << dendl; + io_since_flush.set(0); + if (g_conf->bdev_inject_crash) { + // sleep for a moment to give other threads a chance to submit or + // wait on io that races with a flush. + derr << __func__ << " injecting crash. first we sleep..." << dendl; + sleep(3); + derr << __func__ << " and now we die" << dendl; + assert(0 == "bdev_inject_crash"); + } + utime_t start = ceph_clock_now(NULL); + int r = ::fdatasync(fd_direct); + utime_t end = ceph_clock_now(NULL); + utime_t dur = end - start; + if (r < 0) { + r = -errno; + derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl; + } + dout(5) << __func__ << " in " << dur << dendl;; + return r; +} + +int BlockDevice::_aio_start() +{ + if (g_conf->bdev_aio) { + dout(10) << __func__ << dendl; + int r = aio_queue.init(); + if (r < 0) { + derr << __func__ << " failed: " << cpp_strerror(r) << dendl; + return r; + } + aio_thread.create("bstore_aio"); + } + return 0; +} + +void BlockDevice::_aio_stop() +{ + if (g_conf->bdev_aio) { + dout(10) << __func__ << dendl; + aio_stop = true; + aio_thread.join(); + aio_stop = false; + aio_queue.shutdown(); + } +} + +void BlockDevice::_aio_thread() +{ + dout(10) << __func__ << " start" << dendl; + while (!aio_stop) { + dout(40) << __func__ << " polling" << dendl; + int max = 16; + FS::aio_t *aio[max]; + int r = aio_queue.get_next_completed(g_conf->bdev_aio_poll_ms, + aio, max); + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + } + if (r > 0) { + dout(30) << __func__ << " got " << r << " completed aios" << dendl; + for (int i = 0; i < r; ++i) { + IOContext *ioc = static_cast(aio[i]->priv); + _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); + int left = ioc->num_running.dec(); + int r = aio[i]->get_return_value(); + dout(10) << __func__ << " finished aio " << aio[i] << " r " << r + << " ioc " << ioc + << " with " << left << " aios left" << dendl; + assert(r >= 0); + if (left == 0) { + // check waiting count before doing callback (which may + // destroy this ioc). + if (ioc->num_waiting.read()) { + dout(20) << __func__ << " waking waiter" << dendl; + Mutex::Locker l(ioc->lock); + ioc->cond.Signal(); + } + if (ioc->priv) { + aio_callback(aio_callback_priv, ioc->priv); + } + } + } + } + if (ioc_reap_count.read()) { + Mutex::Locker l(ioc_reap_lock); + for (auto p : ioc_reap_queue) { + dout(20) << __func__ << " reap ioc " << p << dendl; + delete p; + } + ioc_reap_queue.clear(); + ioc_reap_count.dec(); + } + } + dout(10) << __func__ << " end" << dendl; +} + +void BlockDevice::_aio_log_start( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " " << offset << "~" << length << dendl; + if (g_conf->bdev_debug_inflight_ios) { + Mutex::Locker l(debug_lock); + if (debug_inflight.intersects(offset, length)) { + derr << __func__ << " inflight overlap of " + << offset << "~" << length + << " with " << debug_inflight << dendl; + assert(0); + } + debug_inflight.insert(offset, length); + } +} + +void BlockDevice::_aio_log_finish( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " " << aio << " " << offset << "~" << length << dendl; + if (g_conf->bdev_debug_inflight_ios) { + Mutex::Locker l(debug_lock); + debug_inflight.erase(offset, length); + } +} + +void BlockDevice::aio_submit(IOContext *ioc) +{ + dout(20) << __func__ << " ioc " << ioc + << " pending " << ioc->num_pending.read() + << " running " << ioc->num_running.read() + << dendl; + // move these aside, and get our end iterator position now, as the + // aios might complete as soon as they are submitted and queue more + // wal aio's. + list::iterator e = ioc->running_aios.begin(); + ioc->running_aios.splice(e, ioc->pending_aios); + list::iterator p = ioc->running_aios.begin(); + + int pending = ioc->num_pending.read(); + ioc->num_running.add(pending); + ioc->num_pending.sub(pending); + assert(ioc->num_pending.read() == 0); // we should be only thread doing this + + bool done = false; + while (!done) { + FS::aio_t& aio = *p; + aio.priv = static_cast(ioc); + dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd + << " " << aio.offset << "~" << aio.length << dendl; + for (vector::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q) + dout(30) << __func__ << " iov " << (void*)q->iov_base + << " len " << q->iov_len << dendl; + + // be careful: as soon as we submit aio we race with completion. + // since we are holding a ref take care not to dereference txc at + // all after that point. + list::iterator cur = p; + ++p; + done = (p == e); + + // do not dereference txc (or it's contents) after we submit (if + // done == true and we don't loop) + int retries = 0; + int r = aio_queue.submit(*cur, &retries); + if (retries) + derr << __func__ << " retries " << retries << dendl; + if (r) { + derr << " aio submit got " << cpp_strerror(r) << dendl; + assert(r == 0); + } + } +} + +int BlockDevice::aio_write( + uint64_t off, + bufferlist &bl, + IOContext *ioc, + bool buffered) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " " << off << "~" << len << dendl; + assert(off % block_size == 0); + assert(len % block_size == 0); + assert(len > 0); + assert(off < size); + assert(off + len <= size); + + if (!bl.is_n_page_sized() || !bl.is_page_aligned()) { + dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl; + bl.rebuild(); + } + + dout(40) << "data: "; + bl.hexdump(*_dout); + *_dout << dendl; + + _aio_log_start(ioc, off, bl.length()); + +#ifdef HAVE_LIBAIO + if (aio && dio && !buffered) { + ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); + ioc->num_pending.inc(); + FS::aio_t& aio = ioc->pending_aios.back(); + if (g_conf->bdev_inject_crash && + rand() % g_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len + << dendl; + // generate a real io so that aio_wait behaves properly, but make it + // a read instead of write, and toss the result. + aio.pread(off, len); + } else { + bl.prepare_iov(&aio.iov); + for (unsigned i=0; ibdev_inject_crash && + rand() % g_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len + << dendl; + return 0; + } + vector iov; + bl.prepare_iov(&iov); + int r = ::pwritev(buffered ? fd_buffered : fd_direct, + &iov[0], iov.size(), off); + if (r < 0) { + derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl; + return r; + } + if (buffered) { + // initiate IO (but do not wait) + ::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE); + } + } + + io_since_flush.set(1); + return 0; +} + +int BlockDevice::aio_zero( + uint64_t off, + uint64_t len, + IOContext *ioc) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + assert(off % block_size == 0); + assert(len % block_size == 0); + assert(len > 0); + assert(off < size); + assert(off + len <= size); + + bufferlist bl; + while (len > 0) { + bufferlist t; + t.append(zeros, 0, MIN(zeros.length(), len)); + len -= t.length(); + bl.claim_append(t); + } + bufferlist foo; + // note: this works with aio only becaues the actual buffer is + // this->zeros, which is page-aligned and never freed. + return aio_write(off, bl, ioc, false); +} + +int BlockDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + assert(off % block_size == 0); + assert(len % block_size == 0); + assert(len > 0); + assert(off < size); + assert(off + len <= size); + + _aio_log_start(ioc, off, len); + ioc->num_reading.inc();; + + bufferptr p = buffer::create_page_aligned(len); + int r = ::pread(buffered ? fd_buffered : fd_direct, + p.c_str(), len, off); + if (r < 0) { + r = -errno; + goto out; + } + pbl->clear(); + pbl->push_back(p); + + dout(40) << "data: "; + pbl->hexdump(*_dout); + *_dout << dendl; + + out: + _aio_log_finish(ioc, off, len); + ioc->num_reading.dec(); + if (ioc->num_waiting.read()) { + dout(20) << __func__ << " waking waiter" << dendl; + Mutex::Locker l(ioc->lock); + ioc->cond.Signal(); + } + return r < 0 ? r : 0; +} + +int BlockDevice::read_buffered(uint64_t off, uint64_t len, char *buf) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + assert(len > 0); + assert(off < size); + assert(off + len <= size); + + int r = 0; + char *t = buf; + uint64_t left = len; + while (left > 0) { + r = ::pread(fd_buffered, t, left, off); + if (r < 0) { + r = -errno; + goto out; + } + off += r; + t += r; + left -= r; + } + + dout(40) << __func__ << " data: "; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? r : 0; +} + +int BlockDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + assert(off % block_size == 0); + assert(len % block_size == 0); + int r = posix_fadvise(fd_buffered, off, len, POSIX_FADV_DONTNEED); + if (r < 0) { + r = -errno; + derr << __func__ << " " << off << "~" << len << " error: " + << cpp_strerror(r) << dendl; + } + return r; +} + +void BlockDevice::queue_reap_ioc(IOContext *ioc) +{ + Mutex::Locker l(ioc_reap_lock); + if (ioc_reap_count.read() == 0) + ioc_reap_count.inc(); + ioc_reap_queue.push_back(ioc); +} diff --git a/src/os/bluestore/KernelDevice.h b/src/os/bluestore/KernelDevice.h new file mode 100644 index 000000000000..77bb8eb49b1a --- /dev/null +++ b/src/os/bluestore/KernelDevice.h @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_OS_BLUESTORE_BLOCKDEVICE +#define CEPH_OS_BLUESTORE_BLOCKDEVICE + +#include "os/fs/FS.h" +#include "include/interval_set.h" + +/// track in-flight io +struct IOContext { + void *priv; + + Mutex lock; + Cond cond; + //interval_set blocks; ///< blocks with aio in flight + + list pending_aios; ///< not yet submitted + list running_aios; ///< submitting or submitted + atomic_t num_pending; + atomic_t num_running; + atomic_t num_reading; + atomic_t num_waiting; + + IOContext(void *p) + : priv(p), + lock("IOContext::lock") + {} + + // no copying + IOContext(const IOContext& other); + IOContext &operator=(const IOContext& other); + + bool has_aios() { + Mutex::Locker l(lock); + return num_pending.read() + num_running.read(); + } + + void aio_wait(); +}; + +class BlockDevice { +public: + typedef void (*aio_callback_t)(void *handle, void *aio); + +private: + int fd_direct, fd_buffered; + uint64_t size; + uint64_t block_size; + string path; + FS *fs; + bool aio, dio; + bufferptr zeros; + + Mutex debug_lock; + interval_set debug_inflight; + + Mutex ioc_reap_lock; + vector ioc_reap_queue; + atomic_t ioc_reap_count; + + Mutex flush_lock; + atomic_t io_since_flush; + + FS::aio_queue_t aio_queue; + aio_callback_t aio_callback; + void *aio_callback_priv; + bool aio_stop; + + struct AioCompletionThread : public Thread { + BlockDevice *bdev; + AioCompletionThread(BlockDevice *b) : bdev(b) {} + void *entry() { + bdev->_aio_thread(); + return NULL; + } + } aio_thread; + + void _aio_thread(); + int _aio_start(); + void _aio_stop(); + + void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length); + void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length); + + int _lock(); + +public: + BlockDevice(aio_callback_t cb, void *cbpriv); + + void aio_submit(IOContext *ioc); + + uint64_t get_size() const { + return size; + } + uint64_t get_block_size() const { + return block_size; + } + + int read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered); + int read_buffered(uint64_t off, uint64_t len, char *buf); + + int aio_write(uint64_t off, bufferlist& bl, + IOContext *ioc, + bool buffered); + int aio_zero(uint64_t off, uint64_t len, + IOContext *ioc); + int flush(); + + void queue_reap_ioc(IOContext *ioc); + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len); + int open(string path); + void close(); +}; + +#endif