From 86cd2f8bdf711d33b506d580af731aa720b30207 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 20 Jan 2016 11:48:39 +0800 Subject: [PATCH] NVMEDevice: use the only aio thread to process task Signed-off-by: Haomai Wang --- src/os/bluestore/NVMEDevice.cc | 663 ++++++++++++++++++--------------- src/os/bluestore/NVMEDevice.h | 38 +- 2 files changed, 376 insertions(+), 325 deletions(-) diff --git a/src/os/bluestore/NVMEDevice.cc b/src/os/bluestore/NVMEDevice.cc index ad96d34c3b979..9d0eb63a60389 100644 --- a/src/os/bluestore/NVMEDevice.cc +++ b/src/os/bluestore/NVMEDevice.cc @@ -44,7 +44,7 @@ #define dout_subsys ceph_subsys_bdev #undef dout_prefix -#define dout_prefix *_dout << "bdev " +#define dout_prefix *_dout << "bdev(" << sn << ") " rte_mempool *request_mempool = nullptr; rte_mempool *task_pool = nullptr; @@ -62,60 +62,7 @@ enum { l_bluestore_nvmedevice_last }; -static void io_complete(void *t, const struct nvme_completion *completion) { - Task *task = static_cast(t); - IOContext *ctx = task->ctx; - task->device->inflight_ops.dec(); - utime_t lat = ceph_clock_now(g_ceph_context); - lat -= task->start; - if (task->command == IOCommand::WRITE_COMMAND) { - task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat); - assert(!nvme_completion_is_error(completion)); - dout(20) << __func__ << " write op successfully, left " << left << dendl; - // buffer write won't have ctx, and we will free request later, see `flush` - if (ctx) { - // check waiting count before doing callback (which may - // destroy this ioc). - if (!ctx->num_running.dec()) { - if (ctx->num_waiting.read()) { - Mutex::Locker l(ctx->lock); - ctx->cond.Signal(); - } - if (task->device->aio_callback && ctx->priv) { - task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); - } - } - rte_free(task->buf); - rte_mempool_put(task_pool, task); - } else { - task->device->queue_buffer_task(task); - } - } else if (task->command == IOCommand::READ_COMMAND) { - task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat); - ctx->num_reading.dec(); - dout(20) << __func__ << " read op successfully" << dendl; - if (nvme_completion_is_error(completion)) - task->return_code = -1; // FIXME - else - task->return_code = 0; - { - Mutex::Locker l(ctx->lock); - ctx->cond.Signal(); - } - } else { - assert(task->command == IOCommand::FLUSH_COMMAND); - task->device->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat); - dout(20) << __func__ << " flush op successfully" << dendl; - if (nvme_completion_is_error(completion)) - task->return_code = -1; // FIXME - else - task->return_code = 0; - { - Mutex::Locker l(ctx->lock); - ctx->cond.Signal(); - } - } -} +static void io_complete(void *t, const struct nvme_completion *completion); static char *ealargs[] = { "ceph-osd", @@ -123,39 +70,280 @@ static char *ealargs[] = { "-n 4", }; + class SharedDriverData { - std::map > controllers; - bool init = false; - Mutex lock; + std::string sn; + std::string name; + nvme_controller *ctrlr; + nvme_namespace *ns; + + uint64_t block_size = 0; + uint64_t size = 0; + std::vector registered_devices; + struct AioCompletionThread : public Thread { + SharedDriverData *data; + AioCompletionThread(SharedDriverData *d) : data(d) {} + void *entry() { + data->_aio_thread(); + return NULL; + } + } aio_thread; + friend class AioCompletionThread; + + bool aio_stop = false; + int ref = 1; + void _aio_thread(); + void _aio_start() { + aio_thread.create("nvme_aio_thread"); + } + void _aio_stop() { + assert(aio_thread.is_started()); + { + Mutex::Locker 
l(queue_lock); + aio_stop = true; + queue_cond.Signal(); + } + aio_thread.join(); + aio_stop = false; + } + atomic_t queue_empty; + Mutex queue_lock; + Cond queue_cond; + std::queue task_queue; - int _scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name); + Mutex flush_lock; + Cond flush_cond; + atomic_t flush_waiters; public: - SharedDriverData(): lock("NVMEDevice::SharedDriverData::lock") {} - int try_get(const string &sn_tag, nvme_controller **c, string *name); - void release(nvme_controller *c); -}; + atomic_t inflight_ops; + PerfCounters *logger = nullptr; + + SharedDriverData(const std::string &sn_tag, const std::string &n, nvme_controller *c, nvme_namespace *ns) + : sn(sn_tag), + name(n), + ctrlr(c), + ns(ns), + aio_thread(this), + queue_empty(1), + queue_lock("NVMEDevice::queue_lock"), + flush_lock("NVMEDevice::flush_lock"), + flush_waiters(0), + inflight_ops(0) { + block_size = nvme_ns_get_sector_size(ns); + size = block_size * nvme_ns_get_num_sectors(ns); + + PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)), + l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last); + b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency"); + b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency"); + b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency"); + b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue"); + b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency"); + b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency"); + b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency"); + b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency"); + logger = b.create_perf_counters(); + g_ceph_context->get_perfcounters_collection()->add(logger); + _aio_start(); + } + ~SharedDriverData() { + g_ceph_context->get_perfcounters_collection()->remove(logger); + delete logger; + } + + bool is_equal(const string &tag) const { return sn == tag; } + void register_device(NVMEDevice *device) { + // in case of registered_devices, we stop thread now. 
+ // Because release is really a rare case, we could bear this + _aio_stop(); + registered_devices.push_back(device); + _aio_start(); + } + void remove_device(NVMEDevice *device) { + _aio_stop(); + std::vector new_devices; + for (auto &&it : registered_devices) { + if (it == device) + new_devices.push_back(it); + } + registered_devices.swap(new_devices); + _aio_start(); + } -static SharedDriverData driver_data; + uint64_t get_block_size() { + return block_size; + } + uint64_t get_size() { + return size; + } + void queue_task(Task *t, uint64_t ops = 1) { + inflight_ops.add(ops); + Mutex::Locker l(queue_lock); + task_queue.push(t); + if (queue_empty.read()) { + queue_empty.dec(); + queue_cond.Signal(); + } + } -int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name) + void flush_wait() { + if (inflight_ops.read()) { + // TODO: this may contains read op + dout(1) << __func__ << " existed inflight ops " << inflight_ops.read() << dendl; + Mutex::Locker l(flush_lock); + flush_waiters.inc(); + while (inflight_ops.read()) { + flush_cond.Wait(flush_lock); + } + flush_waiters.dec(); + } + } +}; + +void SharedDriverData::_aio_thread() { + dout(1) << __func__ << " start" << dendl; + if (nvme_register_io_thread() != 0) { + assert(0); + } + + Task *t; int r = 0; - dout(1) << __func__ << " serial number " << sn_tag << dendl; + const int max = 4; + uint64_t lba_off, lba_count; + utime_t lat, start = ceph_clock_now(g_ceph_context); + while (true) { + dout(40) << __func__ << " polling" << dendl; + t = nullptr; + if (!queue_empty.read()) { + Mutex::Locker l(queue_lock); + if (!task_queue.empty()) { + t = task_queue.front(); + task_queue.pop(); + logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size()); + } + if (!t) + queue_empty.inc(); + } else if (!inflight_ops.read()) { + if (flush_waiters.read()) { + Mutex::Locker l(flush_lock); + flush_cond.Signal(); + } - assert(c); - if (sn_tag.empty()) { - r = -ENOENT; - derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl; - return r; - } + for (auto &&it : registered_devices) + it->reap_ioc(); - auto ctr_it = controllers.find(sn_tag); - if (ctr_it != controllers.end()) { - *name = ctr_it->second.second; - *c = ctr_it->second.first; - return 0; + Mutex::Locker l(queue_lock); + if (queue_empty.read()) { + lat = ceph_clock_now(g_ceph_context); + lat -= start; + logger->tinc(l_bluestore_nvmedevice_polling_lat, lat); + if (aio_stop) + break; + dout(20) << __func__ << " enter sleep" << dendl; + queue_cond.Wait(queue_lock); + dout(20) << __func__ << " exit sleep" << dendl; + start = ceph_clock_now(g_ceph_context); + } + } + + if (t) { + switch (t->command) { + case IOCommand::WRITE_COMMAND: + { + while (t) { + lba_off = t->offset / block_size; + lba_count = t->len / block_size; + dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl; + r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t); + if (r < 0) { + t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr; + rte_free(t->buf); + rte_mempool_put(task_pool, t); + derr << __func__ << " failed to do write command" << dendl; + assert(0); + } + lat = ceph_clock_now(g_ceph_context); + lat -= t->start; + logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat); + t = t->next; + } + break; + } + case IOCommand::READ_COMMAND: + { + dout(20) << __func__ << " read command issueed " << lba_off << "~" << lba_count << dendl; + lba_off = t->offset / block_size; + lba_count = t->len / block_size; + r = 
nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t); + if (r < 0) { + derr << __func__ << " failed to read" << dendl; + t->ctx->num_reading.dec(); + t->return_code = r; + Mutex::Locker l(t->ctx->lock); + t->ctx->cond.Signal(); + } else { + lat = ceph_clock_now(g_ceph_context); + lat -= t->start; + logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat); + } + break; + } + case IOCommand::FLUSH_COMMAND: + { + dout(20) << __func__ << " flush command issueed " << dendl; + r = nvme_ns_cmd_flush(ns, io_complete, t); + if (r < 0) { + derr << __func__ << " failed to flush" << dendl; + t->return_code = r; + Mutex::Locker l(t->ctx->lock); + t->ctx->cond.Signal(); + } else { + lat = ceph_clock_now(g_ceph_context); + lat -= t->start; + logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat); + } + break; + } + } + } else if (inflight_ops.read()) { + nvme_ctrlr_process_io_completions(ctrlr, max); + dout(30) << __func__ << " idle, have a pause" << dendl; +#ifdef HAVE_SSE + _mm_pause(); +#else + usleep(10); +#endif + } } + nvme_unregister_io_thread(); + dout(1) << __func__ << " end" << dendl; +} + +class NVMEManager { + Mutex lock; + bool init = false; + std::vector shared_driver_datas; + + static int _scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns); + + public: + NVMEManager() + : lock("NVMEDevice::NVMEManager::lock") {} + int try_get(const string &sn_tag, SharedDriverData **driver); +}; + +static NVMEManager manager; + +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev " + +int NVMEManager::_scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns) +{ + int r = 0; + dout(1) << __func__ << " serial number " << sn_tag << dendl; pci_device *pci_dev; @@ -194,14 +382,14 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller ** } pci_device_probe(pci_dev); - *name = pci_device_get_device_name(pci_dev) ? pci_device_get_device_name(pci_dev) : "Unknown"; + name = pci_device_get_device_name(pci_dev) ? 
pci_device_get_device_name(pci_dev) : "Unknown"; if (pci_device_has_kernel_driver(pci_dev)) { if (!pci_device_has_uio_driver(pci_dev)) { /*NVMe kernel driver case*/ if (g_conf->bdev_nvme_unbind_from_kernel) { r = pci_device_switch_to_uio_driver(pci_dev); if (r < 0) { - derr << __func__ << " device " << *name << " " << pci_dev->bus + derr << __func__ << " device " << name << " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << " switch to uio driver failed" << dendl; return r; @@ -215,7 +403,7 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller ** } else { r = pci_device_bind_uio_driver(pci_dev, PCI_UIO_DRIVER); if (r < 0) { - derr << __func__ << " device " << *name << " " << pci_dev->bus + derr << __func__ << " device " << name << " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << " bind to uio driver failed, may lack of uio_pci_generic kernel module" << dendl; return r; @@ -225,7 +413,7 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller ** /* Claim the device in case conflict with other ids process */ r = pci_device_claim(pci_dev); if (r < 0) { - derr << __func__ << " device " << *name << " " << pci_dev->bus + derr << __func__ << " device " << name << " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << " claim failed" << dendl; return r; @@ -238,17 +426,25 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller ** return r; } - controllers[sn_tag] = make_pair(*c, *name); - pci_iterator_destroy(iter); - dout(1) << __func__ << " successfully attach nvme device at" << *name + int num_ns = nvme_ctrlr_get_num_ns(*c); + assert(num_ns >= 1); + if (num_ns > 1) { + dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl; + } + *ns = nvme_ctrlr_get_ns(*c, 1); + if (!*ns) { + derr << __func__ << " failed to get namespace at 1" << dendl; + return -1; + } + dout(1) << __func__ << " successfully attach nvme device at" << name << " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << dendl; return 0; } -int SharedDriverData::try_get(const string &sn_tag, nvme_controller **c, string *name) +int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver) { Mutex::Locker l(lock); int r = 0; @@ -284,22 +480,86 @@ int SharedDriverData::try_get(const string &sn_tag, nvme_controller **c, string init = true; } - return _scan_nvme_device(sn_tag, c, name); -} -void SharedDriverData::release(nvme_controller *c) -{ - dout(1) << __func__ << " " << c << dendl; + if (sn_tag.empty()) { + r = -ENOENT; + derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl; + return r; + } - Mutex::Locker l(lock); - auto it = controllers.begin(); - for (; it != controllers.end(); ++it) { - if (c == it->second.first) - break; + for (auto &&it : shared_driver_datas) { + if (it->is_equal(sn_tag)) { + *driver = it; + return 0; + } } - if (it == controllers.end()) { - derr << __func__ << " not found registered nvme controller " << c << dendl; - assert(0); + + nvme_controller *c; + nvme_namespace *ns; + std::string name; + if (_scan_nvme_device(sn_tag, name, &c, &ns) < 0) + return -1; + + shared_driver_datas.push_back(new SharedDriverData(sn_tag, name, c, ns)); + *driver = shared_driver_datas.back(); + + return 0; +} + +void io_complete(void *t, const struct nvme_completion *completion) +{ + Task *task = static_cast(t); + IOContext *ctx = task->ctx; + SharedDriverData *driver = task->device->get_driver(); + 
driver->inflight_ops.dec(); + utime_t lat = ceph_clock_now(g_ceph_context); + lat -= task->start; + if (task->command == IOCommand::WRITE_COMMAND) { + driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat); + assert(!nvme_completion_is_error(completion)); + dout(20) << __func__ << " write op successfully, left " << left << dendl; + // buffer write won't have ctx, and we will free request later, see `flush` + if (ctx) { + // check waiting count before doing callback (which may + // destroy this ioc). + if (!ctx->num_running.dec()) { + if (ctx->num_waiting.read()) { + Mutex::Locker l(ctx->lock); + ctx->cond.Signal(); + } + if (task->device->aio_callback && ctx->priv) { + task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); + } + } + rte_free(task->buf); + rte_mempool_put(task_pool, task); + } else { + task->device->queue_buffer_task(task); + } + } else if (task->command == IOCommand::READ_COMMAND) { + driver->logger->tinc(l_bluestore_nvmedevice_read_lat, lat); + ctx->num_reading.dec(); + dout(20) << __func__ << " read op successfully" << dendl; + if (nvme_completion_is_error(completion)) + task->return_code = -1; // FIXME + else + task->return_code = 0; + { + Mutex::Locker l(ctx->lock); + ctx->cond.Signal(); + } + } else { + assert(task->command == IOCommand::FLUSH_COMMAND); + driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat); + dout(20) << __func__ << " flush op successfully" << dendl; + if (nvme_completion_is_error(completion)) + task->return_code = -1; // FIXME + else + task->return_code = 0; + { + Mutex::Locker l(ctx->lock); + ctx->cond.Signal(); + } } } @@ -308,17 +568,7 @@ void SharedDriverData::release(nvme_controller *c) #define dout_prefix *_dout << "bdev(" << name << ") " NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv) - : ctrlr(nullptr), - ns(nullptr), - aio_stop(false), - queue_empty(1), - queue_lock("NVMEDevice::queue_lock"), - aio_thread(this), - flush_lock("NVMEDevice::flush_lock"), - flush_waiters(0), - buffer_lock("NVMEDevice::buffer_lock"), - logger(nullptr), - inflight_ops(0), + : buffer_lock("NVMEDevice::buffer_lock"), aio_callback(cb), aio_callback_priv(cbpriv) { @@ -348,44 +598,20 @@ int NVMEDevice::open(string p) return r; } serial_number = string(buf, r); - r = driver_data.try_get(serial_number, &ctrlr, &name); + r = manager.try_get(serial_number, &driver); if (r < 0) { derr << __func__ << " failed to get nvme deivce with sn " << serial_number << dendl; return r; } - int num_ns = nvme_ctrlr_get_num_ns(ctrlr); - assert(num_ns >= 1); - if (num_ns > 1) { - dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl; - } - ns = nvme_ctrlr_get_ns(ctrlr, 1); - if (!ns) { - derr << __func__ << " failed to get namespace at 1" << dendl; - return -1; - } - block_size = nvme_ns_get_sector_size(ns); - size = block_size * nvme_ns_get_num_sectors(ns); + driver->register_device(this); + block_size = driver->get_block_size(); + size = driver->get_size(); dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)" << " block_size " << block_size << " (" << pretty_si_t(block_size) << "B)" << dendl; - PerfCountersBuilder b(g_ceph_context, string("nvmedevice-") + name + "-" + stringify(this), - l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last); - b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency"); - b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency"); - 
b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency"); - b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue"); - b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency"); - b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency"); - b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency"); - b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency"); - logger = b.create_perf_counters(); - g_ceph_context->get_perfcounters_collection()->add(logger); - - aio_thread.create("nvme_aio_thread"); - return 0; } @@ -393,156 +619,17 @@ void NVMEDevice::close() { dout(1) << __func__ << dendl; - { - Mutex::Locker l(queue_lock); - aio_stop = true; - queue_cond.Signal(); - } - aio_thread.join(); - aio_stop = false; - - g_ceph_context->get_perfcounters_collection()->remove(logger); - delete logger; - logger = nullptr; - name.clear(); - driver_data.release(ctrlr); - ctrlr = nullptr; + driver->remove_device(this); dout(1) << __func__ << " end" << dendl; } -void NVMEDevice::_aio_thread() -{ - dout(10) << __func__ << " start" << dendl; - if (nvme_register_io_thread() != 0) { - assert(0); - } - - Task *t; - int r = 0; - const int max = 4; - uint64_t lba_off, lba_count; - utime_t lat, start = ceph_clock_now(g_ceph_context); - while (true) { - dout(40) << __func__ << " polling" << dendl; - t = nullptr; - if (!queue_empty.read()) { - Mutex::Locker l(queue_lock); - if (!task_queue.empty()) { - t = task_queue.front(); - task_queue.pop(); - logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size()); - } - if (!t) - queue_empty.inc(); - } else if (!inflight_ops.read()) { - if (flush_waiters.read()) { - Mutex::Locker l(flush_lock); - flush_cond.Signal(); - } - Mutex::Locker l(queue_lock); - if (queue_empty.read()) { - lat = ceph_clock_now(g_ceph_context); - lat -= start; - logger->tinc(l_bluestore_nvmedevice_polling_lat, lat); - if (aio_stop) - break; - dout(20) << __func__ << " enter sleep" << dendl; - queue_cond.Wait(queue_lock); - dout(20) << __func__ << " exit sleep" << dendl; - start = ceph_clock_now(g_ceph_context); - } - } - - if (t) { - switch (t->command) { - case IOCommand::WRITE_COMMAND: - { - while (t) { - lba_off = t->offset / block_size; - lba_count = t->len / block_size; - dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl; - r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t); - if (r < 0) { - t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr; - rte_free(t->buf); - rte_mempool_put(task_pool, t); - derr << __func__ << " failed to do write command" << dendl; - assert(0); - } - lat = ceph_clock_now(g_ceph_context); - lat -= t->start; - logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat); - t = t->next; - } - break; - } - case IOCommand::READ_COMMAND: - { - dout(20) << __func__ << " read command issueed " << lba_off << "~" << lba_count << dendl; - lba_off = t->offset / block_size; - lba_count = t->len / block_size; - r = nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t); - if (r < 0) { - derr << __func__ << " failed to read" << dendl; - t->ctx->num_reading.dec(); - t->return_code = r; - Mutex::Locker l(t->ctx->lock); - t->ctx->cond.Signal(); - } else { - lat = ceph_clock_now(g_ceph_context); - lat -= t->start; - 
logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat); - } - break; - } - case IOCommand::FLUSH_COMMAND: - { - dout(20) << __func__ << " flush command issueed " << dendl; - r = nvme_ns_cmd_flush(ns, io_complete, t); - if (r < 0) { - derr << __func__ << " failed to flush" << dendl; - t->return_code = r; - Mutex::Locker l(t->ctx->lock); - t->ctx->cond.Signal(); - } else { - lat = ceph_clock_now(g_ceph_context); - lat -= t->start; - logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat); - } - break; - } - } - } else if (inflight_ops.read()) { - nvme_ctrlr_process_io_completions(ctrlr, max); - dout(30) << __func__ << " idle, have a pause" << dendl; -#ifdef HAVE_SSE - _mm_pause(); -#else - usleep(10); -#endif - } - reap_ioc(); - } - nvme_unregister_io_thread(); - dout(10) << __func__ << " end" << dendl; -} - int NVMEDevice::flush() { dout(10) << __func__ << " start" << dendl; utime_t start = ceph_clock_now(g_ceph_context); - if (inflight_ops.read()) { - // TODO: this may contains read op - dout(1) << __func__ << " existed inflight ops " << inflight_ops.read() << dendl; - Mutex::Locker l(flush_lock); - flush_waiters.inc(); - while (inflight_ops.read()) { - flush_cond.Wait(flush_lock); - } - flush_waiters.dec(); - } + driver->flush_wait(); Task *t = nullptr; { Mutex::Locker l(buffer_lock); @@ -557,7 +644,7 @@ int NVMEDevice::flush() } utime_t lat = ceph_clock_now(g_ceph_context); lat -= start; - logger->tinc(l_bluestore_nvmedevice_flush_lat, lat); + driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat); return 0; // nvme device will cause terriable performance degraded // while issuing flush command @@ -579,14 +666,8 @@ int NVMEDevice::flush() t->device = this; t->return_code = 1; t->next = nullptr; - { - Mutex::Locker l(queue_lock); - task_queue.push(t); - if (queue_empty.read()) { - queue_empty.dec(); - queue_cond.Signal(); - } - } + driver->queue_task(t); + { Mutex::Locker l(ioc.lock); while (t->return_code > 0) @@ -610,7 +691,7 @@ void NVMEDevice::aio_submit(IOContext *ioc) ioc->num_pending.sub(pending); assert(ioc->num_pending.read() == 0); // we should be only thread doing this // Only need to push the first entry - queue_task(t, pending); + driver->queue_task(t, pending); ioc->nvme_task_first = ioc->nvme_task_last = nullptr; } } @@ -655,7 +736,7 @@ int NVMEDevice::aio_write( if (buffered) { t->ctx = nullptr; // Only need to push the first entry - queue_task(t); + driver->queue_task(t); Mutex::Locker l(buffer_lock); buffered_extents.insert(off, len, (char*)t->buf); } else { @@ -734,7 +815,7 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, t->return_code = 1; t->next = nullptr; ioc->num_reading.inc();; - queue_task(t); + driver->queue_task(t); { Mutex::Locker l(ioc->lock); @@ -795,7 +876,7 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf) t->return_code = 1; t->next = nullptr; ioc.num_reading.inc();; - queue_task(t); + driver->queue_task(t); { Mutex::Locker l(ioc.lock); diff --git a/src/os/bluestore/NVMEDevice.h b/src/os/bluestore/NVMEDevice.h index 46637c55bcb3d..cbeb874a386d8 100644 --- a/src/os/bluestore/NVMEDevice.h +++ b/src/os/bluestore/NVMEDevice.h @@ -62,6 +62,7 @@ struct Task { }; class PerfCounters; +class SharedDriverData; class NVMEDevice : public BlockDevice { /** @@ -69,8 +70,7 @@ class NVMEDevice : public BlockDevice { * contains 4KB IDENTIFY structure for controller which is * target for CONTROLLER IDENTIFY command during initialization */ - nvme_controller *ctrlr; - nvme_namespace *ns; + SharedDriverData *driver; string 
name; uint64_t size; @@ -79,36 +79,6 @@ class NVMEDevice : public BlockDevice { bool aio_stop; bufferptr zeros; - atomic_t queue_empty; - Mutex queue_lock; - Cond queue_cond; - std::queue task_queue; - - void queue_task(Task *t, uint64_t ops = 1) { - inflight_ops.add(ops); - Mutex::Locker l(queue_lock); - task_queue.push(t); - if (queue_empty.read()) { - queue_empty.dec(); - queue_cond.Signal(); - } - } - - struct AioCompletionThread : public Thread { - NVMEDevice *dev; - AioCompletionThread(NVMEDevice *b) : dev(b) {} - void *entry() { - dev->_aio_thread(); - return NULL; - } - } aio_thread; - - void _aio_thread(); - - Mutex flush_lock; - Cond flush_cond; - atomic_t flush_waiters; - struct BufferedExtents { struct Extent { uint64_t x_len; @@ -260,9 +230,9 @@ class NVMEDevice : public BlockDevice { buffered_task_head = t; } + SharedDriverData *get_driver() { return driver; } + public: - PerfCounters *logger; - atomic_t inflight_ops; aio_callback_t aio_callback; void *aio_callback_priv; -- 2.39.5
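
For context on the design this patch moves to: each NVMe controller now gets a single SharedDriverData that owns the serial number, namespace, perf counters, one task queue, and the one aio completion thread; every NVMEDevice opened against that serial number registers with it and funnels work through queue_task()/flush_wait() instead of running its own thread. Below is a minimal, self-contained sketch of that pattern. It is not the Ceph code: the class and member names (SharedDriver, Task::io, queue_task, flush_wait) are illustrative stand-ins, standard-library primitives replace Ceph's Mutex/Cond/atomic_t, and the NVMe submission and completion calls (nvme_ns_cmd_*, nvme_ctrlr_process_io_completions, nvme_register_io_thread) are stubbed so the example compiles and runs on its own.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

struct Task {
  std::function<void()> io;  // stand-in for an nvme_ns_cmd_read/write/flush submission
};

class SharedDriver {
  std::mutex lock;
  std::condition_variable cond;
  std::queue<Task> tasks;
  std::atomic<uint64_t> inflight{0};
  bool stop = false;
  std::thread worker;  // declared last so it starts after the other members are ready

  void run() {
    // One thread owns all submissions and completions for this controller,
    // mirroring the nvme_register_io_thread()/nvme_unregister_io_thread() pairing.
    std::unique_lock<std::mutex> l(lock);
    while (true) {
      while (!tasks.empty()) {
        Task t = std::move(tasks.front());
        tasks.pop();
        l.unlock();
        t.io();      // submit; the real code reaps completions asynchronously
        inflight--;  // stub: treat the op as completing immediately
        l.lock();
      }
      if (stop)
        break;
      cond.wait(l);  // sleep until queue_task() signals new work, like queue_cond.Wait()
    }
  }

 public:
  SharedDriver() : worker([this] { run(); }) {}
  ~SharedDriver() {
    {
      std::lock_guard<std::mutex> l(lock);
      stop = true;
    }
    cond.notify_one();
    worker.join();
  }

  // Called by any device sharing this controller; equivalent to queue_task().
  void queue_task(Task t) {
    inflight++;
    std::lock_guard<std::mutex> l(lock);
    tasks.push(std::move(t));
    cond.notify_one();  // wake the polling thread, like queue_cond.Signal()
  }

  // Equivalent of SharedDriverData::flush_wait(): block until all queued ops finish.
  void flush_wait() {
    while (inflight.load() != 0)
      std::this_thread::yield();
  }
};

int main() {
  SharedDriver driver;              // one per controller serial number
  for (int i = 0; i < 4; ++i)       // several "devices" can share the same driver
    driver.queue_task({[i] { std::cout << "io " << i << " done\n"; }});
  driver.flush_wait();
  return 0;
}

The sketch also shows why register_device()/remove_device() in the patch stop and restart the aio thread around mutating registered_devices: the vector is read only from the single worker thread, so pausing that thread is the cheapest way to make the (rare) registration path safe without adding a lock to the hot polling loop.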