git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
NVMEDevice: use a single aio thread to process tasks
author     Haomai Wang <haomai@xsky.com>
           Wed, 20 Jan 2016 03:48:39 +0000 (11:48 +0800)
committer  Haomai Wang <haomai@xsky.com>
           Mon, 1 Feb 2016 14:02:19 +0000 (22:02 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/os/bluestore/NVMEDevice.cc
src/os/bluestore/NVMEDevice.h

index ad96d34c3b979dd2bccd7c632ad90800e58d4b40..9d0eb63a60389528d889bde6900d8ab51acd2935 100644 (file)
@@ -44,7 +44,7 @@
 
 #define dout_subsys ceph_subsys_bdev
 #undef dout_prefix
-#define dout_prefix *_dout << "bdev "
+#define dout_prefix *_dout << "bdev(" << sn << ") "
 
 rte_mempool *request_mempool = nullptr;
 rte_mempool *task_pool = nullptr;
@@ -62,60 +62,7 @@ enum {
   l_bluestore_nvmedevice_last
 };
 
-static void io_complete(void *t, const struct nvme_completion *completion) {
-  Task *task = static_cast<Task*>(t);
-  IOContext *ctx = task->ctx;
-  task->device->inflight_ops.dec();
-  utime_t lat = ceph_clock_now(g_ceph_context);
-  lat -= task->start;
-  if (task->command == IOCommand::WRITE_COMMAND) {
-    task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
-    assert(!nvme_completion_is_error(completion));
-    dout(20) << __func__ << " write op successfully, left " << left << dendl;
-    // buffer write won't have ctx, and we will free request later, see `flush`
-    if (ctx) {
-      // check waiting count before doing callback (which may
-      // destroy this ioc).
-      if (!ctx->num_running.dec()) {
-        if (ctx->num_waiting.read()) {
-          Mutex::Locker l(ctx->lock);
-          ctx->cond.Signal();
-        }
-        if (task->device->aio_callback && ctx->priv) {
-          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
-        }
-      }
-      rte_free(task->buf);
-      rte_mempool_put(task_pool, task);
-    } else {
-      task->device->queue_buffer_task(task);
-    }
-  } else if (task->command == IOCommand::READ_COMMAND) {
-    task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
-    ctx->num_reading.dec();
-    dout(20) << __func__ << " read op successfully" << dendl;
-    if (nvme_completion_is_error(completion))
-      task->return_code = -1; // FIXME
-    else
-      task->return_code = 0;
-    {
-      Mutex::Locker l(ctx->lock);
-      ctx->cond.Signal();
-    }
-  } else {
-    assert(task->command == IOCommand::FLUSH_COMMAND);
-    task->device->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
-    dout(20) << __func__ << " flush op successfully" << dendl;
-    if (nvme_completion_is_error(completion))
-      task->return_code = -1; // FIXME
-    else
-      task->return_code = 0;
-    {
-      Mutex::Locker l(ctx->lock);
-      ctx->cond.Signal();
-    }
-  }
-}
+static void io_complete(void *t, const struct nvme_completion *completion);
 
 static char *ealargs[] = {
     "ceph-osd",
@@ -123,39 +70,280 @@ static char *ealargs[] = {
     "-n 4",
 };
 
+
 class SharedDriverData {
-  std::map<string, std::pair<nvme_controller*, string> > controllers;
-  bool init = false;
-  Mutex lock;
+  std::string sn;
+  std::string name;
+  nvme_controller *ctrlr;
+  nvme_namespace *ns;
+
+  uint64_t block_size = 0;
+  uint64_t size = 0;
+  std::vector<NVMEDevice*> registered_devices;
+  struct AioCompletionThread : public Thread {
+    SharedDriverData *data;
+    AioCompletionThread(SharedDriverData *d) : data(d) {}
+    void *entry() {
+      data->_aio_thread();
+      return NULL;
+    }
+  } aio_thread;
+  friend class AioCompletionThread;
+
+  bool aio_stop = false;
+  int ref = 1;
+  void _aio_thread();
+  void _aio_start() {
+    aio_thread.create("nvme_aio_thread");
+  }
+  void _aio_stop() {
+    assert(aio_thread.is_started());
+    {
+      Mutex::Locker l(queue_lock);
+      aio_stop = true;
+      queue_cond.Signal();
+    }
+    aio_thread.join();
+    aio_stop = false;
+  }
+  atomic_t queue_empty;
+  Mutex queue_lock;
+  Cond queue_cond;
+  std::queue<Task*> task_queue;
 
-  int _scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name);
+  Mutex flush_lock;
+  Cond flush_cond;
+  atomic_t flush_waiters;
 
  public:
-  SharedDriverData(): lock("NVMEDevice::SharedDriverData::lock") {}
-  int try_get(const string &sn_tag, nvme_controller **c, string *name);
-  void release(nvme_controller *c);
-};
+  atomic_t inflight_ops;
+  PerfCounters *logger = nullptr;
+
+  SharedDriverData(const std::string &sn_tag, const std::string &n, nvme_controller *c, nvme_namespace *ns)
+      : sn(sn_tag),
+        name(n),
+        ctrlr(c),
+        ns(ns),
+        aio_thread(this),
+        queue_empty(1),
+        queue_lock("NVMEDevice::queue_lock"),
+        flush_lock("NVMEDevice::flush_lock"),
+        flush_waiters(0),
+        inflight_ops(0) {
+    block_size = nvme_ns_get_sector_size(ns);
+    size = block_size * nvme_ns_get_num_sectors(ns);
+
+    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
+                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
+    b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
+    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
+    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
+    b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
+    logger = b.create_perf_counters();
+    g_ceph_context->get_perfcounters_collection()->add(logger);
+    _aio_start();
+  }
+  ~SharedDriverData() {
+    g_ceph_context->get_perfcounters_collection()->remove(logger);
+    delete logger;
+  }
+
+  bool is_equal(const string &tag) const { return sn == tag; }
+  void register_device(NVMEDevice *device) {
+    // stop the aio thread while mutating registered_devices; device
+    // registration/release is rare, so the brief stall is acceptable
+    _aio_stop();
+    registered_devices.push_back(device);
+    _aio_start();
+  }
+  void remove_device(NVMEDevice *device) {
+    _aio_stop();
+    std::vector<NVMEDevice*> new_devices;
+    for (auto &&it : registered_devices) {
+      if (it != device)
+        new_devices.push_back(it);
+    }
+    registered_devices.swap(new_devices);
+    _aio_start();
+  }
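
With the condition fixed to keep every device except the one being removed, the rebuild-and-swap above is the hand-rolled form of the standard erase-remove idiom. A minimal equivalent sketch (illustrative only; `Device` stands in for NVMEDevice):

    #include <algorithm>
    #include <vector>

    struct Device {};  // stand-in for NVMEDevice; only identity matters here

    // Equivalent of remove_device()'s loop: drop one pointer from the
    // vector while the aio thread is stopped.
    void remove_one(std::vector<Device*> &devices, Device *victim) {
      devices.erase(std::remove(devices.begin(), devices.end(), victim),
                    devices.end());
    }
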
 
-static SharedDriverData driver_data;
+  uint64_t get_block_size() {
+    return block_size;
+  }
+  uint64_t get_size() {
+    return size;
+  }
+  void queue_task(Task *t, uint64_t ops = 1) {
+    inflight_ops.add(ops);
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+    if (queue_empty.read()) {
+      queue_empty.dec();
+      queue_cond.Signal();
+    }
+  }
 
-int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **c, string *name)
+  void flush_wait() {
+    if (inflight_ops.read()) {
+      // TODO: this may also count read ops
+      dout(1) << __func__ << " existing inflight ops " << inflight_ops.read() << dendl;
+      Mutex::Locker l(flush_lock);
+      flush_waiters.inc();
+      while (inflight_ops.read()) {
+        flush_cond.Wait(flush_lock);
+      }
+      flush_waiters.dec();
+    }
+  }
+};
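
The `queue_empty` counter is what lets queue_task() skip signalling the condition variable on every push: a signal is needed only when the aio thread may have gone to sleep. A condensed model of the producer side and of flush_wait(), using standard C++ primitives in place of Ceph's atomic_t/Mutex/Cond wrappers (all names illustrative):

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <queue>

    struct Task;  // opaque here

    struct TaskQueue {
      std::atomic<int> queue_empty{1};        // 1 = consumer may be sleeping
      std::atomic<uint64_t> inflight_ops{0};
      std::mutex queue_lock;
      std::condition_variable queue_cond;
      std::queue<Task*> task_queue;

      std::mutex flush_lock;
      std::condition_variable flush_cond;
      std::atomic<int> flush_waiters{0};

      // Mirrors SharedDriverData::queue_task(): bump inflight before the
      // push so flush_wait() can never miss an op, and only signal when
      // the consumer may have gone to sleep.
      void queue_task(Task *t, uint64_t ops = 1) {
        inflight_ops += ops;
        std::lock_guard<std::mutex> l(queue_lock);
        task_queue.push(t);
        if (queue_empty.load()) {
          --queue_empty;
          queue_cond.notify_one();
        }
      }

      // Mirrors flush_wait(): block until the aio thread drains everything.
      void flush_wait() {
        if (inflight_ops.load()) {
          std::unique_lock<std::mutex> l(flush_lock);
          ++flush_waiters;
          while (inflight_ops.load())
            flush_cond.wait(l);   // signalled from the aio thread when idle
          --flush_waiters;
        }
      }
    };
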
+
+void SharedDriverData::_aio_thread()
 {
+  dout(1) << __func__ << " start" << dendl;
+  if (nvme_register_io_thread() != 0) {
+    assert(0);
+  }
+
+  Task *t;
   int r = 0;
-  dout(1) << __func__ << " serial number " << sn_tag << dendl;
+  const int max = 4;
+  uint64_t lba_off, lba_count;
+  utime_t lat, start = ceph_clock_now(g_ceph_context);
+  while (true) {
+    dout(40) << __func__ << " polling" << dendl;
+    t = nullptr;
+    if (!queue_empty.read()) {
+      Mutex::Locker l(queue_lock);
+      if (!task_queue.empty()) {
+        t = task_queue.front();
+        task_queue.pop();
+        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
+      }
+      if (!t)
+        queue_empty.inc();
+    } else if (!inflight_ops.read()) {
+      if (flush_waiters.read()) {
+        Mutex::Locker l(flush_lock);
+        flush_cond.Signal();
+      }
 
-  assert(c);
-  if (sn_tag.empty()) {
-    r = -ENOENT;
-    derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
-    return r;
-  }
+      for (auto &&it : registered_devices)
+        it->reap_ioc();
 
-  auto ctr_it = controllers.find(sn_tag);
-  if (ctr_it != controllers.end()) {
-    *name = ctr_it->second.second;
-    *c = ctr_it->second.first;
-    return 0;
+      Mutex::Locker l(queue_lock);
+      if (queue_empty.read()) {
+        lat = ceph_clock_now(g_ceph_context);
+        lat -= start;
+        logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
+        if (aio_stop)
+          break;
+        dout(20) << __func__ << " enter sleep" << dendl;
+        queue_cond.Wait(queue_lock);
+        dout(20) << __func__ << " exit sleep" << dendl;
+        start = ceph_clock_now(g_ceph_context);
+      }
+    }
+
+    if (t) {
+      switch (t->command) {
+        case IOCommand::WRITE_COMMAND:
+        {
+          while (t) {
+            lba_off = t->offset / block_size;
+            lba_count = t->len / block_size;
+            dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
+            r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t);
+            if (r < 0) {
+              t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
+              rte_free(t->buf);
+              rte_mempool_put(task_pool, t);
+              derr << __func__ << " failed to do write command" << dendl;
+              assert(0);
+            }
+            lat = ceph_clock_now(g_ceph_context);
+            lat -= t->start;
+            logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
+            t = t->next;
+          }
+          break;
+        }
+        case IOCommand::READ_COMMAND:
+        {
+          lba_off = t->offset / block_size;
+          lba_count = t->len / block_size;
+          dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
+          r = nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t);
+          if (r < 0) {
+            derr << __func__ << " failed to read" << dendl;
+            t->ctx->num_reading.dec();
+            t->return_code = r;
+            Mutex::Locker l(t->ctx->lock);
+            t->ctx->cond.Signal();
+          } else {
+            lat = ceph_clock_now(g_ceph_context);
+            lat -= t->start;
+            logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
+          }
+          break;
+        }
+        case IOCommand::FLUSH_COMMAND:
+        {
+          dout(20) << __func__ << " flush command issued " << dendl;
+          r = nvme_ns_cmd_flush(ns, io_complete, t);
+          if (r < 0) {
+            derr << __func__ << " failed to flush" << dendl;
+            t->return_code = r;
+            Mutex::Locker l(t->ctx->lock);
+            t->ctx->cond.Signal();
+          } else {
+            lat = ceph_clock_now(g_ceph_context);
+            lat -= t->start;
+            logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
+          }
+          break;
+        }
+      }
+    } else if (inflight_ops.read()) {
+      nvme_ctrlr_process_io_completions(ctrlr, max);
+      dout(30) << __func__ << " idle, pausing briefly" << dendl;
+#ifdef HAVE_SSE
+      _mm_pause();
+#else
+      usleep(10);
+#endif
+    }
   }
+  nvme_unregister_io_thread();
+  dout(1) << __func__ << " end" << dendl;
+}
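
Stripped of logging and latency accounting, _aio_thread() is a submit-or-poll loop: drain the task queue first, reap completions while anything is inflight, and block on the condition variable only when both are empty. A simplified skeleton under the same assumptions as the sketch above (`submit()` stands in for the per-command switch, `poll_completions()` for nvme_ctrlr_process_io_completions()):

    // Assumes the TaskQueue/Task declarations from the previous sketch.
    void submit(Task *t);        // placeholder: issue the NVMe command
    void poll_completions();     // placeholder: reap NVMe completions

    void aio_thread_loop(TaskQueue &q, const bool &aio_stop) {
      while (true) {
        Task *t = nullptr;
        if (!q.queue_empty.load()) {                // fast path: work queued
          std::lock_guard<std::mutex> l(q.queue_lock);
          if (!q.task_queue.empty()) {
            t = q.task_queue.front();
            q.task_queue.pop();
          }
          if (!t)
            ++q.queue_empty;                        // queue drained
        } else if (q.inflight_ops.load() == 0) {    // idle: wake flushers,
          if (q.flush_waiters.load()) {             // then consider sleeping
            std::lock_guard<std::mutex> fl(q.flush_lock);
            q.flush_cond.notify_all();
          }
          std::unique_lock<std::mutex> l(q.queue_lock);
          if (q.queue_empty.load()) {
            if (aio_stop)
              break;
            q.queue_cond.wait(l);                   // woken by queue_task()
          }
        }
        if (t)
          submit(t);                                // issue to the device
        else if (q.inflight_ops.load())
          poll_completions();                       // busy-poll, then pause
      }
    }
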
+
+class NVMEManager {
+  Mutex lock;
+  bool init = false;
+  std::vector<SharedDriverData*> shared_driver_datas;
+
+  static int _scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns);
+
+ public:
+  NVMEManager()
+      : lock("NVMEDevice::NVMEManager::lock") {}
+  int try_get(const string &sn_tag, SharedDriverData **driver);
+};
+
+static NVMEManager manager;
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+int NVMEManager::_scan_nvme_device(const string &sn_tag, string &name, nvme_controller **c, nvme_namespace **ns)
+{
+  int r = 0;
+  dout(1) << __func__ << " serial number " << sn_tag << dendl;
 
   pci_device *pci_dev;
 
@@ -194,14 +382,14 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
   }
 
   pci_device_probe(pci_dev);
-  *name = pci_device_get_device_name(pci_dev) ? pci_device_get_device_name(pci_dev) : "Unknown";
+  name = pci_device_get_device_name(pci_dev) ? pci_device_get_device_name(pci_dev) : "Unknown";
   if (pci_device_has_kernel_driver(pci_dev)) {
     if (!pci_device_has_uio_driver(pci_dev)) {
       /*NVMe kernel driver case*/
       if (g_conf->bdev_nvme_unbind_from_kernel) {
         r =  pci_device_switch_to_uio_driver(pci_dev);
         if (r < 0) {
-          derr << __func__ << " device " << *name << " " << pci_dev->bus
+          derr << __func__ << " device " << name << " " << pci_dev->bus
                << ":" << pci_dev->dev << ":" << pci_dev->func
                << " switch to uio driver failed" << dendl;
           return r;
@@ -215,7 +403,7 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
   } else {
     r =  pci_device_bind_uio_driver(pci_dev, PCI_UIO_DRIVER);
     if (r < 0) {
-      derr << __func__ << " device " << *name << " " << pci_dev->bus
+      derr << __func__ << " device " << name << " " << pci_dev->bus
            << ":" << pci_dev->dev << ":" << pci_dev->func
            << " bind to uio driver failed, may lack of uio_pci_generic kernel module" << dendl;
       return r;
@@ -225,7 +413,7 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
   /* Claim the device in case conflict with other ids process */
   r =  pci_device_claim(pci_dev);
   if (r < 0) {
-    derr << __func__ << " device " << *name << " " << pci_dev->bus
+    derr << __func__ << " device " << name << " " << pci_dev->bus
          << ":" << pci_dev->dev << ":" << pci_dev->func
          << " claim failed" << dendl;
     return r;
@@ -238,17 +426,25 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
     return r;
   }
 
-  controllers[sn_tag] = make_pair(*c, *name);
-
   pci_iterator_destroy(iter);
 
-  dout(1) << __func__ << " successfully attach nvme device at" << *name
+  int num_ns = nvme_ctrlr_get_num_ns(*c);
+  assert(num_ns >= 1);
+  if (num_ns > 1) {
+    dout(0) << __func__ << " namespace count is larger than 1; currently only the first namespace is used" << dendl;
+  }
+  *ns = nvme_ctrlr_get_ns(*c, 1);
+  if (!*ns) {
+    derr << __func__ << " failed to get namespace at 1" << dendl;
+    return -1;
+  }
+  dout(1) << __func__ << " successfully attached nvme device at " << name
           << " " << pci_dev->bus << ":" << pci_dev->dev << ":" << pci_dev->func << dendl;
 
   return 0;
 }
 
-int SharedDriverData::try_get(const string &sn_tag, nvme_controller **c, string *name)
+int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
 {
   Mutex::Locker l(lock);
   int r = 0;
@@ -284,22 +480,86 @@ int SharedDriverData::try_get(const string &sn_tag, nvme_controller **c, string
 
     init = true;
   }
-  return _scan_nvme_device(sn_tag, c, name);
-}
 
-void SharedDriverData::release(nvme_controller *c)
-{
-  dout(1) << __func__ << " " << c << dendl;
+  if (sn_tag.empty()) {
+    r = -ENOENT;
+    derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
+    return r;
+  }
 
-  Mutex::Locker l(lock);
-  auto it = controllers.begin();
-  for (; it != controllers.end(); ++it) {
-    if (c == it->second.first)
-      break;
+  for (auto &&it : shared_driver_datas) {
+    if (it->is_equal(sn_tag)) {
+      *driver = it;
+      return 0;
+    }
   }
-  if (it == controllers.end()) {
-    derr << __func__ << " not found registered nvme controller " << c << dendl;
-    assert(0);
+
+  nvme_controller *c;
+  nvme_namespace *ns;
+  std::string name;
+  if (_scan_nvme_device(sn_tag, name, &c, &ns) < 0)
+    return -1;
+
+  shared_driver_datas.push_back(new SharedDriverData(sn_tag, name, c, ns));
+  *driver = shared_driver_datas.back();
+
+  return 0;
+}
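
try_get() is a lookup-or-create keyed by serial number: the first opener scans and attaches the controller, later openers share the existing SharedDriverData, and with it the single aio thread. A hedged sketch of that shape (`Driver` is an illustrative stand-in for SharedDriverData):

    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    struct Driver {
      std::string sn;
      bool is_equal(const std::string &tag) const { return sn == tag; }
    };

    class Manager {
      std::mutex lock;
      std::vector<std::unique_ptr<Driver>> drivers;
     public:
      Driver *try_get(const std::string &sn_tag) {
        std::lock_guard<std::mutex> l(lock);
        for (auto &d : drivers)
          if (d->is_equal(sn_tag))
            return d.get();               // share the existing driver
        drivers.push_back(std::unique_ptr<Driver>(new Driver{sn_tag}));
        return drivers.back().get();      // first opener creates it
      }
    };
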
+
+void io_complete(void *t, const struct nvme_completion *completion)
+{
+  Task *task = static_cast<Task*>(t);
+  IOContext *ctx = task->ctx;
+  SharedDriverData *driver = task->device->get_driver();
+  driver->inflight_ops.dec();
+  utime_t lat = ceph_clock_now(g_ceph_context);
+  lat -= task->start;
+  if (task->command == IOCommand::WRITE_COMMAND) {
+    driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
+    assert(!nvme_completion_is_error(completion));
+    dout(20) << __func__ << " write op completed successfully" << dendl;
+    // buffer write won't have ctx, and we will free request later, see `flush`
+    if (ctx) {
+      // check waiting count before doing callback (which may
+      // destroy this ioc).
+      if (!ctx->num_running.dec()) {
+        if (ctx->num_waiting.read()) {
+          Mutex::Locker l(ctx->lock);
+          ctx->cond.Signal();
+        }
+        if (task->device->aio_callback && ctx->priv) {
+          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+        }
+      }
+      rte_free(task->buf);
+      rte_mempool_put(task_pool, task);
+    } else {
+      task->device->queue_buffer_task(task);
+    }
+  } else if (task->command == IOCommand::READ_COMMAND) {
+    driver->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
+    ctx->num_reading.dec();
+    dout(20) << __func__ << " read op completed successfully" << dendl;
+    if (nvme_completion_is_error(completion))
+      task->return_code = -1; // FIXME
+    else
+      task->return_code = 0;
+    {
+      Mutex::Locker l(ctx->lock);
+      ctx->cond.Signal();
+    }
+  } else {
+    assert(task->command == IOCommand::FLUSH_COMMAND);
+    driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
+    dout(20) << __func__ << " flush op completed successfully" << dendl;
+    if (nvme_completion_is_error(completion))
+      task->return_code = -1; // FIXME
+    else
+      task->return_code = 0;
+    {
+      Mutex::Locker l(ctx->lock);
+      ctx->cond.Signal();
+    }
   }
 }
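
One subtlety in the write branch above: the waiter check and signal happen before aio_callback runs, because the callback may destroy the IOContext, after which ctx must not be touched. A minimal model of that hand-off in standard C++ (illustrative names):

    #include <atomic>
    #include <condition_variable>
    #include <functional>
    #include <mutex>

    struct IOContext {
      std::atomic<int> num_running{0};
      std::atomic<int> num_waiting{0};
      std::mutex lock;
      std::condition_variable cond;
    };

    // Mirrors io_complete()'s write path: wake waiters first, then run
    // the user callback, which is allowed to delete ctx.
    void complete_write(IOContext *ctx, std::function<void()> aio_callback) {
      if (--ctx->num_running == 0) {
        if (ctx->num_waiting.load()) {
          std::lock_guard<std::mutex> l(ctx->lock);
          ctx->cond.notify_all();
        }
        if (aio_callback)
          aio_callback();   // may free ctx; must not touch ctx afterwards
      }
    }
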
 
@@ -308,17 +568,7 @@ void SharedDriverData::release(nvme_controller *c)
 #define dout_prefix *_dout << "bdev(" << name << ") "
 
 NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
-    : ctrlr(nullptr),
-      ns(nullptr),
-      aio_stop(false),
-      queue_empty(1),
-      queue_lock("NVMEDevice::queue_lock"),
-      aio_thread(this),
-      flush_lock("NVMEDevice::flush_lock"),
-      flush_waiters(0),
-      buffer_lock("NVMEDevice::buffer_lock"),
-      logger(nullptr),
-      inflight_ops(0),
+    : buffer_lock("NVMEDevice::buffer_lock"),
       aio_callback(cb),
       aio_callback_priv(cbpriv)
 {
@@ -348,44 +598,20 @@ int NVMEDevice::open(string p)
     return r;
   }
   serial_number = string(buf, r);
-  r = driver_data.try_get(serial_number, &ctrlr, &name);
+  r = manager.try_get(serial_number, &driver);
   if (r < 0) {
    derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
     return r;
   }
 
-  int num_ns = nvme_ctrlr_get_num_ns(ctrlr);
-  assert(num_ns >= 1);
-  if (num_ns > 1) {
-    dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
-  }
-  ns = nvme_ctrlr_get_ns(ctrlr, 1);
-  if (!ns) {
-    derr << __func__ << " failed to get namespace at 1" << dendl;
-    return -1;
-  }
-  block_size = nvme_ns_get_sector_size(ns);
-  size = block_size * nvme_ns_get_num_sectors(ns);
+  driver->register_device(this);
+  block_size = driver->get_block_size();
+  size = driver->get_size();
 
   dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
           << " block_size " << block_size << " (" << pretty_si_t(block_size)
           << "B)" << dendl;
 
-  PerfCountersBuilder b(g_ceph_context, string("nvmedevice-") + name + "-" + stringify(this),
-                        l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
-  b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
-  b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
-  b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
-  b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
-  b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
-  b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
-  b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
-  b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
-  logger = b.create_perf_counters();
-  g_ceph_context->get_perfcounters_collection()->add(logger);
-
-  aio_thread.create("nvme_aio_thread");
-
   return 0;
 }
 
@@ -393,156 +619,17 @@ void NVMEDevice::close()
 {
   dout(1) << __func__ << dendl;
 
-  {
-    Mutex::Locker l(queue_lock);
-    aio_stop = true;
-    queue_cond.Signal();
-  }
-  aio_thread.join();
-  aio_stop = false;
-
-  g_ceph_context->get_perfcounters_collection()->remove(logger);
-  delete logger;
-  logger = nullptr;
-
   name.clear();
-  driver_data.release(ctrlr);
-  ctrlr = nullptr;
+  driver->remove_device(this);
 
   dout(1) << __func__ << " end" << dendl;
 }
 
-void NVMEDevice::_aio_thread()
-{
-  dout(10) << __func__ << " start" << dendl;
-  if (nvme_register_io_thread() != 0) {
-    assert(0);
-  }
-
-  Task *t;
-  int r = 0;
-  const int max = 4;
-  uint64_t lba_off, lba_count;
-  utime_t lat, start = ceph_clock_now(g_ceph_context);
-  while (true) {
-    dout(40) << __func__ << " polling" << dendl;
-    t = nullptr;
-    if (!queue_empty.read()) {
-      Mutex::Locker l(queue_lock);
-      if (!task_queue.empty()) {
-        t = task_queue.front();
-        task_queue.pop();
-        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
-      }
-      if (!t)
-        queue_empty.inc();
-    } else if (!inflight_ops.read()) {
-      if (flush_waiters.read()) {
-        Mutex::Locker l(flush_lock);
-        flush_cond.Signal();
-      }
-      Mutex::Locker l(queue_lock);
-      if (queue_empty.read()) {
-        lat = ceph_clock_now(g_ceph_context);
-        lat -= start;
-        logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
-        if (aio_stop)
-          break;
-        dout(20) << __func__ << " enter sleep" << dendl;
-        queue_cond.Wait(queue_lock);
-        dout(20) << __func__ << " exit sleep" << dendl;
-        start = ceph_clock_now(g_ceph_context);
-      }
-    }
-
-    if (t) {
-      switch (t->command) {
-        case IOCommand::WRITE_COMMAND:
-        {
-          while (t) {
-            lba_off = t->offset / block_size;
-            lba_count = t->len / block_size;
-            dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
-            r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t);
-            if (r < 0) {
-              t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
-              rte_free(t->buf);
-              rte_mempool_put(task_pool, t);
-              derr << __func__ << " failed to do write command" << dendl;
-              assert(0);
-            }
-            lat = ceph_clock_now(g_ceph_context);
-            lat -= t->start;
-            logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
-            t = t->next;
-          }
-          break;
-        }
-        case IOCommand::READ_COMMAND:
-        {
-          dout(20) << __func__ << " read command issueed " << lba_off << "~" << lba_count << dendl;
-          lba_off = t->offset / block_size;
-          lba_count = t->len / block_size;
-          r = nvme_ns_cmd_read(ns, t->buf, lba_off, lba_count, io_complete, t);
-          if (r < 0) {
-            derr << __func__ << " failed to read" << dendl;
-            t->ctx->num_reading.dec();
-            t->return_code = r;
-            Mutex::Locker l(t->ctx->lock);
-            t->ctx->cond.Signal();
-          } else {
-            lat = ceph_clock_now(g_ceph_context);
-            lat -= t->start;
-            logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
-          }
-          break;
-        }
-        case IOCommand::FLUSH_COMMAND:
-        {
-          dout(20) << __func__ << " flush command issueed " << dendl;
-          r = nvme_ns_cmd_flush(ns, io_complete, t);
-          if (r < 0) {
-            derr << __func__ << " failed to flush" << dendl;
-            t->return_code = r;
-            Mutex::Locker l(t->ctx->lock);
-            t->ctx->cond.Signal();
-          } else {
-            lat = ceph_clock_now(g_ceph_context);
-            lat -= t->start;
-            logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
-          }
-          break;
-        }
-      }
-    } else if (inflight_ops.read()) {
-      nvme_ctrlr_process_io_completions(ctrlr, max);
-      dout(30) << __func__ << " idle, have a pause" << dendl;
-#ifdef HAVE_SSE
-      _mm_pause();
-#else
-      usleep(10);
-#endif
-    }
-    reap_ioc();
-  }
-  nvme_unregister_io_thread();
-  dout(10) << __func__ << " end" << dendl;
-}
-
 int NVMEDevice::flush()
 {
   dout(10) << __func__ << " start" << dendl;
   utime_t start = ceph_clock_now(g_ceph_context);
-  if (inflight_ops.read()) {
-    // TODO: this may contains read op
-    dout(1) << __func__ << " existed inflight ops " << inflight_ops.read() << dendl;
-    Mutex::Locker l(flush_lock);
-    flush_waiters.inc();
-    while (inflight_ops.read()) {
-      flush_cond.Wait(flush_lock);
-    }
-    flush_waiters.dec();
-  }
+  driver->flush_wait();
   Task *t = nullptr;
   {
     Mutex::Locker l(buffer_lock);
@@ -557,7 +644,7 @@ int NVMEDevice::flush()
   }
   utime_t lat = ceph_clock_now(g_ceph_context);
   lat -= start;
-  logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
+  driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
   return 0;
  // the nvme device suffers terrible performance degradation
  // while issuing flush commands
@@ -579,14 +666,8 @@ int NVMEDevice::flush()
   t->device = this;
   t->return_code = 1;
   t->next = nullptr;
-  {
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.read()) {
-      queue_empty.dec();
-      queue_cond.Signal();
-    }
-  }
+  driver->queue_task(t);
+
   {
     Mutex::Locker l(ioc.lock);
     while (t->return_code > 0)
@@ -610,7 +691,7 @@ void NVMEDevice::aio_submit(IOContext *ioc)
     ioc->num_pending.sub(pending);
    assert(ioc->num_pending.read() == 0);  // we should be the only thread doing this
     // Only need to push the first entry
-    queue_task(t, pending);
+    driver->queue_task(t, pending);
     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
   }
 }
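
aio_submit() links the pending tasks through Task::next and queues only the list head, passing `pending` so inflight_ops is charged once for the whole batch; the WRITE_COMMAND branch in _aio_thread() then walks the chain. A small sketch of building such a batch (assumes the TaskQueue sketch above; Task reduced to the one field needed here, and n is assumed to be at least 1):

    struct Task { Task *next = nullptr; };  // reduced for this sketch

    // Chain n tasks and push only the head; queue_task(head, n) bumps
    // inflight_ops by n, matching the t = t->next walk in _aio_thread().
    void submit_batch(TaskQueue &q, Task *tasks[], uint64_t n) {
      for (uint64_t i = 0; i + 1 < n; ++i)
        tasks[i]->next = tasks[i + 1];
      tasks[n - 1]->next = nullptr;   // terminate the chain
      q.queue_task(tasks[0], n);      // one wakeup covers the whole batch
    }
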
@@ -655,7 +736,7 @@ int NVMEDevice::aio_write(
   if (buffered) {
     t->ctx = nullptr;
     // Only need to push the first entry
-    queue_task(t);
+    driver->queue_task(t);
     Mutex::Locker l(buffer_lock);
     buffered_extents.insert(off, len, (char*)t->buf);
   } else {
@@ -734,7 +815,7 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
   t->return_code = 1;
   t->next = nullptr;
  ioc->num_reading.inc();
-  queue_task(t);
+  driver->queue_task(t);
 
   {
     Mutex::Locker l(ioc->lock);
@@ -795,7 +876,7 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
   t->return_code = 1;
   t->next = nullptr;
  ioc.num_reading.inc();
-  queue_task(t);
+  driver->queue_task(t);
 
   {
     Mutex::Locker l(ioc.lock);
index 46637c55bcb3ddf82c193936eed5b151153b31a4..cbeb874a386d895a72a8878089ebad0dbe86cece 100644 (file)
@@ -62,6 +62,7 @@ struct Task {
 };
 
 class PerfCounters;
+class SharedDriverData;
 
 class NVMEDevice : public BlockDevice {
   /**
@@ -69,8 +70,7 @@ class NVMEDevice : public BlockDevice {
    * contains 4KB IDENTIFY structure for controller which is
    *  target for CONTROLLER IDENTIFY command during initialization
    */
-  nvme_controller *ctrlr;
-  nvme_namespace       *ns;
+  SharedDriverData *driver;
   string name;
 
   uint64_t size;
@@ -79,36 +79,6 @@ class NVMEDevice : public BlockDevice {
   bool aio_stop;
   bufferptr zeros;
 
-  atomic_t queue_empty;
-  Mutex queue_lock;
-  Cond queue_cond;
-  std::queue<Task*> task_queue;
-
-  void queue_task(Task *t, uint64_t ops = 1) {
-    inflight_ops.add(ops);
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.read()) {
-      queue_empty.dec();
-      queue_cond.Signal();
-    }
-  }
-
-  struct AioCompletionThread : public Thread {
-    NVMEDevice *dev;
-    AioCompletionThread(NVMEDevice *b) : dev(b) {}
-    void *entry() {
-      dev->_aio_thread();
-      return NULL;
-    }
-  } aio_thread;
-
-  void _aio_thread();
-
-  Mutex flush_lock;
-  Cond flush_cond;
-  atomic_t flush_waiters;
-
   struct BufferedExtents {
     struct Extent {
       uint64_t x_len;
@@ -260,9 +230,9 @@ class NVMEDevice : public BlockDevice {
     buffered_task_head = t;
   }
 
+  SharedDriverData *get_driver() { return driver; }
+
  public:
-  PerfCounters *logger;
-  atomic_t inflight_ops;
   aio_callback_t aio_callback;
   void *aio_callback_priv;