#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
-#define dout_prefix *_dout << "bdev "
+#define dout_prefix *_dout << "bdev(" << sn << ") "
rte_mempool *request_mempool = nullptr;
rte_mempool *task_pool = nullptr;
l_bluestore_nvmedevice_last
};
-static void io_complete(void *t, const struct nvme_completion *completion) {
- Task *task = static_cast<Task*>(t);
- IOContext *ctx = task->ctx;
- task->device->inflight_ops.dec();
- utime_t lat = ceph_clock_now(g_ceph_context);
- lat -= task->start;
- if (task->command == IOCommand::WRITE_COMMAND) {
- task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
- assert(!nvme_completion_is_error(completion));
- dout(20) << __func__ << " write op successfully, left " << left << dendl;
- // buffer write won't have ctx, and we will free request later, see `flush`
- if (ctx) {
- // check waiting count before doing callback (which may
- // destroy this ioc).
- if (!ctx->num_running.dec()) {
- if (ctx->num_waiting.read()) {
- Mutex::Locker l(ctx->lock);
- ctx->cond.Signal();
- }
- if (task->device->aio_callback && ctx->priv) {
- task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
- }
- }
- rte_free(task->buf);
- rte_mempool_put(task_pool, task);
- } else {
- task->device->queue_buffer_task(task);
- }
- } else if (task->command == IOCommand::READ_COMMAND) {
- task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
- ctx->num_reading.dec();
- dout(20) << __func__ << " read op successfully" << dendl;
- if (nvme_completion_is_error(completion))
- task->return_code = -1; // FIXME
- else
- task->return_code = 0;
- {
- Mutex::Locker l(ctx->lock);
- ctx->cond.Signal();
- }
- } else {
- assert(task->command == IOCommand::FLUSH_COMMAND);
- task->device->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
- dout(20) << __func__ << " flush op successfully" << dendl;
- if (nvme_completion_is_error(completion))
- task->return_code = -1; // FIXME
- else
- task->return_code = 0;
- {
- Mutex::Locker l(ctx->lock);
- ctx->cond.Signal();
- }
- }
-}
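+// forward declaration: the definition now lives after SharedDriverData, since the
+// completion path needs the shared driver's logger and inflight_ops counter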
+static void io_complete(void *t, const struct nvme_completion *completion);
static char *ealargs[] = {
"ceph-osd",
"-n 4",
};
+
class SharedDriverData {
- std::map<string, std::pair<nvme_controller*, string> > controllers;
- bool init = false;
- Mutex lock;
+ std::string sn;
+ std::string name;
+ nvme_controller *ctrlr;
+ nvme_namespace *ns;
+
+ uint64_t block_size = 0;
+ uint64_t size = 0;
+ std::vector<NVMEDevice*> registered_devices;
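+  // one polling thread per controller services the tasks queued by every registered device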
+ struct AioCompletionThread : public Thread {
+ SharedDriverData *data;
+ AioCompletionThread(SharedDriverData *d) : data(d) {}
+ void *entry() {
+ data->_aio_thread();
+ return NULL;
+ }
+ } aio_thread;
+ friend class AioCompletionThread;
+
+ bool aio_stop = false;
+ int ref = 1;
+ void _aio_thread();
+ void _aio_start() {
+ aio_thread.create("nvme_aio_thread");
+ }
+ void _aio_stop() {
+ assert(aio_thread.is_started());
+ {
+ Mutex::Locker l(queue_lock);
+ aio_stop = true;
+ queue_cond.Signal();
+ }
+ aio_thread.join();
+ aio_stop = false;
+ }
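+  // submission queue shared by all devices on this controller; queue_empty (initially 1)
+  // lets submitters and the poller skip queue_lock when nothing is queued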
+ atomic_t queue_empty;
+ Mutex queue_lock;
+ Cond queue_cond;
+ std::queue<Task*> task_queue;
- int _scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name);
+ Mutex flush_lock;
+ Cond flush_cond;
+ atomic_t flush_waiters;
public:
- SharedDriverData(): lock("NVMEDevice::SharedDriverData::lock") {}
- int try_get(const string &sn_tag, nvme_controller **c, string *name);
- void release(nvme_controller *c);
-};
+ atomic_t inflight_ops;
+ PerfCounters *logger = nullptr;
+
+ SharedDriverData(const std::string &sn_tag, const std::string &n, nvme_controller *c, nvme_namespace *ns)
+ : sn(sn_tag),
+ name(n),
+ ctrlr(c),
+ ns(ns),
+ aio_thread(this),
+ queue_empty(1),
+ queue_lock("NVMEDevice::queue_lock"),
+ flush_lock("NVMEDevice::flush_lock"),
+ flush_waiters(0),
+ inflight_ops(0) {
+ block_size = nvme_ns_get_sector_size(ns);
+ size = block_size * nvme_ns_get_num_sectors(ns);
+
+ PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
+ l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
+ b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
+ b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
+ b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
+ b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
+ b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
+ b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
+ b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
+ b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
+ logger = b.create_perf_counters();
+ g_ceph_context->get_perfcounters_collection()->add(logger);
+ _aio_start();
+ }
+ ~SharedDriverData() {
+ g_ceph_context->get_perfcounters_collection()->remove(logger);
+ delete logger;
+ }
+
+ bool is_equal(const string &tag) const { return sn == tag; }
+ void register_device(NVMEDevice *device) {
+    // stop the aio thread while mutating registered_devices;
+    // register/remove is rare, so we can bear the restart cost
+ _aio_stop();
+ registered_devices.push_back(device);
+ _aio_start();
+ }
+ void remove_device(NVMEDevice *device) {
+ _aio_stop();
+ std::vector<NVMEDevice*> new_devices;
+ for (auto &&it : registered_devices) {
+      if (it != device)
+        new_devices.push_back(it);
+ }
+ registered_devices.swap(new_devices);
+ _aio_start();
+ }
-static SharedDriverData driver_data;
+ uint64_t get_block_size() {
+ return block_size;
+ }
+ uint64_t get_size() {
+ return size;
+ }
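+  // called from the NVMEDevice submission paths; wakes the polling thread
+  // if it went to sleep on an empty queue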
+ void queue_task(Task *t, uint64_t ops = 1) {
+ inflight_ops.add(ops);
+ Mutex::Locker l(queue_lock);
+ task_queue.push(t);
+ if (queue_empty.read()) {
+ queue_empty.dec();
+ queue_cond.Signal();
+ }
+ }
-int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name)
+ void flush_wait() {
+ if (inflight_ops.read()) {
+      // TODO: inflight_ops may also count read ops
+      dout(1) << __func__ << " existing inflight ops " << inflight_ops.read() << dendl;
+ Mutex::Locker l(flush_lock);
+ flush_waiters.inc();
+ while (inflight_ops.read()) {
+ flush_cond.Wait(flush_lock);
+ }
+ flush_waiters.dec();
+ }
+ }
+};
+
+void SharedDriverData::_aio_thread()
{
+ dout(1) << __func__ << " start" << dendl;
+ if (nvme_register_io_thread() != 0) {
+ assert(0);
+ }
+
+ Task *t;
int r = 0;
- dout(1) << __func__ << " serial number " << sn_tag << dendl;
+ const int max = 4;
+ uint64_t lba_off, lba_count;
+ utime_t lat, start = ceph_clock_now(g_ceph_context);
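+  // polling loop: drain the task queue, issue NVMe commands and reap completions;
+  // the thread only sleeps once the queue is empty and no ops are inflight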
+ while (true) {
+ dout(40) << __func__ << " polling" << dendl;
+ t = nullptr;
+ if (!queue_empty.read()) {
+ Mutex::Locker l(queue_lock);
+ if (!task_queue.empty()) {
+ t = task_queue.front();
+ task_queue.pop();
+ logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
+ }
+ if (!t)
+ queue_empty.inc();
+ } else if (!inflight_ops.read()) {
+ if (flush_waiters.read()) {
+ Mutex::Locker l(flush_lock);
+ flush_cond.Signal();
+ }
- assert(c);
- if (sn_tag.empty()) {
- r = -ENOENT;
- derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
- return r;
- }
+ for (auto &&it : registered_devices)
+ it->reap_ioc();
- auto ctr_it = controllers.find(sn_tag);
- if (ctr_it != controllers.end()) {
- *name = ctr_it->second.second;
- *c = ctr_it->second.first;
- return 0;
+ Mutex::Locker l(queue_lock);
+ if (queue_empty.read()) {
+ lat = ceph_clock_now(g_ceph_context);
+ lat -= start;
+ logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
+ if (aio_stop)
+ break;
+ dout(20) << __func__ << " enter sleep" << dendl;
+ queue_cond.Wait(queue_lock);
+ dout(20) << __func__ << " exit sleep" << dendl;
+ start = ceph_clock_now(g_ceph_context);
+ }
+ }
+
+ if (t) {
+ switch (t->command) {
+ case IOCommand::WRITE_COMMAND:
+ {
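+        // writes arrive chained through Task::next (nvme_task_first/last built by
+        // the submit path), so issue the whole chain in one pass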
+ while (t) {
+ lba_off = t->offset / block_size;
+ lba_count = t->len / block_size;
+ dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
+ r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t);
+ if (r < 0) {
+ t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
+ rte_free(t->buf);
+ rte_mempool_put(task_pool, t);
+ derr << __func__ << " failed to do write command" << dendl;
+ assert(0);
+ }
+ lat = ceph_clock_now(g_ceph_context);
+ lat -= t->start;
+ logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
+ t = t->next;
+ }
+ break;
+ }
+ case IOCommand::READ_COMMAND:
+ {
+          lba_off = t->offset / block_size;
+          lba_count = t->len / block_size;
+          dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
+ r = nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t);
+ if (r < 0) {
+ derr << __func__ << " failed to read" << dendl;
+ t->ctx->num_reading.dec();
+ t->return_code = r;
+ Mutex::Locker l(t->ctx->lock);
+ t->ctx->cond.Signal();
+ } else {
+ lat = ceph_clock_now(g_ceph_context);
+ lat -= t->start;
+ logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
+ }
+ break;
+ }
+ case IOCommand::FLUSH_COMMAND:
+ {
+ dout(20) << __func__ << " flush command issueed " << dendl;
+ r = nvme_ns_cmd_flush(ns, io_complete, t);
+ if (r < 0) {
+ derr << __func__ << " failed to flush" << dendl;
+ t->return_code = r;
+ Mutex::Locker l(t->ctx->lock);
+ t->ctx->cond.Signal();
+ } else {
+ lat = ceph_clock_now(g_ceph_context);
+ lat -= t->start;
+ logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
+ }
+ break;
+ }
+ }
+ } else if (inflight_ops.read()) {
+ nvme_ctrlr_process_io_completions(ctrlr, max);
+ dout(30) << __func__ << " idle, have a pause" << dendl;
+#ifdef HAVE_SSE
+ _mm_pause();
+#else
+ usleep(10);
+#endif
+ }
}
+ nvme_unregister_io_thread();
+ dout(1) << __func__ << " end" << dendl;
+}
+
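+// owns the probed controllers: maps a serial number to a SharedDriverData so that
+// multiple NVMEDevice instances can share one controller and one polling thread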
+class NVMEManager {
+ Mutex lock;
+ bool init = false;
+ std::vector<SharedDriverData*> shared_driver_datas;
+
+ static int _scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns);
+
+ public:
+ NVMEManager()
+ : lock("NVMEDevice::NVMEManager::lock") {}
+ int try_get(const string &sn_tag, SharedDriverData **driver);
+};
+
+static NVMEManager manager;
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+int NVMEManager::_scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns)
+{
+ int r = 0;
+ dout(1) << __func__ << " serial number " << sn_tag << dendl;
pci_device *pci_dev;
}
pci_device_probe(pci_dev);
- *name = pci_device_get_device_name(pci_dev) ? pci_device_get_device_name(pci_dev) : "Unknown";
+ name = pci_device_get_device_name(pci_dev) ? pci_device_get_device_name(pci_dev) : "Unknown";
if (pci_device_has_kernel_driver(pci_dev)) {
if (!pci_device_has_uio_driver(pci_dev)) {
/*NVMe kernel driver case*/
if (g_conf->bdev_nvme_unbind_from_kernel) {
r = pci_device_switch_to_uio_driver(pci_dev);
if (r < 0) {
- derr << __func__ << " device " << *name << " " << pci_dev->bus
+ derr << __func__ << " device " << name << " " << pci_dev->bus
<< ":" << pci_dev->dev << ":" << pci_dev->func
<< " switch to uio driver failed" << dendl;
return r;
} else {
r = pci_device_bind_uio_driver(pci_dev, PCI_UIO_DRIVER);
if (r < 0) {
- derr << __func__ << " device " << *name << " " << pci_dev->bus
+ derr << __func__ << " device " << name << " " << pci_dev->bus
<< ":" << pci_dev->dev << ":" << pci_dev->func
<< " bind to uio driver failed, may lack of uio_pci_generic kernel module" << dendl;
return r;
/* Claim the device in case conflict with other ids process */
r = pci_device_claim(pci_dev);
if (r < 0) {
- derr << __func__ << " device " << *name << " " << pci_dev->bus
+ derr << __func__ << " device " << name << " " << pci_dev->bus
<< ":" << pci_dev->dev << ":" << pci_dev->func
<< " claim failed" << dendl;
return r;
return r;
}
- controllers[sn_tag] = make_pair(*c, *name);
-
pci_iterator_destroy(iter);
- dout(1) << __func__ << " successfully attach nvme device at" << *name
+ int num_ns = nvme_ctrlr_get_num_ns(*c);
+ assert(num_ns >= 1);
+ if (num_ns > 1) {
+ dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
+ }
+ *ns = nvme_ctrlr_get_ns(*c, 1);
+ if (!*ns) {
+ derr << __func__ << " failed to get namespace at 1" << dendl;
+ return -1;
+ }
+ dout(1) << __func__ << " successfully attach nvme device at" << name
<< " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << dendl;
return 0;
}
-int SharedDriverData::try_get(const string &sn_tag, nvme_controller **c, string *name)
+int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
{
Mutex::Locker l(lock);
int r = 0;
init = true;
}
- return _scan_nvme_device(sn_tag, c, name);
-}
-void SharedDriverData::release(nvme_controller *c)
-{
- dout(1) << __func__ << " " << c << dendl;
+ if (sn_tag.empty()) {
+ r = -ENOENT;
+ derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
+ return r;
+ }
- Mutex::Locker l(lock);
- auto it = controllers.begin();
- for (; it != controllers.end(); ++it) {
- if (c == it->second.first)
- break;
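+  // reuse an existing shared driver if this serial number was already probed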
+ for (auto &&it : shared_driver_datas) {
+ if (it->is_equal(sn_tag)) {
+ *driver = it;
+ return 0;
+ }
}
- if (it == controllers.end()) {
- derr << __func__ << " not found registered nvme controller " << c << dendl;
- assert(0);
+
+ nvme_controller *c;
+ nvme_namespace *ns;
+ std::string name;
+ if (_scan_nvme_device(sn_tag, name, &c, &ns) < 0)
+ return -1;
+
+ shared_driver_datas.push_back(new SharedDriverData(sn_tag, name, c, ns));
+ *driver = shared_driver_datas.back();
+
+ return 0;
+}
+
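+// completion callback run from nvme_ctrlr_process_io_completions in the polling
+// thread; dispatches on the originating command type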
+void io_complete(void *t, const struct nvme_completion *completion)
+{
+ Task *task = static_cast<Task*>(t);
+ IOContext *ctx = task->ctx;
+ SharedDriverData *driver = task->device->get_driver();
+ driver->inflight_ops.dec();
+ utime_t lat = ceph_clock_now(g_ceph_context);
+ lat -= task->start;
+ if (task->command == IOCommand::WRITE_COMMAND) {
+ driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
+ assert(!nvme_completion_is_error(completion));
+ dout(20) << __func__ << " write op successfully, left " << left << dendl;
+ // buffer write won't have ctx, and we will free request later, see `flush`
+ if (ctx) {
+ // check waiting count before doing callback (which may
+ // destroy this ioc).
+ if (!ctx->num_running.dec()) {
+ if (ctx->num_waiting.read()) {
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
+ }
+ if (task->device->aio_callback && ctx->priv) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ }
+ rte_free(task->buf);
+ rte_mempool_put(task_pool, task);
+ } else {
+ task->device->queue_buffer_task(task);
+ }
+ } else if (task->command == IOCommand::READ_COMMAND) {
+ driver->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
+ ctx->num_reading.dec();
+ dout(20) << __func__ << " read op successfully" << dendl;
+ if (nvme_completion_is_error(completion))
+ task->return_code = -1; // FIXME
+ else
+ task->return_code = 0;
+ {
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
+ }
+ } else {
+ assert(task->command == IOCommand::FLUSH_COMMAND);
+ driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
+ dout(20) << __func__ << " flush op successfully" << dendl;
+ if (nvme_completion_is_error(completion))
+ task->return_code = -1; // FIXME
+ else
+ task->return_code = 0;
+ {
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.Signal();
+ }
}
}
#define dout_prefix *_dout << "bdev(" << name << ") "
NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
- : ctrlr(nullptr),
- ns(nullptr),
- aio_stop(false),
- queue_empty(1),
- queue_lock("NVMEDevice::queue_lock"),
- aio_thread(this),
- flush_lock("NVMEDevice::flush_lock"),
- flush_waiters(0),
- buffer_lock("NVMEDevice::buffer_lock"),
- logger(nullptr),
- inflight_ops(0),
+ : buffer_lock("NVMEDevice::buffer_lock"),
aio_callback(cb),
aio_callback_priv(cbpriv)
{
return r;
}
serial_number = string(buf, r);
- r = driver_data.try_get(serial_number, &ctrlr, &name);
+ r = manager.try_get(serial_number, &driver);
if (r < 0) {
derr << __func__ << " failed to get nvme deivce with sn " << serial_number << dendl;
return r;
}
- int num_ns = nvme_ctrlr_get_num_ns(ctrlr);
- assert(num_ns >= 1);
- if (num_ns > 1) {
- dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
- }
- ns = nvme_ctrlr_get_ns(ctrlr, 1);
- if (!ns) {
- derr << __func__ << " failed to get namespace at 1" << dendl;
- return -1;
- }
- block_size = nvme_ns_get_sector_size(ns);
- size = block_size * nvme_ns_get_num_sectors(ns);
+ driver->register_device(this);
+ block_size = driver->get_block_size();
+ size = driver->get_size();
dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
<< " block_size " << block_size << " (" << pretty_si_t(block_size)
<< "B)" << dendl;
- PerfCountersBuilder b(g_ceph_context, string("nvmedevice-") + name + "-" + stringify(this),
- l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
- b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
- b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
- b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
- b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
- b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
- b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
- b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
- b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
- logger = b.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(logger);
-
- aio_thread.create("nvme_aio_thread");
-
return 0;
}
{
dout(1) << __func__ << dendl;
- {
- Mutex::Locker l(queue_lock);
- aio_stop = true;
- queue_cond.Signal();
- }
- aio_thread.join();
- aio_stop = false;
-
- g_ceph_context->get_perfcounters_collection()->remove(logger);
- delete logger;
- logger = nullptr;
-
name.clear();
- driver_data.release(ctrlr);
- ctrlr = nullptr;
+ driver->remove_device(this);
dout(1) << __func__ << " end" << dendl;
}
-void NVMEDevice::_aio_thread()
-{
- dout(10) << __func__ << " start" << dendl;
- if (nvme_register_io_thread() != 0) {
- assert(0);
- }
-
- Task *t;
- int r = 0;
- const int max = 4;
- uint64_t lba_off, lba_count;
- utime_t lat, start = ceph_clock_now(g_ceph_context);
- while (true) {
- dout(40) << __func__ << " polling" << dendl;
- t = nullptr;
- if (!queue_empty.read()) {
- Mutex::Locker l(queue_lock);
- if (!task_queue.empty()) {
- t = task_queue.front();
- task_queue.pop();
- logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
- }
- if (!t)
- queue_empty.inc();
- } else if (!inflight_ops.read()) {
- if (flush_waiters.read()) {
- Mutex::Locker l(flush_lock);
- flush_cond.Signal();
- }
- Mutex::Locker l(queue_lock);
- if (queue_empty.read()) {
- lat = ceph_clock_now(g_ceph_context);
- lat -= start;
- logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
- if (aio_stop)
- break;
- dout(20) << __func__ << " enter sleep" << dendl;
- queue_cond.Wait(queue_lock);
- dout(20) << __func__ << " exit sleep" << dendl;
- start = ceph_clock_now(g_ceph_context);
- }
- }
-
- if (t) {
- switch (t->command) {
- case IOCommand::WRITE_COMMAND:
- {
- while (t) {
- lba_off = t->offset / block_size;
- lba_count = t->len / block_size;
- dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
- r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t);
- if (r < 0) {
- t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
- rte_free(t->buf);
- rte_mempool_put(task_pool, t);
- derr << __func__ << " failed to do write command" << dendl;
- assert(0);
- }
- lat = ceph_clock_now(g_ceph_context);
- lat -= t->start;
- logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
- t = t->next;
- }
- break;
- }
- case IOCommand::READ_COMMAND:
- {
- dout(20) << __func__ << " read command issueed " << lba_off << "~" << lba_count << dendl;
- lba_off = t->offset / block_size;
- lba_count = t->len / block_size;
- r = nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t);
- if (r < 0) {
- derr << __func__ << " failed to read" << dendl;
- t->ctx->num_reading.dec();
- t->return_code = r;
- Mutex::Locker l(t->ctx->lock);
- t->ctx->cond.Signal();
- } else {
- lat = ceph_clock_now(g_ceph_context);
- lat -= t->start;
- logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
- }
- break;
- }
- case IOCommand::FLUSH_COMMAND:
- {
- dout(20) << __func__ << " flush command issueed " << dendl;
- r = nvme_ns_cmd_flush(ns, io_complete, t);
- if (r < 0) {
- derr << __func__ << " failed to flush" << dendl;
- t->return_code = r;
- Mutex::Locker l(t->ctx->lock);
- t->ctx->cond.Signal();
- } else {
- lat = ceph_clock_now(g_ceph_context);
- lat -= t->start;
- logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
- }
- break;
- }
- }
- } else if (inflight_ops.read()) {
- nvme_ctrlr_process_io_completions(ctrlr, max);
- dout(30) << __func__ << " idle, have a pause" << dendl;
-#ifdef HAVE_SSE
- _mm_pause();
-#else
- usleep(10);
-#endif
- }
- reap_ioc();
- }
- nvme_unregister_io_thread();
- dout(10) << __func__ << " end" << dendl;
-}
-
int NVMEDevice::flush()
{
dout(10) << __func__ << " start" << dendl;
utime_t start = ceph_clock_now(g_ceph_context);
- if (inflight_ops.read()) {
- // TODO: this may contains read op
- dout(1) << __func__ << " existed inflight ops " << inflight_ops.read() << dendl;
- Mutex::Locker l(flush_lock);
- flush_waiters.inc();
- while (inflight_ops.read()) {
- flush_cond.Wait(flush_lock);
- }
- flush_waiters.dec();
- }
+ driver->flush_wait();
Task *t = nullptr;
{
Mutex::Locker l(buffer_lock);
}
utime_t lat = ceph_clock_now(g_ceph_context);
lat -= start;
- logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
+ driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
return 0;
  // nvme device will cause terrible performance degradation
  // while issuing flush commands
t->device = this;
t->return_code = 1;
t->next = nullptr;
- {
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.read()) {
- queue_empty.dec();
- queue_cond.Signal();
- }
- }
+ driver->queue_task(t);
+
{
Mutex::Locker l(ioc.lock);
while (t->return_code > 0)
ioc->num_pending.sub(pending);
assert(ioc->num_pending.read() == 0); // we should be only thread doing this
// Only need to push the first entry
- queue_task(t, pending);
+ driver->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
if (buffered) {
t->ctx = nullptr;
// Only need to push the first entry
- queue_task(t);
+ driver->queue_task(t);
Mutex::Locker l(buffer_lock);
buffered_extents.insert(off, len, (char*)t->buf);
} else {
t->return_code = 1;
t->next = nullptr;
  ioc->num_reading.inc();
- queue_task(t);
+ driver->queue_task(t);
{
Mutex::Locker l(ioc->lock);
t->return_code = 1;
t->next = nullptr;
  ioc.num_reading.inc();
- queue_task(t);
+ driver->queue_task(t);
{
Mutex::Locker l(ioc.lock);