From d70216b9fa7bcd92317ef997af91ba2dca4c7f8a Mon Sep 17 00:00:00 2001 From: Ziye Yang Date: Wed, 29 Mar 2017 14:04:21 +0800 Subject: [PATCH] bluestore, NVMEDevice: Add multiple thread support for SPDK I/O thread Previously, we only have one thread to do SPDK I/O, and this patch adds the multiple thread support. In this first version, we use the tid of the thread to map the I/Os of this thread to the corresponding SPDK I/O thread. Signed-off-by: Ziye Yang --- src/os/bluestore/NVMEDevice.cc | 369 ++++++++++++++++++++------------- 1 file changed, 223 insertions(+), 146 deletions(-) diff --git a/src/os/bluestore/NVMEDevice.cc b/src/os/bluestore/NVMEDevice.cc index ef27b8d2ab4f9..6b08bf707f232 100644 --- a/src/os/bluestore/NVMEDevice.cc +++ b/src/os/bluestore/NVMEDevice.cc @@ -43,6 +43,7 @@ #include "common/errno.h" #include "common/debug.h" #include "common/perf_counters.h" +#include "common/io_priority.h" #include "NVMEDevice.h" @@ -51,14 +52,14 @@ #undef dout_prefix #define dout_prefix *_dout << "bdev(" << sn << ") " -std::vector data_buf_mempool; - static constexpr uint16_t data_buffer_default_num = 2048; static constexpr uint32_t data_buffer_size = 8192; static constexpr uint16_t inline_segment_num = 32; +static thread_local int queue_id = -1; + enum { l_bluestore_nvmedevice_first = 632430, l_bluestore_nvmedevice_aio_write_lat, @@ -94,97 +95,23 @@ struct IORequest { void **extra_segs = nullptr; }; -struct Task { - NVMEDevice *device; - IOContext *ctx = nullptr; - IOCommand command; - uint64_t offset; - uint64_t len; - bufferlist write_bl; - std::function fill_cb; - Task *next = nullptr; - int64_t return_code; - ceph::coarse_real_clock::time_point start; - IORequest io_request; - std::mutex lock; - std::condition_variable cond; - Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0) - : device(dev), command(c), offset(off), len(l), - return_code(rc), - start(ceph::coarse_real_clock::now()) {} - ~Task() { - assert(!io_request.nseg); - } - void release_segs() { - if (io_request.extra_segs) { - for (uint16_t i = 0; i < io_request.nseg; i++) - data_buf_mempool.push_back(io_request.extra_segs[i]); - delete io_request.extra_segs; - } else if (io_request.nseg) { - for (uint16_t i = 0; i < io_request.nseg; i++) - data_buf_mempool.push_back(io_request.inline_segs[i]); - } - io_request.nseg = 0; - } - - void copy_to_buf(char *buf, uint64_t off, uint64_t len) { - uint64_t copied = 0; - uint64_t left = len; - void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs; - uint16_t i = 0; - while (left > 0) { - char *src = static_cast(segs[i++]); - uint64_t need_copy = std::min(left, data_buffer_size-off); - memcpy(buf+copied, src+off, need_copy); - off = 0; - left -= need_copy; - copied += need_copy; - } - } - - void io_wait() { - std::unique_lock l(lock); - cond.wait(l); - } - - void io_wake() { - std::lock_guard l(lock); - cond.notify_all(); - } -}; - -class SharedDriverData { - unsigned id; - uint32_t core_id; - std::string sn; +class SharedDriverQueueData { + SharedDriverData *driver; spdk_nvme_ctrlr *ctrlr; spdk_nvme_ns *ns; + std::string sn; + uint64_t block_size; + uint32_t sector_size; + uint32_t core_id; + uint32_t queueid; struct spdk_nvme_qpair *qpair; std::function run_func; - - uint64_t block_size = 0; - uint32_t sector_size = 0; - uint64_t size = 0; - std::vector registered_devices; friend class AioCompletionThread; bool aio_stop = false; void _aio_thread(); - void _aio_start() { - int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast(&run_func), - core_id); - assert(r == 0); - } - void _aio_stop() { - { - Mutex::Locker l(queue_lock); - aio_stop = true; - queue_cond.Signal(); - } - int r = rte_eal_wait_lcore(core_id); - assert(r == 0); - aio_stop = false; - } + int alloc_buf_from_pool(Task *t, bool write); + std::atomic_bool queue_empty; Mutex queue_lock; Cond queue_cond; @@ -195,17 +122,21 @@ class SharedDriverData { std::atomic_int flush_waiters; std::set flush_waiter_seqs; - public: - std::atomic_ulong completed_op_seq, queue_op_seq; - PerfCounters *logger = nullptr; + public: + std::atomic_ulong completed_op_seq, queue_op_seq; + std::vector data_buf_mempool; + PerfCounters *logger = nullptr; - SharedDriverData(unsigned i, uint32_t core, const std::string &sn_tag, - spdk_nvme_ctrlr *c, spdk_nvme_ns *ns) - : id(i), - core_id(core), - sn(sn_tag), + SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size, + const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id) + : driver(driver), ctrlr(c), ns(ns), + sn(sn_tag), + block_size(block_size), + sector_size(sector_size), + core_id(core), + queueid(queue_id), run_func([this]() { _aio_thread(); }), queue_empty(false), queue_lock("NVMEDevice::queue_lock"), @@ -213,11 +144,7 @@ class SharedDriverData { flush_waiters(0), completed_op_seq(0), queue_op_seq(0) { - sector_size = spdk_nvme_ns_get_sector_size(ns); - block_size = std::max(CEPH_PAGE_SIZE, sector_size); - size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns); - qpair = spdk_nvme_ctrlr_alloc_io_qpair(c, SPDK_NVME_QPRIO_URGENT); - + qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT); PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)), l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last); b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency"); @@ -231,17 +158,115 @@ class SharedDriverData { b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count"); logger = b.create_perf_counters(); g_ceph_context->get_perfcounters_collection()->add(logger); - _aio_start(); + } + + void queue_task(Task *t, uint64_t ops = 1) { + queue_op_seq += ops; + Mutex::Locker l(queue_lock); + task_queue.push(t); + if (queue_empty.load()) { + queue_empty = false; + queue_cond.Signal(); + } } - ~SharedDriverData() { + + void flush_wait() { + uint64_t cur_seq = queue_op_seq.load(); + uint64_t left = cur_seq - completed_op_seq.load(); + if (cur_seq > completed_op_seq) { + // TODO: this may contains read op + dout(10) << __func__ << " existed inflight ops " << left << dendl; + Mutex::Locker l(flush_lock); + ++flush_waiters; + flush_waiter_seqs.insert(cur_seq); + while (cur_seq > completed_op_seq.load()) { + flush_cond.Wait(flush_lock); + } + flush_waiter_seqs.erase(cur_seq); + --flush_waiters; + } + } + + void start() { + int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast(&run_func), + core_id); + assert(r == 0); + + } + + void stop() { + { + Mutex::Locker l(queue_lock); + aio_stop = true; + queue_cond.Signal(); + } + int r = rte_eal_wait_lcore(core_id); + assert(r == 0); + aio_stop = false; + } + + ~SharedDriverQueueData() { g_ceph_context->get_perfcounters_collection()->remove(logger); if (!qpair) { spdk_nvme_ctrlr_free_io_qpair(qpair); } delete logger; } +}; + +class SharedDriverData { + unsigned id; + uint32_t core_id; + + std::string sn; + spdk_nvme_ctrlr *ctrlr; + spdk_nvme_ns *ns; + uint64_t block_size = 0; + uint32_t sector_size = 0; + uint64_t size = 0; + uint32_t queue_number; + std::vector queues; + + void _aio_start() { + for (auto &&it : queues) + it->start(); + } + void _aio_stop() { + for (auto &&it : queues) + it->stop(); + } + + public: + std::vector registered_devices; + SharedDriverData(unsigned _id, const std::string &sn_tag, + spdk_nvme_ctrlr *c, spdk_nvme_ns *ns) + : id(_id), + sn(sn_tag), + ctrlr(c), + ns(ns) { + int i; + sector_size = spdk_nvme_ns_get_sector_size(ns); + block_size = std::max(CEPH_PAGE_SIZE, sector_size); + size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns); + + RTE_LCORE_FOREACH_SLAVE(i) { + queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++)); + } + + _aio_start(); + } bool is_equal(const string &tag) const { return sn == tag; } + ~SharedDriverData() { + for (auto p : queues) { + delete p; + } + } + + SharedDriverQueueData *get_queue(uint32_t i) { + return queues.at(i%queue_number); + } + void register_device(NVMEDevice *device) { // in case of registered_devices, we stop thread now. // Because release is really a rare case, we could bear this @@ -249,6 +274,7 @@ class SharedDriverData { registered_devices.push_back(device); _aio_start(); } + void remove_device(NVMEDevice *device) { _aio_stop(); std::vector new_devices; @@ -266,32 +292,66 @@ class SharedDriverData { uint64_t get_size() { return size; } - void queue_task(Task *t, uint64_t ops = 1) { - queue_op_seq += ops; - Mutex::Locker l(queue_lock); - task_queue.push(t); - if (queue_empty.load()) { - queue_empty = false; - queue_cond.Signal(); +}; + +struct Task { + NVMEDevice *device; + IOContext *ctx = nullptr; + IOCommand command; + uint64_t offset; + uint64_t len; + bufferlist write_bl; + std::function fill_cb; + Task *next = nullptr; + int64_t return_code; + ceph::coarse_real_clock::time_point start; + IORequest io_request; + std::mutex lock; + std::condition_variable cond; + SharedDriverQueueData *queue; + Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0) + : device(dev), command(c), offset(off), len(l), + return_code(rc), + start(ceph::coarse_real_clock::now()) {} + ~Task() { + assert(!io_request.nseg); + } + void release_segs(SharedDriverQueueData *queue_data) { + if (io_request.extra_segs) { + for (uint16_t i = 0; i < io_request.nseg; i++) + queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]); + delete io_request.extra_segs; + } else if (io_request.nseg) { + for (uint16_t i = 0; i < io_request.nseg; i++) + queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]); } + io_request.nseg = 0; } - void flush_wait() { - uint64_t cur_seq = queue_op_seq.load(); - uint64_t left = cur_seq - completed_op_seq.load(); - if (cur_seq > completed_op_seq) { - // TODO: this may contains read op - dout(10) << __func__ << " existed inflight ops " << left << dendl; - Mutex::Locker l(flush_lock); - ++flush_waiters; - flush_waiter_seqs.insert(cur_seq); - while (cur_seq > completed_op_seq.load()) { - flush_cond.Wait(flush_lock); - } - flush_waiter_seqs.erase(cur_seq); - --flush_waiters; + void copy_to_buf(char *buf, uint64_t off, uint64_t len) { + uint64_t copied = 0; + uint64_t left = len; + void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs; + uint16_t i = 0; + while (left > 0) { + char *src = static_cast(segs[i++]); + uint64_t need_copy = std::min(left, data_buffer_size-off); + memcpy(buf+copied, src+off, need_copy); + off = 0; + left -= need_copy; + copied += need_copy; } } + + void io_wait() { + std::unique_lock l(lock); + cond.wait(l); + } + + void io_wake() { + std::lock_guard l(lock); + cond.notify_all(); + } }; static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset) @@ -349,7 +409,7 @@ static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length) return 0; } -static int alloc_buf_from_pool(Task *t, bool write) +int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write) { uint64_t count = t->len / data_buffer_size; if (t->len % data_buffer_size) @@ -382,7 +442,7 @@ static int alloc_buf_from_pool(Task *t, bool write) return 0; } -void SharedDriverData::_aio_thread() +void SharedDriverQueueData::_aio_thread() { dout(1) << __func__ << " start" << dendl; @@ -400,6 +460,7 @@ void SharedDriverData::_aio_thread() Task *t = nullptr; int r = 0; uint64_t lba_off, lba_count; + ceph::coarse_real_clock::time_point cur, start = ceph::coarse_real_clock::now(); while (true) { @@ -414,6 +475,7 @@ void SharedDriverData::_aio_thread() } for (; t; t = t->next) { + t->queue = this; lba_off = t->offset / sector_size; lba_count = t->len / sector_size; switch (t->command) { @@ -432,7 +494,7 @@ void SharedDriverData::_aio_thread() if (r < 0) { derr << __func__ << " failed to do write command" << dendl; t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr; - t->release_segs(); + t->release_segs(this); delete t; ceph_abort(); } @@ -455,7 +517,7 @@ void SharedDriverData::_aio_thread() data_buf_reset_sgl, data_buf_next_sge); if (r < 0) { derr << __func__ << " failed to read" << dendl; - t->release_segs(); + t->release_segs(this); delete t; ceph_abort(); } else { @@ -471,7 +533,7 @@ void SharedDriverData::_aio_thread() r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t); if (r < 0) { derr << __func__ << " failed to flush" << dendl; - t->release_segs(); + t->release_segs(this); delete t; ceph_abort(); } else { @@ -500,10 +562,13 @@ void SharedDriverData::_aio_thread() flush_cond.Signal(); } - if (!inflight) { - for (auto &&it : registered_devices) - it->reap_ioc(); + // be careful, here we need to let each thread reap its own, currently it is done + // by only one dedicatd dpdk thread + if(!queueid) { + for (auto &&it : driver->registered_devices) + it->reap_ioc(); + } Mutex::Locker l(queue_lock); if (queue_empty.load()) { @@ -568,7 +633,7 @@ class NVMEManager { // only support one device per osd now! assert(shared_driver_datas.empty()); // index 0 is occured by master thread - shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, rte_get_next_lcore(-1, 1, 0), sn_tag, c, ns)); + shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns)); *driver = shared_driver_datas.back(); } }; @@ -755,17 +820,18 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion) { Task *task = static_cast(t); IOContext *ctx = task->ctx; - SharedDriverData *driver = task->device->get_driver(); + SharedDriverQueueData *queue = task->queue; + assert(queue != NULL); assert(ctx != NULL); - ++driver->completed_op_seq; + ++queue->completed_op_seq; auto dur = std::chrono::duration_cast( ceph::coarse_real_clock::now() - task->start); if (task->command == IOCommand::WRITE_COMMAND) { - driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur); + queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur); assert(!spdk_nvme_cpl_is_error(completion)); dout(20) << __func__ << " write/zero op successfully, left " - << driver->queue_op_seq - driver->completed_op_seq << dendl; + << queue->queue_op_seq - queue->completed_op_seq << dendl; // check waiting count before doing callback (which may // destroy this ioc). if (!--ctx->num_running) { @@ -774,14 +840,14 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion) task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); } } - task->release_segs(); + task->release_segs(queue); delete task; } else if (task->command == IOCommand::READ_COMMAND) { - driver->logger->tinc(l_bluestore_nvmedevice_read_lat, dur); + queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur); assert(!spdk_nvme_cpl_is_error(completion)); dout(20) << __func__ << " read op successfully" << dendl; task->fill_cb(); - task->release_segs(); + task->release_segs(queue); // read submitted by AIO if(!task->return_code) { if (!--ctx->num_running) { @@ -800,7 +866,7 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion) } else { assert(task->command == IOCommand::FLUSH_COMMAND); assert(!spdk_nvme_cpl_is_error(completion)); - driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur); + queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur); dout(20) << __func__ << " flush op successfully" << dendl; task->return_code = 0; ctx->aio_wake(); @@ -907,10 +973,15 @@ int NVMEDevice::flush() { dout(10) << __func__ << " start" << dendl; auto start = ceph::coarse_real_clock::now(); - driver->flush_wait(); + + if(queue_id == -1) + queue_id = ceph_gettid(); + SharedDriverQueueData *queue = driver->get_queue(queue_id); + assert(queue != NULL); + queue->flush_wait(); auto dur = std::chrono::duration_cast( ceph::coarse_real_clock::now() - start); - driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur); + queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur); return 0; } @@ -926,7 +997,9 @@ void NVMEDevice::aio_submit(IOContext *ioc) ioc->num_pending -= pending; assert(ioc->num_pending.load() == 0); // we should be only thread doing this // Only need to push the first entry - driver->queue_task(t, pending); + if(queue_id == -1) + queue_id = ceph_gettid(); + driver->get_queue(queue_id)->queue_task(t, pending); ioc->nvme_task_first = ioc->nvme_task_last = nullptr; } } @@ -954,7 +1027,9 @@ int NVMEDevice::aio_write( if (buffered) { // Only need to push the first entry - driver->queue_task(t); + if(queue_id == -1) + queue_id = ceph_gettid(); + driver->get_queue(queue_id)->queue_task(t); } else { t->ctx = ioc; Task *first = static_cast(ioc->nvme_task_first); @@ -992,7 +1067,9 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, t->copy_to_buf(buf, 0, t->len); }; ++ioc->num_reading; - driver->queue_task(t); + if(queue_id == -1) + queue_id = ceph_gettid(); + driver->get_queue(queue_id)->queue_task(t); while(t->return_code > 0) { t->io_wait(); @@ -1038,8 +1115,6 @@ int NVMEDevice::aio_read( return 0; } - - int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered) { assert(len > 0); @@ -1058,7 +1133,9 @@ int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered t->copy_to_buf(buf, off-t->offset, len); }; ++ioc.num_reading; - driver->queue_task(t); + if(queue_id == -1) + queue_id = ceph_gettid(); + driver->get_queue(queue_id)->queue_task(t); while(t->return_code > 0) { t->io_wait(); -- 2.39.5