]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
bluestore, NVMEDevice: Add multiple thread support for SPDK I/O thread 14420/head
authorZiye Yang <ziye.yang@intel.com>
Wed, 29 Mar 2017 06:04:21 +0000 (14:04 +0800)
committeroptimistyzy <optimistyzy@gmail.com>
Wed, 12 Apr 2017 13:04:33 +0000 (21:04 +0800)
Previously, we only have one thread to do SPDK I/O, and this patch
adds the multiple thread support.

In this first version, we use the tid of the thread to map the I/Os
of this thread to the corresponding SPDK I/O thread.

Signed-off-by: Ziye Yang <optimistyzy@gmail.com>
src/os/bluestore/NVMEDevice.cc

index ef27b8d2ab4f9b76436390c1d4da4ad69f8c1d02..6b08bf707f2322f642147cbf188e410fe330512e 100644 (file)
@@ -43,6 +43,7 @@
 #include "common/errno.h"
 #include "common/debug.h"
 #include "common/perf_counters.h"
+#include "common/io_priority.h"
 
 #include "NVMEDevice.h"
 
 #undef dout_prefix
 #define dout_prefix *_dout << "bdev(" << sn << ") "
 
-std::vector<void*> data_buf_mempool;
-
 static constexpr uint16_t data_buffer_default_num = 2048;
 
 static constexpr uint32_t data_buffer_size = 8192;
 
 static constexpr uint16_t inline_segment_num = 32;
 
+static thread_local int queue_id = -1;
+
 enum {
   l_bluestore_nvmedevice_first = 632430,
   l_bluestore_nvmedevice_aio_write_lat,
@@ -94,97 +95,23 @@ struct IORequest {
   void **extra_segs = nullptr;
 };
 
-struct Task {
-  NVMEDevice *device;
-  IOContext *ctx = nullptr;
-  IOCommand command;
-  uint64_t offset;
-  uint64_t len;
-  bufferlist write_bl;
-  std::function<void()> fill_cb;
-  Task *next = nullptr;
-  int64_t return_code;
-  ceph::coarse_real_clock::time_point start;
-  IORequest io_request;
-  std::mutex lock;
-  std::condition_variable cond;
-  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
-    : device(dev), command(c), offset(off), len(l),
-      return_code(rc),
-      start(ceph::coarse_real_clock::now()) {}
-  ~Task() {
-    assert(!io_request.nseg);
-  }
-  void release_segs() {
-    if (io_request.extra_segs) {
-      for (uint16_t i = 0; i < io_request.nseg; i++)
-        data_buf_mempool.push_back(io_request.extra_segs[i]);
-      delete io_request.extra_segs;
-    } else if (io_request.nseg) {
-      for (uint16_t i = 0; i < io_request.nseg; i++)
-        data_buf_mempool.push_back(io_request.inline_segs[i]);
-    }
-    io_request.nseg = 0;
-  }
-
-  void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
-    uint64_t copied = 0;
-    uint64_t left = len;
-    void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
-    uint16_t i = 0;
-    while (left > 0) {
-      char *src = static_cast<char*>(segs[i++]);
-      uint64_t need_copy = std::min(left, data_buffer_size-off);
-      memcpy(buf+copied, src+off, need_copy);
-      off = 0;
-      left -= need_copy;
-      copied += need_copy;
-    }
-  }
-
-  void io_wait() {
-    std::unique_lock<std::mutex> l(lock);
-    cond.wait(l);
-  }
-
-  void io_wake() {
-    std::lock_guard<std::mutex> l(lock);
-    cond.notify_all();
-  }
-};
-
-class SharedDriverData {
-  unsigned id;
-  uint32_t core_id;
-  std::string sn;
+class SharedDriverQueueData {
+  SharedDriverData *driver;
   spdk_nvme_ctrlr *ctrlr;
   spdk_nvme_ns *ns;
+  std::string sn;
+  uint64_t block_size;
+  uint32_t sector_size;
+  uint32_t core_id;
+  uint32_t queueid;
   struct spdk_nvme_qpair *qpair;
   std::function<void ()> run_func;
-
-  uint64_t block_size = 0;
-  uint32_t sector_size = 0;
-  uint64_t size = 0;
-  std::vector<NVMEDevice*> registered_devices;
   friend class AioCompletionThread;
 
   bool aio_stop = false;
   void _aio_thread();
-  void _aio_start() {
-    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
-                                  core_id);
-    assert(r == 0);
-  }
-  void _aio_stop() {
-    {
-      Mutex::Locker l(queue_lock);
-      aio_stop = true;
-      queue_cond.Signal();
-    }
-    int r = rte_eal_wait_lcore(core_id);
-    assert(r == 0);
-    aio_stop = false;
-  }
+  int alloc_buf_from_pool(Task *t, bool write);
+
   std::atomic_bool queue_empty;
   Mutex queue_lock;
   Cond queue_cond;
@@ -195,17 +122,21 @@ class SharedDriverData {
   std::atomic_int flush_waiters;
   std::set<uint64_t> flush_waiter_seqs;
 
- public:
-  std::atomic_ulong completed_op_seq, queue_op_seq;
-  PerfCounters *logger = nullptr;
+  public:
+    std::atomic_ulong completed_op_seq, queue_op_seq;
+    std::vector<void*> data_buf_mempool;
+    PerfCounters *logger = nullptr;
 
-  SharedDriverData(unsigned i, uint32_t core, const std::string &sn_tag,
-                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
-      : id(i),
-        core_id(core),
-        sn(sn_tag),
+    SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
+                          const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
+      : driver(driver),
         ctrlr(c),
         ns(ns),
+       sn(sn_tag),
+        block_size(block_size),
+        sector_size(sector_size),
+        core_id(core),
+        queueid(queue_id),
         run_func([this]() { _aio_thread(); }),
         queue_empty(false),
         queue_lock("NVMEDevice::queue_lock"),
@@ -213,11 +144,7 @@ class SharedDriverData {
         flush_waiters(0),
         completed_op_seq(0), queue_op_seq(0) {
 
-    sector_size = spdk_nvme_ns_get_sector_size(ns);
-    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
-    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
-    qpair = spdk_nvme_ctrlr_alloc_io_qpair(c, SPDK_NVME_QPRIO_URGENT);
-
+    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
     PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
                           l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
     b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
@@ -231,17 +158,115 @@ class SharedDriverData {
     b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
     logger = b.create_perf_counters();
     g_ceph_context->get_perfcounters_collection()->add(logger);
-    _aio_start();
+   }
+
+   void queue_task(Task *t, uint64_t ops = 1) {
+    queue_op_seq += ops;
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+    if (queue_empty.load()) {
+      queue_empty = false;
+      queue_cond.Signal();
+    }
   }
-  ~SharedDriverData() {
+
+  void flush_wait() {
+    uint64_t cur_seq = queue_op_seq.load();
+    uint64_t left = cur_seq - completed_op_seq.load();
+    if (cur_seq > completed_op_seq) {
+      // TODO: this may contains read op
+      dout(10) << __func__ << " existed inflight ops " << left << dendl;
+      Mutex::Locker l(flush_lock);
+      ++flush_waiters;
+      flush_waiter_seqs.insert(cur_seq);
+      while (cur_seq > completed_op_seq.load()) {
+       flush_cond.Wait(flush_lock);
+      }
+      flush_waiter_seqs.erase(cur_seq);
+      --flush_waiters;
+    }
+  }
+
+  void start() {
+    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
+                                  core_id);
+    assert(r == 0);
+
+  }
+
+  void stop() {
+    {
+      Mutex::Locker l(queue_lock);
+      aio_stop = true;
+      queue_cond.Signal();
+    }
+    int r = rte_eal_wait_lcore(core_id);
+    assert(r == 0);
+    aio_stop = false;
+  }
+
+  ~SharedDriverQueueData() {
     g_ceph_context->get_perfcounters_collection()->remove(logger);
     if (!qpair) {
       spdk_nvme_ctrlr_free_io_qpair(qpair); 
     }
     delete logger;
   }
+};
+
+class SharedDriverData {
+  unsigned id;
+  uint32_t core_id;
+
+  std::string sn;
+  spdk_nvme_ctrlr *ctrlr;
+  spdk_nvme_ns *ns;
+  uint64_t block_size = 0;
+  uint32_t sector_size = 0;
+  uint64_t size = 0;
+  uint32_t queue_number;
+  std::vector<SharedDriverQueueData*> queues;
+
+  void _aio_start() {
+     for (auto &&it : queues)
+             it->start();
+  }
+  void _aio_stop() {
+      for (auto &&it : queues)
+             it->stop();
+  }
+
+  public:
+  std::vector<NVMEDevice*> registered_devices;
+  SharedDriverData(unsigned _id, const std::string &sn_tag,
+                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+      : id(_id),
+        sn(sn_tag),
+        ctrlr(c),
+        ns(ns) {
+    int i;
+    sector_size = spdk_nvme_ns_get_sector_size(ns);
+    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
+    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
+
+    RTE_LCORE_FOREACH_SLAVE(i) {
+      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
+   }
+
+   _aio_start();
+  }
 
   bool is_equal(const string &tag) const { return sn == tag; }
+  ~SharedDriverData() {
+    for (auto p : queues) {
+      delete p;
+   }
+  }
+
+  SharedDriverQueueData *get_queue(uint32_t i) {
+       return queues.at(i%queue_number);
+  }
+
   void register_device(NVMEDevice *device) {
     // in case of registered_devices, we stop thread now.
     // Because release is really a rare case, we could bear this
@@ -249,6 +274,7 @@ class SharedDriverData {
     registered_devices.push_back(device);
     _aio_start();
   }
+
   void remove_device(NVMEDevice *device) {
     _aio_stop();
     std::vector<NVMEDevice*> new_devices;
@@ -266,32 +292,66 @@ class SharedDriverData {
   uint64_t get_size() {
     return size;
   }
-  void queue_task(Task *t, uint64_t ops = 1) {
-    queue_op_seq += ops;
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.load()) {
-      queue_empty = false;
-      queue_cond.Signal();
+};
+
+struct Task {
+  NVMEDevice *device;
+  IOContext *ctx = nullptr;
+  IOCommand command;
+  uint64_t offset;
+  uint64_t len;
+  bufferlist write_bl;
+  std::function<void()> fill_cb;
+  Task *next = nullptr;
+  int64_t return_code;
+  ceph::coarse_real_clock::time_point start;
+  IORequest io_request;
+  std::mutex lock;
+  std::condition_variable cond;
+  SharedDriverQueueData *queue;
+  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+    : device(dev), command(c), offset(off), len(l),
+      return_code(rc),
+      start(ceph::coarse_real_clock::now()) {}
+  ~Task() {
+    assert(!io_request.nseg);
+  }
+  void release_segs(SharedDriverQueueData *queue_data) {
+    if (io_request.extra_segs) {
+      for (uint16_t i = 0; i < io_request.nseg; i++)
+        queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
+      delete io_request.extra_segs;
+    } else if (io_request.nseg) {
+      for (uint16_t i = 0; i < io_request.nseg; i++)
+        queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
     }
+    io_request.nseg = 0;
   }
 
-  void flush_wait() {
-    uint64_t cur_seq = queue_op_seq.load();
-    uint64_t left = cur_seq - completed_op_seq.load();
-    if (cur_seq > completed_op_seq) {
-      // TODO: this may contains read op
-      dout(10) << __func__ << " existed inflight ops " << left << dendl;
-      Mutex::Locker l(flush_lock);
-      ++flush_waiters;
-      flush_waiter_seqs.insert(cur_seq);
-      while (cur_seq > completed_op_seq.load()) {
-        flush_cond.Wait(flush_lock);
-      }
-      flush_waiter_seqs.erase(cur_seq);
-      --flush_waiters;
+  void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+    uint64_t copied = 0;
+    uint64_t left = len;
+    void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+    uint16_t i = 0;
+    while (left > 0) {
+      char *src = static_cast<char*>(segs[i++]);
+      uint64_t need_copy = std::min(left, data_buffer_size-off);
+      memcpy(buf+copied, src+off, need_copy);
+      off = 0;
+      left -= need_copy;
+      copied += need_copy;
     }
   }
+
+  void io_wait() {
+    std::unique_lock<std::mutex> l(lock);
+    cond.wait(l);
+  }
+
+  void io_wake() {
+    std::lock_guard<std::mutex> l(lock);
+    cond.notify_all();
+  }
 };
 
 static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
@@ -349,7 +409,7 @@ static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
   return 0;
 }
 
-static int alloc_buf_from_pool(Task *t, bool write)
+int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
 {
   uint64_t count = t->len / data_buffer_size;
   if (t->len % data_buffer_size)
@@ -382,7 +442,7 @@ static int alloc_buf_from_pool(Task *t, bool write)
   return 0;
 }
 
-void SharedDriverData::_aio_thread()
+void SharedDriverQueueData::_aio_thread()
 {
   dout(1) << __func__ << " start" << dendl;
 
@@ -400,6 +460,7 @@ void SharedDriverData::_aio_thread()
   Task *t = nullptr;
   int r = 0;
   uint64_t lba_off, lba_count;
+
   ceph::coarse_real_clock::time_point cur, start
     = ceph::coarse_real_clock::now();
   while (true) {
@@ -414,6 +475,7 @@ void SharedDriverData::_aio_thread()
     }
 
     for (; t; t = t->next) {
+      t->queue = this;
       lba_off = t->offset / sector_size;
       lba_count = t->len / sector_size;
       switch (t->command) {
@@ -432,7 +494,7 @@ void SharedDriverData::_aio_thread()
           if (r < 0) {
             derr << __func__ << " failed to do write command" << dendl;
             t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
-            t->release_segs();
+            t->release_segs(this);
             delete t;
             ceph_abort();
           }
@@ -455,7 +517,7 @@ void SharedDriverData::_aio_thread()
               data_buf_reset_sgl, data_buf_next_sge);
           if (r < 0) {
             derr << __func__ << " failed to read" << dendl;
-            t->release_segs();
+            t->release_segs(this);
             delete t;
             ceph_abort();
           } else {
@@ -471,7 +533,7 @@ void SharedDriverData::_aio_thread()
           r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
           if (r < 0) {
             derr << __func__ << " failed to flush" << dendl;
-            t->release_segs();
+            t->release_segs(this);
             delete t;
             ceph_abort();
           } else {
@@ -500,10 +562,13 @@ void SharedDriverData::_aio_thread()
           flush_cond.Signal();
       }
 
-
       if (!inflight) {
-        for (auto &&it : registered_devices)
-          it->reap_ioc();
+        // be careful, here we need to let each thread reap its own, currently it is done
+        // by only one dedicatd dpdk thread
+        if(!queueid) {
+          for (auto &&it : driver->registered_devices)
+            it->reap_ioc();
+        }
 
         Mutex::Locker l(queue_lock);
         if (queue_empty.load()) {
@@ -568,7 +633,7 @@ class NVMEManager {
     // only support one device per osd now!
     assert(shared_driver_datas.empty());
     // index 0 is occured by master thread
-    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, rte_get_next_lcore(-1, 1, 0), sn_tag, c, ns));
+    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
     *driver = shared_driver_datas.back();
   }
 };
@@ -755,17 +820,18 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
 {
   Task *task = static_cast<Task*>(t);
   IOContext *ctx = task->ctx;
-  SharedDriverData *driver = task->device->get_driver();
+  SharedDriverQueueData *queue = task->queue;
 
+  assert(queue != NULL);
   assert(ctx != NULL);
-  ++driver->completed_op_seq;
+  ++queue->completed_op_seq;
   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
       ceph::coarse_real_clock::now() - task->start);
   if (task->command == IOCommand::WRITE_COMMAND) {
-    driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
+    queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
     assert(!spdk_nvme_cpl_is_error(completion));
     dout(20) << __func__ << " write/zero op successfully, left "
-             << driver->queue_op_seq - driver->completed_op_seq << dendl;
+             << queue->queue_op_seq - queue->completed_op_seq << dendl;
     // check waiting count before doing callback (which may
     // destroy this ioc).
     if (!--ctx->num_running) {
@@ -774,14 +840,14 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
         task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
       }
     }
-    task->release_segs();
+    task->release_segs(queue);
     delete task;
   } else if (task->command == IOCommand::READ_COMMAND) {
-    driver->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
+    queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
     assert(!spdk_nvme_cpl_is_error(completion));
     dout(20) << __func__ << " read op successfully" << dendl;
     task->fill_cb();
-    task->release_segs();
+    task->release_segs(queue);
     // read submitted by AIO
     if(!task->return_code) {
       if (!--ctx->num_running) {
@@ -800,7 +866,7 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
   } else {
     assert(task->command == IOCommand::FLUSH_COMMAND);
     assert(!spdk_nvme_cpl_is_error(completion));
-    driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+    queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
     dout(20) << __func__ << " flush op successfully" << dendl;
     task->return_code = 0;
     ctx->aio_wake();
@@ -907,10 +973,15 @@ int NVMEDevice::flush()
 {
   dout(10) << __func__ << " start" << dendl;
   auto start = ceph::coarse_real_clock::now();
-  driver->flush_wait();
+
+  if(queue_id == -1)
+    queue_id = ceph_gettid();
+  SharedDriverQueueData *queue = driver->get_queue(queue_id);
+  assert(queue != NULL);
+  queue->flush_wait();
   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
       ceph::coarse_real_clock::now() - start);
-  driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+  queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
   return 0;
 }
 
@@ -926,7 +997,9 @@ void NVMEDevice::aio_submit(IOContext *ioc)
     ioc->num_pending -= pending;
     assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
     // Only need to push the first entry
-    driver->queue_task(t, pending);
+  if(queue_id == -1)
+    queue_id = ceph_gettid();
+    driver->get_queue(queue_id)->queue_task(t, pending);
     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
   }
 }
@@ -954,7 +1027,9 @@ int NVMEDevice::aio_write(
 
   if (buffered) {
     // Only need to push the first entry
-    driver->queue_task(t);
+    if(queue_id == -1)
+      queue_id = ceph_gettid();
+    driver->get_queue(queue_id)->queue_task(t);
   } else {
     t->ctx = ioc;
     Task *first = static_cast<Task*>(ioc->nvme_task_first);
@@ -992,7 +1067,9 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
     t->copy_to_buf(buf, 0, t->len);
   };
   ++ioc->num_reading;
-  driver->queue_task(t);
+  if(queue_id == -1)
+    queue_id = ceph_gettid();
+  driver->get_queue(queue_id)->queue_task(t);
 
   while(t->return_code > 0) {
     t->io_wait();
@@ -1038,8 +1115,6 @@ int NVMEDevice::aio_read(
   return 0;
 }
 
-
-
 int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
 {
   assert(len > 0);
@@ -1058,7 +1133,9 @@ int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered
     t->copy_to_buf(buf, off-t->offset, len);
   };
   ++ioc.num_reading;
-  driver->queue_task(t);
+  if(queue_id == -1)
+    queue_id = ceph_gettid();
+  driver->get_queue(queue_id)->queue_task(t);
 
   while(t->return_code > 0) {
     t->io_wait();