From: Haomai Wang
Date: Tue, 5 Jan 2016 15:50:15 +0000 (+0800)
Subject: NVMEDevice: make read/write all async
X-Git-Tag: v10.0.4~81^2~34
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a5748cb2da3f7c3e4c6180133dfdee90d17f1957;p=ceph.git

NVMEDevice: make read/write all async

Signed-off-by: Haomai Wang
---
Review notes (changes folded into the added lines of this patch):
 - BlockDevice.h: give IOContext::backend_priv a nullptr default; aio_write()
   and read() test it before anything assigns it.
 - init(): check task_pool (not request_mempool a second time) after creating
   the task pool.
 - io_complete()/_aio_thread(): publish Task::read_code while holding
   ctx->lock, so the reader cannot recycle the task and return before the
   completion side has finished signalling.
 - _aio_thread(): start every polling pass with t == nullptr instead of an
   uninitialized/stale pointer.
 - aio_write(): append new tasks at the tail of the IOContext's chain; linking
   only head->next lost every write after the second one.
 - read(): rte_free() the DMA buffer (t->buf), not the mempool-owned Task.
 - Hunk headers were recomputed for the changed line counts.

diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h
index 03fa9707ab41..4564de5c08d8 100644
--- a/src/os/bluestore/BlockDevice.h
+++ b/src/os/bluestore/BlockDevice.h
@@ -24,8 +24,7 @@
 struct IOContext {
   void *priv;
 #ifdef HAVE_SPDK
-  bool done = false;
-  void *backend;
+  void *backend_priv = nullptr;
 #endif
 
   Mutex lock;
diff --git a/src/os/bluestore/NVMEDevice.cc b/src/os/bluestore/NVMEDevice.cc
index fff435bf778e..068c7752dfd3 100644
--- a/src/os/bluestore/NVMEDevice.cc
+++ b/src/os/bluestore/NVMEDevice.cc
@@ -39,31 +39,41 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "bdev "
 
-void IOContext::aio_wait()
-{
-  Mutex::Locker l(lock);
-  // see _aio_thread for waker logic
-  num_waiting.inc();
-  while (num_running.read() > 0 || num_reading.read() > 0) {
-    dout(10) << __func__ << " " << this
-             << " waiting for " << num_running.read() << " aios and/or "
-             << num_reading.read() << " readers to complete" << dendl;
-    cond.Wait(lock);
-  }
-  num_waiting.dec();
-  dout(20) << __func__ << " " << this << " done" << dendl;
-}
-
-static void io_complete(void *ctx, const struct nvme_completion *completion) {
-  if (nvme_completion_is_error(completion)) {
-    assert(0);
-  }
-
-  IOContext *ioc = (IOContext*)ctx;
-  NVMEDevice *device = (NVMEDevice*)ioc->backend;
-  ioc->done = true;
-  if (ioc->priv) {
-    device->aio_callback(device->aio_callback_priv, ioc->priv);
+struct rte_mempool *request_mempool;
+static struct rte_mempool *task_pool;
+
+// Completion callback handed to nvme_ns_cmd_read/write; finishes one Task.
+static void io_complete(void *t, const struct nvme_completion *completion) {
+  Task *task = static_cast<Task*>(t);
+  IOContext *ctx = task->ctx;
+  if (task->command == IOCommand::WRITE_COMMAND) {
+    auto left = ctx->num_running.dec();
+    assert(!nvme_completion_is_error(completion));
+    // check waiting count before doing callback (which may
+    // destroy this ioc).
+    if (!left) {
+      ctx->backend_priv = nullptr;
+      if (ctx->num_waiting.read()) {
+        Mutex::Locker l(ctx->lock);
+        ctx->cond.Signal();
+      }
+      if (ctx->priv) {
+        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+      }
+    }
+    rte_free(task->buf);
+    rte_mempool_put(task_pool, task);
+  } else {
+    assert(task->command == IOCommand::READ_COMMAND);
+    ctx->num_reading.dec();
+    // Publish read_code under ctx->lock: NVMEDevice::read() tests it under
+    // the same lock and recycles the task as soon as it sees a value <= 0.
+    Mutex::Locker l(ctx->lock);
+    if (nvme_completion_is_error(completion))
+      task->read_code = -1; // FIXME
+    else
+      task->read_code = 0;
+    ctx->cond.Signal();
   }
 }
 
@@ -74,6 +84,9 @@ static void io_complete(void *ctx, const struct nvme_completion *completion) {
 NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
     : ctrlr(nullptr),
       ns(nullptr),
+      aio_stop(false),
+      queue_lock("NVMEDevice::queue_lock"),
+      aio_thread(this),
       aio_callback(cb),
       aio_callback_priv(cbpriv)
 {
@@ -82,7 +95,7 @@ NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
 }
 
 static char *ealargs[] = {
-    "perf",
+    "ceph-osd",
     "-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
     "-n 4",
 };
@@ -97,6 +110,13 @@ void NVMEDevice::init()
                                        nvme_request_size(), 128, 0,
                                        NULL, NULL, NULL, NULL,
                                        SOCKET_ID_ANY, 0);
+  if (request_mempool == NULL)
+    assert(0);
+
+  task_pool = rte_mempool_create(
+      "task_pool", 8192, sizeof(Task),
+      64, 0, NULL, NULL, NULL, NULL,
+      SOCKET_ID_ANY, 0);
-  if (request_mempool == NULL)
+  if (task_pool == NULL)
     assert(0);
 }
@@ -219,6 +239,8 @@ int NVMEDevice::open(string p)
 
   pci_iterator_destroy(iter);
 
+  aio_thread.create();
+
   dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
           << " block_size " << block_size << " (" << pretty_si_t(block_size)
           << "B)" << dendl;
@@ -229,10 +251,74 @@ int NVMEDevice::open(string p)
 void NVMEDevice::close()
 {
   dout(1) << __func__ << dendl;
-  nvme_unregister_io_thread();
+
+  aio_stop = true;
+  aio_thread.join();
+  aio_stop = false;
   name.clear();
 }
 
+void NVMEDevice::_aio_thread()
+{
+  dout(10) << __func__ << " start" << dendl;
+  if (nvme_register_io_thread() != 0) {
+    assert(0);
+  }
+
+  int r = 0;
+  const int max = 16;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    // Start every pass with no task; a stale pointer from the previous
+    // pass must never be resubmitted when the queue is empty.
+    Task *t = nullptr;
+    {
+      Mutex::Locker l(queue_lock);
+      if (!task_queue.empty()) {
+        t = task_queue.front();
+        task_queue.pop();
+      }
+    }
+
+    if (t) {
+      switch (t->command) {
+        case IOCommand::WRITE_COMMAND:
+        {
+          while (t) {
+            r = nvme_ns_cmd_write(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+            if (r < 0) {
+              t->ctx->backend_priv = nullptr;
+              rte_free(t->buf);
+              rte_mempool_put(task_pool, t);
+              derr << __func__ << " failed to do write command" << dendl;
+              assert(0);
+            }
+            t = t->next;
+          }
+          break;
+        }
+        case IOCommand::READ_COMMAND:
+        {
+          r = nvme_ns_cmd_read(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+          if (r < 0) {
+            derr << __func__ << " failed to read" << dendl;
+            t->ctx->num_reading.dec();
+            // hand the error to the reader blocked in read(), under its lock
+            Mutex::Locker l(t->ctx->lock);
+            t->read_code = r;
+            t->ctx->cond.Signal();
+          }
+          break;
+        }
+      }
+    }
+
+    nvme_ctrlr_process_io_completions(ctrlr, max);
+  }
+  nvme_unregister_io_thread();
+  dout(10) << __func__ << " end" << dendl;
+}
+
 int NVMEDevice::flush()
 {
   dout(10) << __func__ << " start" << dendl;
@@ -241,10 +327,17 @@ int NVMEDevice::flush()
 
 void NVMEDevice::aio_submit(IOContext *ioc)
 {
-  while (!ioc->done) {
-    nvme_ctrlr_process_io_completions(ctrlr, 0);
-    usleep(50);
-  }
+  dout(20) << __func__ << " ioc " << ioc << " pending "
+           << ioc->num_pending.read() << " running "
+           << ioc->num_running.read() << dendl;
+  Task *t = static_cast<Task*>(ioc->backend_priv);
+  int pending = ioc->num_pending.read();
+  ioc->num_running.add(pending);
+  ioc->num_pending.sub(pending);
+  assert(ioc->num_pending.read() == 0);  // we should be only thread doing this
+  Mutex::Locker l(queue_lock);
+  // Only need to push the first entry
+  task_queue.push(t);
 }
 
 int NVMEDevice::aio_write(
@@ -261,18 +354,38 @@ int NVMEDevice::aio_write(
   assert(off < size);
   assert(off + len <= size);
 
-  if (!bl.is_n_page_sized() || !bl.is_page_aligned()) {
-    dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl;
-    bl.rebuild();
-  }
-
-  ioc->backend = this;
-  int rc = nvme_ns_cmd_write(ns, bl.c_str(), off,
-                             bl.length()/block_size, io_complete, ioc);
-  if (rc < 0) {
-    derr << __func__ << " failed to do write command" << dendl;
-    return rc;
+  Task *t;
+  int r = rte_mempool_get(task_pool, (void **)&t);
+  if (r < 0) {
+    derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
+    return r;
+  }
+
+  t->buf = rte_malloc(NULL, bl.length(), block_size);
+  if (t->buf == NULL) {
+    derr << __func__ << " task->buf rte_malloc failed" << dendl;
+    rte_mempool_put(task_pool, t);
+    return -ENOMEM;
+  }
+  bl.copy(0, bl.length(), static_cast<char*>(t->buf));
+
+  t->ctx = ioc;
+  t->command = IOCommand::WRITE_COMMAND;
+  t->offset = off;
+  t->len = len;
+  t->device = this;
+  t->next = nullptr;
+  // Append at the tail: backend_priv always points at the head of the chain,
+  // so linking only head->next would drop every task after the second one.
+  if (ioc->backend_priv) {
+    Task *tail = static_cast<Task*>(ioc->backend_priv);
+    while (tail->next)
+      tail = tail->next;
+    tail->next = t;
+  } else {
+    ioc->backend_priv = t;
   }
+  ioc->num_pending.inc();
 
   dout(5) << __func__ << " " << off << "~" << len << dendl;
 
@@ -315,22 +428,53 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
   assert(off < size);
   assert(off + len <= size);
 
-  bufferptr p = buffer::create_page_aligned(len);
-  ioc->backend = this;
-  int r = nvme_ns_cmd_read(ns, p.c_str(), off, len / block_size, io_complete, ioc);
+  Task *t;
+  int r = rte_mempool_get(task_pool, (void **)&t);
   if (r < 0) {
-    r = -errno;
-    derr << __func__ << " failed to read" << dendl;
+    derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
     return r;
   }
-  while (!ioc->done) {
-    nvme_ctrlr_process_io_completions(ctrlr, 0);
-    usleep(50);
+
+  bufferptr p = buffer::create_page_aligned(len);
+  t->buf = rte_malloc(NULL, len, block_size);
+  if (t->buf == NULL) {
+    derr << __func__ << " task->buf rte_malloc failed" << dendl;
+    r = -ENOMEM;
+    goto out;
+  }
+  t->ctx = ioc;
+  t->command = IOCommand::READ_COMMAND;
+  t->offset = off;
+  t->len = len;
+  t->device = this;
+  t->read_code = 1;
+  assert(!ioc->backend_priv);
+  ioc->num_reading.inc();
+  {
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+  }
+
+  {
+    Mutex::Locker l(ioc->lock);
+    while (t->read_code > 0)
+      ioc->cond.Wait(ioc->lock);
   }
+  memcpy(p.c_str(), t->buf, len);
   pbl->clear();
   pbl->push_back(p);
-
-  return r < 0 ? r : 0;
+  r = t->read_code;
+  // free the rte_malloc'ed data buffer; the Task itself returns to task_pool
+  rte_free(t->buf);
+
+ out:
+  rte_mempool_put(task_pool, t);
+  if (ioc->num_waiting.read()) {
+    dout(20) << __func__ << " waking waiter" << dendl;
+    Mutex::Locker l(ioc->lock);
+    ioc->cond.Signal();
+  }
+  return r;
 }
 
 int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
diff --git a/src/os/bluestore/NVMEDevice.h b/src/os/bluestore/NVMEDevice.h
index 2e14fbab2d9d..cee0a571a345 100644
--- a/src/os/bluestore/NVMEDevice.h
+++ b/src/os/bluestore/NVMEDevice.h
@@ -17,6 +17,7 @@
 #ifndef CEPH_OS_BLUESTORE_NVMEDEVICE
 #define CEPH_OS_BLUESTORE_NVMEDEVICE
 
+#include <queue>
 #include <pciaccess.h>
 
 // since _Static_assert introduced in c11
@@ -33,8 +34,31 @@ extern "C" {
 }
 #endif
 
+#include "include/atomic.h"
+#include "common/Mutex.h"
 #include "BlockDevice.h"
 
+enum class IOCommand {
+  READ_COMMAND,
+  WRITE_COMMAND
+};
+
+class NVMEDevice;
+
+// One queued NVMe command. Writes of an IOContext are chained through
+// 'next'; a read uses 'read_code' instead (1 = pending, 0 = ok, <0 = error).
+struct Task {
+  NVMEDevice *device;
+  IOContext *ctx;
+  IOCommand command;
+  uint64_t offset, len;
+  void *buf;
+  union {
+    Task *next;
+    int64_t read_code;
+  };
+};
+
 class NVMEDevice : public BlockDevice {
   /**
    * points to pinned, physically contiguous memory region;
@@ -48,8 +72,24 @@ class NVMEDevice : public BlockDevice {
   uint64_t size;
   uint64_t block_size;
 
+  bool aio_stop;
   bufferptr zeros;
 
+  Mutex queue_lock;
+  Cond queue_cond;
+  std::queue<Task*> task_queue;
+
+  struct AioCompletionThread : public Thread {
+    NVMEDevice *dev;
+    AioCompletionThread(NVMEDevice *b) : dev(b) {}
+    void *entry() {
+      dev->_aio_thread();
+      return NULL;
+    }
+  } aio_thread;
+
+  void _aio_thread();
+
   static void init();
 
  public: