]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
NVMEDevice: make read/write all async
authorHaomai Wang <haomai@xsky.com>
Tue, 5 Jan 2016 15:50:15 +0000 (23:50 +0800)
committerHaomai Wang <haomai@xsky.com>
Mon, 1 Feb 2016 14:00:44 +0000 (22:00 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/os/bluestore/BlockDevice.h
src/os/bluestore/NVMEDevice.cc
src/os/bluestore/NVMEDevice.h

index 03fa9707ab4178b1ad974d389236c52dc9cecfbc..4564de5c08d845517682b2516c7442268f9ab13f 100644 (file)
@@ -24,8 +24,7 @@
 struct IOContext {
   void *priv;
 #ifdef HAVE_SPDK
-  bool done = false;
-  void *backend;
+  void *backend_priv;
 #endif
 
   Mutex lock;
index fff435bf778e7392b73bc2c64e066f92f7eb199c..068c7752dfd33b87da80d321f88ab02379a46dd1 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "bdev "
 
-void IOContext::aio_wait()
-{
-  Mutex::Locker l(lock);
-  // see _aio_thread for waker logic
-  num_waiting.inc();
-  while (num_running.read() > 0 || num_reading.read() > 0) {
-    dout(10) << __func__ << " " << this
-    << " waiting for " << num_running.read() << " aios and/or "
-    << num_reading.read() << " readers to complete" << dendl;
-    cond.Wait(lock);
-  }
-  num_waiting.dec();
-  dout(20) << __func__ << " " << this << " done" << dendl;
-}
-
-static void io_complete(void *ctx, const struct nvme_completion *completion) {
-  if (nvme_completion_is_error(completion)) {
-    assert(0);
-  }
-
-  IOContext *ioc = (IOContext*)ctx;
-  NVMEDevice *device = (NVMEDevice*)ioc->backend;
-  ioc->done = true;
-  if (ioc->priv) {
-    device->aio_callback(device->aio_callback_priv, ioc->priv);
+struct rte_mempool *request_mempool;
+static struct rte_mempool *task_pool;
+
+static void io_complete(void *t, const struct nvme_completion *completion) {
+  Task *task = static_cast<Task*>(t);
+  IOContext *ctx = task->ctx;
+  if (task->command == IOCommand::WRITE_COMMAND) {
+    auto left = ctx->num_running.dec();
+    assert(!nvme_completion_is_error(completion));
+    // check waiting count before doing callback (which may
+    // destroy this ioc).
+    if (!left) {
+      ctx->backend_priv = nullptr;
+      if (ctx->num_waiting.read()) {
+        Mutex::Locker l(ctx->lock);
+        ctx->cond.Signal();
+      }
+      if (ctx->priv) {
+        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+      }
+    }
+    rte_free(task->buf);
+    rte_mempool_put(task_pool, task);
+  } else {
+    assert(task->command == IOCommand::READ_COMMAND);
+    ctx->num_reading.dec();
+    if (nvme_completion_is_error(completion))
+      task->read_code = -1; // FIXME
+    else
+      task->read_code = 0;
+    Mutex::Locker l(ctx->lock);
+    ctx->cond.Signal();
   }
 }
 
@@ -74,6 +81,9 @@ static void io_complete(void *ctx, const struct nvme_completion *completion) {
 NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
     : ctrlr(nullptr),
       ns(nullptr),
+      aio_stop(false),
+      queue_lock("NVMEDevice::queue_lock"),
+      aio_thread(this),
       aio_callback(cb),
       aio_callback_priv(cbpriv)
 {
@@ -82,7 +92,7 @@ NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
 }
 
 static char *ealargs[] = {
-    "perf",
+    "ceph-osd",
     "-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
     "-n 4",
 };
@@ -97,6 +107,13 @@ void NVMEDevice::init()
                                        nvme_request_size(), 128, 0,
                                        NULL, NULL, NULL, NULL,
                                        SOCKET_ID_ANY, 0);
+  if (request_mempool == NULL)
+    assert(0);
+
+       task_pool = rte_mempool_create(
+      "task_pool", 8192, sizeof(Task),
+      64, 0, NULL, NULL, NULL, NULL,
+      SOCKET_ID_ANY, 0);
   if (request_mempool == NULL)
     assert(0);
 }
@@ -219,6 +236,8 @@ int NVMEDevice::open(string p)
 
   pci_iterator_destroy(iter);
 
+  aio_thread.create();
+
   dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
           << " block_size " << block_size << " (" << pretty_si_t(block_size)
           << "B)" << dendl;
@@ -229,10 +248,71 @@ int NVMEDevice::open(string p)
 void NVMEDevice::close()
 {
   dout(1) << __func__ << dendl;
-  nvme_unregister_io_thread();
+
+  aio_stop = true;
+  aio_thread.join();
+  aio_stop = false;
   name.clear();
 }
 
+void NVMEDevice::_aio_thread()
+{
+  dout(10) << __func__ << " start" << dendl;
+  if (nvme_register_io_thread() != 0) {
+    assert(0);
+  }
+
+  Task *t;
+  int r = 0;
+  const int max = 16;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    {
+      Mutex::Locker l(queue_lock);
+      if (!task_queue.empty()) {
+        t = task_queue.front();
+        task_queue.pop();
+      }
+    }
+
+    if (t) {
+      switch (t->command) {
+        case IOCommand::WRITE_COMMAND:
+        {
+          while (t) {
+            r = nvme_ns_cmd_write(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+            if (r < 0) {
+              t->ctx->backend_priv = nullptr;
+              rte_free(t->buf);
+              rte_mempool_put(task_pool, t);
+              derr << __func__ << " failed to do write command" << dendl;
+              assert(0);
+            }
+            t = t->next;
+          }
+          break;
+        }
+        case IOCommand::READ_COMMAND:
+        {
+          r = nvme_ns_cmd_read(ns, t->buf, t->offset, t->len / block_size, io_complete, t);
+          if (r < 0) {
+            derr << __func__ << " failed to read" << dendl;
+            t->ctx->num_reading.dec();
+            t->read_code = r;
+            Mutex::Locker l(t->ctx->lock);
+            t->ctx->cond.Signal();
+          }
+          break;
+        }
+      }
+    }
+
+    nvme_ctrlr_process_io_completions(ctrlr, max);
+  }
+  nvme_unregister_io_thread();
+  dout(10) << __func__ << " end" << dendl;
+}
+
 int NVMEDevice::flush()
 {
   dout(10) << __func__ << " start" << dendl;
@@ -241,10 +321,17 @@ int NVMEDevice::flush()
 
 void NVMEDevice::aio_submit(IOContext *ioc)
 {
-  while (!ioc->done) {
-    nvme_ctrlr_process_io_completions(ctrlr, 0);
-    usleep(50);
-  }
+  dout(20) << __func__ << " ioc " << ioc << " pending "
+           << ioc->num_pending.read() << " running "
+           << ioc->num_running.read() << dendl;
+  Task *t = static_cast<Task*>(ioc->backend_priv);
+  int pending = ioc->num_pending.read();
+  ioc->num_running.add(pending);
+  ioc->num_pending.sub(pending);
+  assert(ioc->num_pending.read() == 0);  // we should be only thread doing this
+  Mutex::Locker l(queue_lock);
+  // Only need to push the first entry
+  task_queue.push(t);
 }
 
 int NVMEDevice::aio_write(
@@ -261,18 +348,34 @@ int NVMEDevice::aio_write(
   assert(off < size);
   assert(off + len <= size);
 
-  if (!bl.is_n_page_sized() || !bl.is_page_aligned()) {
-    dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl;
-    bl.rebuild();
-  }
-
-  ioc->backend = this;
-  int rc = nvme_ns_cmd_write(ns, bl.c_str(), off,
-                         bl.length()/block_size, io_complete, ioc);
-  if (rc < 0) {
-    derr << __func__ << " failed to do write command" << dendl;
-    return rc;
+  Task *t;
+  int r = rte_mempool_get(task_pool, (void **)&t);
+  if (r < 0) {
+               derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
+    return r;
+       }
+
+  t->buf = rte_malloc(NULL, bl.length(), block_size);
+       if (t->buf == NULL) {
+               derr << __func__ << " task->buf rte_malloc failed" << dendl;
+    rte_mempool_put(task_pool, t);
+    return -ENOMEM;
+       }
+  bl.copy(0, bl.length(), static_cast<char*>(t->buf));
+
+  t->ctx = ioc;
+  t->command = IOCommand::WRITE_COMMAND;
+  t->offset = off;
+  t->len = len;
+  t->device = this;
+  if (ioc->backend_priv) {
+    Task *prev = static_cast<Task*>(ioc->backend_priv);
+    prev->next = t;
+  } else {
+    ioc->backend_priv = t;
   }
+  t->next = nullptr;
+  ioc->num_pending.inc();
 
   dout(5) << __func__ << " " << off << "~" << len << dendl;
 
@@ -315,22 +418,52 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
   assert(off < size);
   assert(off + len <= size);
 
-  bufferptr p = buffer::create_page_aligned(len);
-  ioc->backend = this;
-  int r = nvme_ns_cmd_read(ns, p.c_str(), off, len / block_size, io_complete, ioc);
+  Task *t;
+  int r = rte_mempool_get(task_pool, (void **)&t);
   if (r < 0) {
-    r = -errno;
-    derr << __func__ << " failed to read" << dendl;
+    derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
     return r;
   }
-  while (!ioc->done) {
-    nvme_ctrlr_process_io_completions(ctrlr, 0);
-    usleep(50);
+
+  bufferptr p = buffer::create_page_aligned(len);
+  t->buf = rte_malloc(NULL, len, block_size);
+  if (t->buf == NULL) {
+    derr << __func__ << " task->buf rte_malloc failed" << dendl;
+    r = -ENOMEM;
+    goto out;
+  }
+  t->ctx = ioc;
+  t->command = IOCommand::READ_COMMAND;
+  t->offset = off;
+  t->len = len;
+  t->device = this;
+  t->read_code = 1;
+  assert(!ioc->backend_priv);
+  ioc->num_reading.inc();;
+  {
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+  }
+
+  {
+    Mutex::Locker l(ioc->lock);
+    while (t->read_code > 0)
+      ioc->cond.Wait(ioc->lock);
   }
+  memcpy(p.c_str(), t->buf, len);
   pbl->clear();
   pbl->push_back(p);
-
-  return r < 0 ? r : 0;
+  r = t->read_code;
+  rte_free(t);
+
+ out:
+  rte_mempool_put(task_pool, t);
+  if (ioc->num_waiting.read()) {
+    dout(20) << __func__ << " waking waiter" << dendl;
+    Mutex::Locker l(ioc->lock);
+    ioc->cond.Signal();
+  }
+  return r;
 }
 
 int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
index 2e14fbab2d9df6986bcdc583ec92cf76b5ac70ef..cee0a571a345d6a3b338a3817659449c3a9de30a 100644 (file)
@@ -17,6 +17,7 @@
 #ifndef CEPH_OS_BLUESTORE_NVMEDEVICE
 #define CEPH_OS_BLUESTORE_NVMEDEVICE
 
+#include <queue>
 #include <pciaccess.h>
 
 // since _Static_assert introduced in c11
@@ -33,8 +34,29 @@ extern "C" {
 }
 #endif
 
+#include "include/atomic.h"
+#include "common/Mutex.h"
 #include "BlockDevice.h"
 
+enum class IOCommand {
+  READ_COMMAND,
+  WRITE_COMMAND
+};
+
+class NVMEDevice;
+
+struct Task {
+  NVMEDevice *device;
+  IOContext *ctx;
+  IOCommand command;
+  uint64_t offset, len;
+  void *buf;
+  union {
+    Task *next;
+    int64_t read_code;
+  };
+};
+
 class NVMEDevice : public BlockDevice {
   /**
    * points to pinned, physically contiguous memory region;
@@ -48,8 +70,24 @@ class NVMEDevice : public BlockDevice {
   uint64_t size;
   uint64_t block_size;
 
+  bool aio_stop;
   bufferptr zeros;
 
+  Mutex queue_lock;
+  Cond queue_cond;
+  std::queue<Task*> task_queue;
+
+  struct AioCompletionThread : public Thread {
+    NVMEDevice *dev;
+    AioCompletionThread(NVMEDevice *b) : dev(b) {}
+    void *entry() {
+      dev->_aio_thread();
+      return NULL;
+    }
+  } aio_thread;
+
+  void _aio_thread();
+
   static void init();
 
  public: