]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
bluestore/NVMEDevice: make IO thread using dpdk launch 8160/head
authorHaomai Wang <haomai@xsky.com>
Wed, 16 Mar 2016 11:13:39 +0000 (19:13 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 16 Mar 2016 14:36:04 +0000 (22:36 +0800)
Previously, rte_eal_init was called in the caller's thread, so the dpdk
resources were initialized in that thread as well. As a result, every thread
spawned later was bound to the specified core.

The IO thread is now spawned via the dpdk API; otherwise, access to dpdk
memory suffers a severe performance degradation.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/os/bluestore/NVMEDevice.cc

index ef68bdd560d784df553eb9cef34a02950eeab65f..d89a7052fb95cbafcbe09a0aaf3c43e4f7b5b849 100644 (file)
@@ -21,6 +21,7 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <map>
+#include <thread>
 #ifdef HAVE_SSE
 #include <xmmintrin.h>
 #endif
@@ -77,45 +78,39 @@ enum {
 
 static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
 
-static const char *ealargs[] = {
-    "ceph-osd",
-    "-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
-    "-n 4",
-};
-
+int dpdk_thread_adaptor(void *f)
+{
+  (*static_cast<std::function<void ()>*>(f))();
+  return 0;
+}
 
 class SharedDriverData {
+  unsigned id;
   std::string sn;
   std::string name;
   spdk_nvme_ctrlr *ctrlr;
   spdk_nvme_ns *ns;
+  std::function<void ()> run_func;
 
   uint64_t block_size = 0;
   uint64_t size = 0;
   std::vector<NVMEDevice*> registered_devices;
-  struct AioCompletionThread : public Thread {
-    SharedDriverData *data;
-    AioCompletionThread(SharedDriverData *d) : data(d) {}
-    void *entry() {
-      data->_aio_thread();
-      return NULL;
-    }
-  } aio_thread;
   friend class AioCompletionThread;
 
   bool aio_stop = false;
   void _aio_thread();
   void _aio_start() {
-    aio_thread.create("nvme_aio_thread");
+    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func), id);
+    assert(r == 0);
   }
   void _aio_stop() {
-    assert(aio_thread.is_started());
     {
       Mutex::Locker l(queue_lock);
       aio_stop = true;
       queue_cond.Signal();
     }
-    aio_thread.join();
+    int r = rte_eal_wait_lcore(id);
+    assert(r == 0);
     aio_stop = false;
   }
   std::atomic_bool queue_empty;
@@ -132,12 +127,14 @@ class SharedDriverData {
   std::atomic_int inflight_ops;
   PerfCounters *logger = nullptr;
 
-  SharedDriverData(const std::string &sn_tag, const std::string &n, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
-      : sn(sn_tag),
+  SharedDriverData(unsigned i, const std::string &sn_tag, const std::string &n,
+                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+      : id(i),
+        sn(sn_tag),
         name(n),
         ctrlr(c),
         ns(ns),
-        aio_thread(this),
+        run_func([this]() { _aio_thread(); }),
         queue_empty(false),
         queue_lock("NVMEDevice::queue_lock"),
         flush_lock("NVMEDevice::flush_lock"),
@@ -354,17 +351,24 @@ void SharedDriverData::_aio_thread()
 #define dout_prefix *_dout << "bdev "
 
 class NVMEManager {
-  Mutex lock;
-  bool init = false;
-  std::vector<SharedDriverData*> shared_driver_datas;
-
  public:
   struct ProbeContext {
     string sn_tag;
     NVMEManager *manager;
     SharedDriverData *driver;
+    bool done;
   };
 
+ private:
+  Mutex lock;
+  bool init = false;
+  std::vector<SharedDriverData*> shared_driver_datas;
+  std::thread dpdk_thread;
+  std::mutex probe_queue_lock;
+  std::condition_variable probe_queue_cond;
+  std::list<ProbeContext*> probe_queue;
+
+ public:
   NVMEManager()
       : lock("NVMEDevice::NVMEManager::lock") {}
   int try_get(const string &sn_tag, SharedDriverData **driver);
@@ -386,7 +390,10 @@ class NVMEManager {
     dout(1) << __func__ << " successfully attach nvme device at" << name
             << " " << spdk_pci_device_get_bus(pci_dev) << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
 
-    shared_driver_datas.push_back(new SharedDriverData(sn_tag, name, c, ns));
+    // only support one device per osd now!
+    assert(shared_driver_datas.empty());
+    // index 0 is occupied by the master thread
+    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, name, c, ns));
     *driver = shared_driver_datas.back();
   }
 };
@@ -454,39 +461,6 @@ int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
 {
   Mutex::Locker l(lock);
   int r = 0;
-  if (!init) {
-    r = rte_eal_init(sizeof(ealargs) / sizeof(ealargs[0]), (char **)(void *)(uintptr_t)ealargs);
-    if (r < 0) {
-      derr << __func__ << " failed to do rte_eal_init" << dendl;
-      return r;
-    }
-
-    request_mempool = rte_mempool_create("nvme_request", 512,
-                                         spdk_nvme_request_size(), 128, 0,
-                                         NULL, NULL, NULL, NULL,
-                                         SOCKET_ID_ANY, 0);
-    if (request_mempool == NULL) {
-      derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
-      return -ENOMEM;
-    }
-
-    task_pool = rte_mempool_create(
-        "task_pool", 512, sizeof(Task),
-        64, 0, NULL, NULL, NULL, NULL,
-        SOCKET_ID_ANY, 0);
-    if (task_pool == NULL) {
-      derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
-      return -ENOMEM;
-    }
-
-    pci_system_init();
-    spdk_nvme_retry_count = g_conf->bdev_nvme_retry_count;
-    if (spdk_nvme_retry_count < 0)
-      spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
-
-    init = true;
-  }
-
   if (sn_tag.empty()) {
     r = -ENOENT;
     derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
@@ -500,14 +474,75 @@ int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
     }
   }
 
-  ProbeContext ctx = {sn_tag, this, nullptr};
-  r = spdk_nvme_probe(&ctx, probe_cb, attach_cb);
-  if (r < 0) {
-    assert(!ctx.driver);
-    derr << __func__ << " device probe nvme failed" << dendl;
-    return r;
+  if (!init) {
+    init = true;
+    dpdk_thread = std::thread(
+      [this]() {
+        static const char *ealargs[] = {
+            "ceph-osd",
+            "-c 0x3", /* This must be the second parameter. It is overwritten by index in main(). */
+            "-n 4",
+        };
+
+        int r = rte_eal_init(sizeof(ealargs) / sizeof(ealargs[0]), (char **)(void *)(uintptr_t)ealargs);
+        if (r < 0) {
+          derr << __func__ << " failed to do rte_eal_init" << dendl;
+          assert(0);
+        }
+
+        request_mempool = rte_mempool_create("nvme_request", 512,
+                                             spdk_nvme_request_size(), 128, 0,
+                                             NULL, NULL, NULL, NULL,
+                                             SOCKET_ID_ANY, 0);
+        if (request_mempool == NULL) {
+          derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
+          assert(0);
+        }
+
+        task_pool = rte_mempool_create(
+            "task_pool", 512, sizeof(Task),
+            64, 0, NULL, NULL, NULL, NULL,
+            SOCKET_ID_ANY, 0);
+        if (task_pool == NULL) {
+          derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
+          assert(0);
+        }
+
+        pci_system_init();
+        spdk_nvme_retry_count = g_conf->bdev_nvme_retry_count;
+        if (spdk_nvme_retry_count < 0)
+          spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
+
+        std::unique_lock<std::mutex> l(probe_queue_lock);
+        while (true) {
+          if (!probe_queue.empty()) {
+            ProbeContext* ctxt = probe_queue.front();
+            probe_queue.pop_front();
+            r = spdk_nvme_probe(ctxt, probe_cb, attach_cb);
+            if (r < 0) {
+              assert(!ctxt->driver);
+              derr << __func__ << " device probe nvme failed" << dendl;
+            }
+            ctxt->done = true;
+            probe_queue_cond.notify_all();
+          } else {
+            probe_queue_cond.wait(l);
+          }
+        }
+      }
+    );
+    dpdk_thread.detach();
   }
 
+  ProbeContext ctx = {sn_tag, this, nullptr, false};
+  {
+    std::unique_lock<std::mutex> l(probe_queue_lock);
+    probe_queue.push_back(&ctx);
+    while (!ctx.done)
+      probe_queue_cond.wait(l);
+  }
+  if (!ctx.driver)
+    return -1;
   *driver = ctx.driver;
 
   return 0;