os/bluestore/NVMEDevice: remove use of the DPDK thread
author    Ziye Yang <optimistyzy@gmail.com>
          Wed, 6 Sep 2017 05:16:12 +0000 (13:16 +0800)
committer Ziye Yang <optimistyzy@gmail.com>
          Mon, 18 Sep 2017 16:35:46 +0000 (00:35 +0800)
Instead of spawning a dedicated DPDK thread, run SPDK I/O in run-to-completion
mode in the submitting thread, so we no longer need to bind two cores for SPDK.

Signed-off-by: Ziye Yang <optimistyzy@gmail.com>
Signed-off-by: Pan Liu <wanjun.lp@alibaba-inc.com>
src/common/options.cc
src/os/bluestore/NVMEDevice.cc
src/os/bluestore/NVMEDevice.h
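
For context, here is a minimal sketch of the run-to-completion model this commit switches to: the submitting thread issues a command and then polls the same qpair for completions itself, so no second core and no DPDK reactor thread are needed. It assumes ns, qpair, and buf were set up through the usual SPDK probe/attach path; write_and_poll is an illustrative name, not a function in this patch.

#include <spdk/nvme.h>
#include <cassert>

static bool g_done = false;

static void write_complete(void *arg, const struct spdk_nvme_cpl *cpl)
{
  assert(!spdk_nvme_cpl_is_error(cpl));
  g_done = true;
}

void write_and_poll(struct spdk_nvme_ns *ns, struct spdk_nvme_qpair *qpair,
                    void *buf, uint64_t lba, uint32_t lba_count)
{
  g_done = false;
  int rc = spdk_nvme_ns_cmd_write(ns, qpair, buf, lba, lba_count,
                                  write_complete, nullptr, 0);
  assert(rc == 0);
  // Run to completion in the caller's thread: keep polling the qpair
  // instead of handing it to a dedicated DPDK lcore.
  while (!g_done)
    spdk_nvme_qpair_process_completions(qpair, 0 /* no completion limit */);
}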

index b5675608306f7bcd2faf429333ce01dc6a52c469..cc581620d5e8a15b880ebe46affcb36d7878d242 100644 (file)
@@ -3164,7 +3164,7 @@ std::vector<Option> get_global_options() {
     .set_description(""),
 
     Option("bluestore_spdk_coremask", Option::TYPE_STR, Option::LEVEL_DEV)
-    .set_default("0x3")
+    .set_default("0x1")
     .set_description(""),
 
     Option("bluestore_spdk_max_io_completion", Option::TYPE_UINT, Option::LEVEL_DEV)
index ca73d7444282399f83a96c2ca8094c804caef924..ce9c830ddc9f0d00f360c974e61d6b27eae05ab8 100644 (file)
 #include <functional>
 #include <map>
 #include <thread>
-#include <xmmintrin.h>
 
 #include <spdk/nvme.h>
 
-#include <rte_lcore.h>
-
 #include "include/stringify.h"
 #include "include/types.h"
 #include "include/compat.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "bdev(" << sn << ") "
 
-static constexpr uint16_t data_buffer_default_num = 2048;
+thread_local SharedDriverQueueData *queue_t;
+
+static constexpr uint16_t data_buffer_default_num = 1024;
 
 static constexpr uint32_t data_buffer_size = 8192;
 
 static constexpr uint16_t inline_segment_num = 32;
 
-static thread_local int queue_id = -1;
-
 enum {
   l_bluestore_nvmedevice_first = 632430,
   l_bluestore_nvmedevice_write_lat,
@@ -72,12 +69,6 @@ enum {
 
 static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
 
-static int dpdk_thread_adaptor(void *f)
-{
-  (*static_cast<std::function<void ()>*>(f))();
-  return 0;
-}
-
 struct IORequest {
   uint16_t cur_seg_idx = 0;
   uint16_t nseg;
@@ -87,55 +78,83 @@ struct IORequest {
 };
 
 struct Task;
+
+class SharedDriverData {
+  unsigned id;
+  std::string sn;
+  spdk_nvme_ctrlr *ctrlr;
+  spdk_nvme_ns *ns;
+  uint64_t block_size = 0;
+  uint32_t sector_size = 0;
+  uint64_t size = 0;
+
+  public:
+  std::vector<NVMEDevice*> registered_devices;
+  friend class SharedDriverQueueData;
+  SharedDriverData(unsigned _id, const std::string &sn_tag,
+                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+      : id(_id),
+        sn(sn_tag),
+        ctrlr(c),
+        ns(ns) {
+    sector_size = spdk_nvme_ns_get_sector_size(ns);
+    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
+    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
+  }
+
+  bool is_equal(const string &tag) const { return sn == tag; }
+  ~SharedDriverData() {
+  }
+
+  void register_device(NVMEDevice *device) {
+    registered_devices.push_back(device);
+  }
+
+  void remove_device(NVMEDevice *device) {
+    std::vector<NVMEDevice*> new_devices;
+    for (auto &&it : registered_devices) {
+      if (it != device)
+        new_devices.push_back(it);
+    }
+    registered_devices.swap(new_devices);
+  }
+
+  uint64_t get_block_size() {
+    return block_size;
+  }
+  uint64_t get_size() {
+    return size;
+  }
+};
+
 class SharedDriverQueueData {
+  NVMEDevice *bdev;
   SharedDriverData *driver;
   spdk_nvme_ctrlr *ctrlr;
   spdk_nvme_ns *ns;
   std::string sn;
   uint64_t block_size;
   uint32_t sector_size;
-  uint32_t core_id;
-  uint32_t queueid;
   uint32_t max_queue_depth;
   struct spdk_nvme_qpair *qpair;
-  std::function<void ()> run_func;
-
-  bool aio_stop = false;
-  void _aio_thread();
+  bool reap_io = false;
   int alloc_buf_from_pool(Task *t, bool write);
 
-  std::atomic_bool queue_empty;
-  Mutex queue_lock;
-  Cond queue_cond;
-  std::queue<Task*> task_queue;
-
-  Mutex flush_lock;
-  Cond flush_cond;
-  std::atomic_int flush_waiters;
-  std::set<uint64_t> flush_waiter_seqs;
-
   public:
     uint32_t current_queue_depth = 0;
     std::atomic_ulong completed_op_seq, queue_op_seq;
     std::vector<void*> data_buf_mempool;
     PerfCounters *logger = nullptr;
+    void _aio_handle(Task *t, IOContext *ioc);
+
+    SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+      : bdev(bdev),
+        driver(driver) {
+    ctrlr = driver->ctrlr;
+    ns = driver->ns;
+    block_size = driver->block_size;
+    sector_size = driver->sector_size;
 
-    SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
-                          const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
-      : driver(driver),
-        ctrlr(c),
-        ns(ns),
-       sn(sn_tag),
-        block_size(block_size),
-        sector_size(sector_size),
-        core_id(core),
-        queueid(queue_id),
-        run_func([this]() { _aio_thread(); }),
-        queue_empty(false),
-        queue_lock("NVMEDevice::queue_lock"),
-        flush_lock("NVMEDevice::flush_lock"),
-        flush_waiters(0),
-        completed_op_seq(0), queue_op_seq(0) {
     struct spdk_nvme_io_qpair_opts opts = {};
     spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
     opts.qprio = SPDK_NVME_QPRIO_URGENT;
@@ -144,6 +163,16 @@ class SharedDriverQueueData {
     qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
     assert(qpair != NULL);
 
+    // allocate spdk dma memory
+    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+      void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+      if (!b) {
+        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+        assert(b);
+      }
+      data_buf_mempool.push_back(b);
+    }
+
     PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
                           l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
     b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
@@ -157,139 +186,29 @@ class SharedDriverQueueData {
     b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
     logger = b.create_perf_counters();
     g_ceph_context->get_perfcounters_collection()->add(logger);
-   }
-
-   void queue_task(Task *t, uint64_t ops = 1) {
-    queue_op_seq += ops;
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.load()) {
-      queue_empty = false;
-      queue_cond.Signal();
-    }
-  }
-
-  void flush_wait() {
-    uint64_t cur_seq = queue_op_seq.load();
-    uint64_t left = cur_seq - completed_op_seq.load();
-    if (cur_seq > completed_op_seq) {
-      // TODO: this may contains read op
-      dout(10) << __func__ << " existed inflight ops " << left << dendl;
-      Mutex::Locker l(flush_lock);
-      ++flush_waiters;
-      flush_waiter_seqs.insert(cur_seq);
-      while (cur_seq > completed_op_seq.load()) {
-       flush_cond.Wait(flush_lock);
-      }
-      flush_waiter_seqs.erase(cur_seq);
-      --flush_waiters;
-    }
-  }
-
-  void start() {
-    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
-                                  core_id);
-    assert(r == 0);
-
-  }
-
-  void stop() {
-    {
-      Mutex::Locker l(queue_lock);
-      aio_stop = true;
-      queue_cond.Signal();
-    }
-    int r = rte_eal_wait_lcore(core_id);
-    assert(r == 0);
-    aio_stop = false;
+    bdev->queue_number++;
+    if(bdev->queue_number.load() == 1)
+      reap_io = true;
   }
 
   ~SharedDriverQueueData() {
     g_ceph_context->get_perfcounters_collection()->remove(logger);
     if (!qpair) {
-      spdk_nvme_ctrlr_free_io_qpair(qpair); 
+      spdk_nvme_ctrlr_free_io_qpair(qpair);
+      bdev->queue_number--;
     }
-    delete logger;
-  }
-};
-
-class SharedDriverData {
-  unsigned id;
-  uint32_t core_id;
-
-  std::string sn;
-  spdk_nvme_ctrlr *ctrlr;
-  spdk_nvme_ns *ns;
-  uint64_t block_size = 0;
-  uint32_t sector_size = 0;
-  uint64_t size = 0;
-  uint32_t queue_number;
-  std::vector<SharedDriverQueueData*> queues;
-
-  void _aio_start() {
-     for (auto &&it : queues)
-             it->start();
-  }
-  void _aio_stop() {
-      for (auto &&it : queues)
-             it->stop();
-  }
-
-  public:
-  std::vector<NVMEDevice*> registered_devices;
-  SharedDriverData(unsigned _id, const std::string &sn_tag,
-                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
-      : id(_id),
-        sn(sn_tag),
-        ctrlr(c),
-        ns(ns) {
-    int i;
-    sector_size = spdk_nvme_ns_get_sector_size(ns);
-    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
-    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
-
-    RTE_LCORE_FOREACH_SLAVE(i) {
-      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
-   }
-
-   _aio_start();
-  }
 
-  bool is_equal(const string &tag) const { return sn == tag; }
-  ~SharedDriverData() {
-    for (auto p : queues) {
-      delete p;
-   }
-  }
-
-  SharedDriverQueueData *get_queue(uint32_t i) {
-       return queues.at(i%queue_number);
-  }
-
-  void register_device(NVMEDevice *device) {
-    // in case of registered_devices, we stop thread now.
-    // Because release is really a rare case, we could bear this
-    _aio_stop();
-    registered_devices.push_back(device);
-    _aio_start();
-  }
-
-  void remove_device(NVMEDevice *device) {
-    _aio_stop();
-    std::vector<NVMEDevice*> new_devices;
-    for (auto &&it : registered_devices) {
-      if (it != device)
-        new_devices.push_back(it);
+    // free all spdk dma memory;
+    if (!data_buf_mempool.empty()) {
+      for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+        void *b = data_buf_mempool[i];
+        assert(b);
+        spdk_dma_free(b);
+      }
+      data_buf_mempool.clear();
     }
-    registered_devices.swap(new_devices);
-    _aio_start();
-  }
 
-  uint64_t get_block_size() {
-    return block_size;
-  }
-  uint64_t get_size() {
-    return size;
+    delete logger;
   }
 };
 
@@ -342,16 +261,6 @@ struct Task {
       copied += need_copy;
     }
   }
-
-  void io_wait() {
-    std::unique_lock<std::mutex> l(lock);
-    cond.wait(l);
-  }
-
-  void io_wake() {
-    std::lock_guard<std::mutex> l(lock);
-    cond.notify_all();
-  }
 };
 
 static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
@@ -443,32 +352,19 @@ int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
   return 0;
 }
 
-void SharedDriverQueueData::_aio_thread()
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
 {
-  dout(1) << __func__ << " start" << dendl;
-
-  if (data_buf_mempool.empty()) {
-    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
-      void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
-      if (!b) {
-        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
-        assert(b);
-      }
-      data_buf_mempool.push_back(b);
-    }
-  }
+  dout(20) << __func__ << " start" << dendl;
 
-  Task *t = nullptr;
   int r = 0;
   uint64_t lba_off, lba_count;
 
   ceph::coarse_real_clock::time_point cur, start
     = ceph::coarse_real_clock::now();
-  while (true) {
-    bool inflight = queue_op_seq.load() - completed_op_seq.load();
+  while (ioc->num_running) {
  again:
     dout(40) << __func__ << " polling" << dendl;
-    if (inflight) {
+    if (current_queue_depth) {
       spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion);
     }
 
@@ -549,46 +445,15 @@ void SharedDriverQueueData::_aio_thread()
       }
       current_queue_depth++;
     }
-
-    if (!queue_empty.load()) {
-      Mutex::Locker l(queue_lock);
-      if (!task_queue.empty()) {
-        t = task_queue.front();
-        task_queue.pop();
-        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
-      }
-      if (!t)
-        queue_empty = true;
-    } else {
-      if (flush_waiters.load()) {
-        Mutex::Locker l(flush_lock);
-        if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
-          flush_cond.Signal();
-      }
-
-      if (!inflight) {
-        // be careful, here we need to let each thread reap its own, currently it is done
-        // by only one dedicatd dpdk thread
-        if(!queueid) {
-          for (auto &&it : driver->registered_devices)
-            it->reap_ioc();
-        }
-
-        Mutex::Locker l(queue_lock);
-        if (queue_empty.load()) {
-          cur = ceph::coarse_real_clock::now();
-          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
-          logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
-          if (aio_stop)
-            break;
-          queue_cond.Wait(queue_lock);
-          start = ceph::coarse_real_clock::now();
-        }
-      }
-    }
+    cur = ceph::coarse_real_clock::now();
+    auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+    logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
+    start = ceph::coarse_real_clock::now();
   }
-  assert(data_buf_mempool.size() == data_buffer_default_num);
-  dout(1) << __func__ << " end" << dendl;
+
+  if(reap_io)
+    bdev->reap_ioc();
+  dout(20) << __func__ << " end" << dendl;
 }
 
 #define dout_subsys ceph_subsys_bdev
@@ -733,10 +598,10 @@ int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
     }
   }
 
-  // at least two cores are needed for using spdk
-  if (core_num < 2) {
+  // at least one core is needed for using spdk
+  if (core_num < 1) {
     r = -ENOENT;
-    derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
+    derr << __func__ << " invalid spdk coremask, at least one core is needed: "
          << cpp_strerror(r) << dendl;
     return r;
   }
@@ -809,7 +674,6 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
 
   assert(queue != NULL);
   assert(ctx != NULL);
-  ++queue->completed_op_seq;
   --queue->current_queue_depth;
   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
       ceph::coarse_real_clock::now() - task->start);
@@ -868,7 +732,6 @@ NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
 {
 }
 
-
 int NVMEDevice::open(const string& p)
 {
   int r = 0;
@@ -922,6 +785,7 @@ int NVMEDevice::open(const string& p)
           << " block_size " << block_size << " (" << pretty_si_t(block_size)
           << "B)" << dendl;
 
+
   return 0;
 }
 
@@ -950,17 +814,6 @@ int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) c
 
 int NVMEDevice::flush()
 {
-  dout(10) << __func__ << " start" << dendl;
-  auto start = ceph::coarse_real_clock::now();
-
-  if(queue_id == -1)
-    queue_id = ceph_gettid();
-  SharedDriverQueueData *queue = driver->get_queue(queue_id);
-  assert(queue != NULL);
-  queue->flush_wait();
-  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
-      ceph::coarse_real_clock::now() - start);
-  queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
   return 0;
 }
 
@@ -976,10 +829,10 @@ void NVMEDevice::aio_submit(IOContext *ioc)
     ioc->num_pending -= pending;
     assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
     // Only need to push the first entry
-    if (queue_id == -1)
-      queue_id = ceph_gettid();
     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
-    driver->get_queue(queue_id)->queue_task(t, pending);
+    if(!queue_t)
+       queue_t = new SharedDriverQueueData(this, driver);
+    queue_t->_aio_handle(t, ioc);
   }
 }
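
The per-thread lazy initialization that aio_submit now relies on, together with the queue_number accounting added in the constructor above, reduces to the following self-contained sketch (names and types here are illustrative, not the patch's):

#include <atomic>

struct Device { std::atomic<int> queue_number{0}; };

struct Queue {
  bool reap_io = false;
  explicit Queue(Device *d) {
    // The first queue created for a device takes over reaping IOContexts.
    if (++d->queue_number == 1)
      reap_io = true;
  }
};

static thread_local Queue *tls_queue = nullptr;

Queue *get_queue(Device *d) {
  if (!tls_queue)
    tls_queue = new Queue(d);  // created lazily on this thread's first I/O
  return tls_queue;            // thread-local, so no locking is required
}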
 
index 74e045b0f5b1151164699c1471e8fbddb9dad170..81911512ccfb28b03d9160d783d6cdbc81ab6061 100644 (file)
@@ -38,6 +38,7 @@ enum class IOCommand {
 };
 
 class SharedDriverData;
+class SharedDriverQueueData;
 
 class NVMEDevice : public BlockDevice {
   /**
@@ -49,9 +50,9 @@ class NVMEDevice : public BlockDevice {
   string name;
 
  public:
+  std::atomic_int queue_number = {0};
   SharedDriverData *get_driver() { return driver; }
 
- public:
   NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
 
   bool supported_bdev_label() override { return false; }