]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
NVMEDevice: add buffer write support
authorHaomai Wang <haomai@xsky.com>
Tue, 19 Jan 2016 10:26:58 +0000 (18:26 +0800)
committerHaomai Wang <haomai@xsky.com>
Mon, 1 Feb 2016 14:02:18 +0000 (22:02 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/os/bluestore/NVMEDevice.cc
src/os/bluestore/NVMEDevice.h

index 3de8f3b595cf2b008cc875d3668517deb2835120..ad96d34c3b979dd2bccd7c632ad90800e58d4b40 100644 (file)
@@ -70,22 +70,26 @@ static void io_complete(void *t, const struct nvme_completion *completion) {
   lat -= task->start;
   if (task->command == IOCommand::WRITE_COMMAND) {
     task->device->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, lat);
-    auto left = ctx->num_running.dec();
     assert(!nvme_completion_is_error(completion));
-    // check waiting count before doing callback (which may
-    // destroy this ioc).
     dout(20) << __func__ << " write op successfully, left " << left << dendl;
-    if (!left) {
-      if (ctx->num_waiting.read()) {
-        Mutex::Locker l(ctx->lock);
-        ctx->cond.Signal();
-      }
-      if (task->device->aio_callback && ctx->priv) {
-        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+    // buffer write won't have ctx, and we will free request later, see `flush`
+    if (ctx) {
+      // check waiting count before doing callback (which may
+      // destroy this ioc).
+      if (!ctx->num_running.dec()) {
+        if (ctx->num_waiting.read()) {
+          Mutex::Locker l(ctx->lock);
+          ctx->cond.Signal();
+        }
+        if (task->device->aio_callback && ctx->priv) {
+          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+        }
       }
+      rte_free(task->buf);
+      rte_mempool_put(task_pool, task);
+    } else {
+      task->device->queue_buffer_task(task);
     }
-    rte_free(task->buf);
-    rte_mempool_put(task_pool, task);
   } else if (task->command == IOCommand::READ_COMMAND) {
     task->device->logger->tinc(l_bluestore_nvmedevice_read_lat, lat);
     ctx->num_reading.dec();
@@ -312,6 +316,7 @@ NVMEDevice::NVMEDevice(aio_callback_t cb, void *cbpriv)
       aio_thread(this),
       flush_lock("NVMEDevice::flush_lock"),
       flush_waiters(0),
+      buffer_lock("NVMEDevice::buffer_lock"),
       logger(nullptr),
       inflight_ops(0),
       aio_callback(cb),
@@ -416,7 +421,7 @@ void NVMEDevice::_aio_thread()
 
   Task *t;
   int r = 0;
-  const int max = 16;
+  const int max = 4;
   uint64_t lba_off, lba_count;
   utime_t lat, start = ceph_clock_now(g_ceph_context);
   while (true) {
@@ -443,7 +448,9 @@ void NVMEDevice::_aio_thread()
         logger->tinc(l_bluestore_nvmedevice_polling_lat, lat);
         if (aio_stop)
           break;
+        dout(20) << __func__ << " enter sleep" << dendl;
         queue_cond.Wait(queue_lock);
+        dout(20) << __func__ << " exit sleep" << dendl;
         start = ceph_clock_now(g_ceph_context);
       }
     }
@@ -467,7 +474,6 @@ void NVMEDevice::_aio_thread()
             lat = ceph_clock_now(g_ceph_context);
             lat -= t->start;
             logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, lat);
-            inflight_ops.inc();
             t = t->next;
           }
           break;
@@ -485,7 +491,6 @@ void NVMEDevice::_aio_thread()
             Mutex::Locker l(t->ctx->lock);
             t->ctx->cond.Signal();
           } else {
-            inflight_ops.inc();
             lat = ceph_clock_now(g_ceph_context);
             lat -= t->start;
             logger->tinc(l_bluestore_nvmedevice_read_queue_lat, lat);
@@ -502,7 +507,6 @@ void NVMEDevice::_aio_thread()
             Mutex::Locker l(t->ctx->lock);
             t->ctx->cond.Signal();
           } else {
-            inflight_ops.inc();
             lat = ceph_clock_now(g_ceph_context);
             lat -= t->start;
             logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, lat);
@@ -511,16 +515,14 @@ void NVMEDevice::_aio_thread()
         }
       }
     } else if (inflight_ops.read()) {
-      dout(20) << __func__ << " idle, have a pause" << dendl;
-
+      nvme_ctrlr_process_io_completions(ctrlr, max);
+      dout(30) << __func__ << " idle, have a pause" << dendl;
 #ifdef HAVE_SSE
       _mm_pause();
 #else
       usleep(10);
 #endif
     }
-
-    nvme_ctrlr_process_io_completions(ctrlr, max);
     reap_ioc();
   }
   nvme_unregister_io_thread();
@@ -541,6 +543,18 @@ int NVMEDevice::flush()
     }
     flush_waiters.dec();
   }
+  Task *t = nullptr;
+  {
+    Mutex::Locker l(buffer_lock);
+    buffered_extents.clear();
+    t = buffered_task_head;
+    buffered_task_head = nullptr;
+  }
+  while (t) {
+    rte_free(t->buf);
+    rte_mempool_put(task_pool, t);
+    t = t->next;
+  }
   utime_t lat = ceph_clock_now(g_ceph_context);
   lat -= start;
   logger->tinc(l_bluestore_nvmedevice_flush_lat, lat);
@@ -595,13 +609,8 @@ void NVMEDevice::aio_submit(IOContext *ioc)
     ioc->num_running.add(pending);
     ioc->num_pending.sub(pending);
     assert(ioc->num_pending.read() == 0);  // we should be only thread doing this
-    Mutex::Locker l(queue_lock);
     // Only need to push the first entry
-    task_queue.push(t);
-    if (queue_empty.read()) {
-      queue_empty.dec();
-      queue_cond.Signal();
-    }
+    queue_task(t, pending);
     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
   }
 }
@@ -613,7 +622,8 @@ int NVMEDevice::aio_write(
     bool buffered)
 {
   uint64_t len = bl.length();
-  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+           << " buffered " << buffered << dendl;
   assert(off % block_size == 0);
   assert(len % block_size == 0);
   assert(len > 0);
@@ -623,7 +633,6 @@ int NVMEDevice::aio_write(
   Task *t;
   int r = rte_mempool_get(task_pool, (void **)&t);
   if (r < 0) {
-    derr << __func__ << " task_pool rte_mempool_get failed" << dendl;
     return r;
   }
   t->start = ceph_clock_now(g_ceph_context);
@@ -636,24 +645,30 @@ int NVMEDevice::aio_write(
   }
   bl.copy(0, len, static_cast<char*>(t->buf));
 
-  t->ctx = ioc;
   t->command = IOCommand::WRITE_COMMAND;
   t->offset = off;
   t->len = len;
   t->device = this;
   t->return_code = 0;
+  t->next = nullptr;
 
   if (buffered) {
+    t->ctx = nullptr;
+    // Only need to push the first entry
+    queue_task(t);
+    Mutex::Locker l(buffer_lock);
+    buffered_extents.insert(off, len, (char*)t->buf);
+  } else {
+    t->ctx = ioc;
+    Task *first = static_cast<Task*>(ioc->nvme_task_first);
+    Task *last = static_cast<Task*>(ioc->nvme_task_last);
+    if (last)
+      last->next = t;
+    if (!first)
+      ioc->nvme_task_first = t;
+    ioc->nvme_task_last = t;
+    ioc->num_pending.inc();
   }
-  Task *first = static_cast<Task*>(ioc->nvme_task_first);
-  Task *last = static_cast<Task*>(ioc->nvme_task_last);
-  if (last)
-    last->next = t;
-  t->next = nullptr;
-  if (!first)
-    ioc->nvme_task_first = t;
-  ioc->nvme_task_last = t;
-  ioc->num_pending.inc();
 
   dout(5) << __func__ << " " << off << "~" << len << dendl;
 
@@ -719,14 +734,7 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
   t->return_code = 1;
   t->next = nullptr;
   ioc->num_reading.inc();;
-  {
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.read()) {
-      queue_empty.dec();
-      queue_cond.Signal();
-    }
-  }
+  queue_task(t);
 
   {
     Mutex::Locker l(ioc->lock);
@@ -734,6 +742,11 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
       ioc->cond.Wait(ioc->lock);
   }
   memcpy(p.c_str(), t->buf, len);
+  {
+    Mutex::Locker l(buffer_lock);
+    uint64_t copied = buffered_extents.read_overlap(off, len, (char*)t->buf);
+    dout(10) << __func__ << " read from buffer " << copied << dendl;
+  }
   pbl->clear();
   pbl->push_back(p);
   r = t->return_code;
@@ -782,14 +795,7 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
   t->return_code = 1;
   t->next = nullptr;
   ioc.num_reading.inc();;
-  {
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.read()) {
-      queue_empty.dec();
-      queue_cond.Signal();
-    }
-  }
+  queue_task(t);
 
   {
     Mutex::Locker l(ioc.lock);
@@ -797,6 +803,11 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
       ioc.cond.Wait(ioc.lock);
   }
   memcpy(buf, (char*)t->buf+off-aligned_off, len);
+  {
+    Mutex::Locker l(buffer_lock);
+    uint64_t copied = buffered_extents.read_overlap(off, len, buf);
+    dout(10) << __func__ << " read from buffer " << copied << dendl;
+  }
   r = t->return_code;
   rte_free(t->buf);
   rte_mempool_put(task_pool, t);
index 9ef705347069176c78b9b19d9b8f59753afa4228..46637c55bcb3ddf82c193936eed5b151153b31a4 100644 (file)
@@ -18,7 +18,9 @@
 #define CEPH_OS_BLUESTORE_NVMEDEVICE
 
 #include <queue>
+#include <map>
 #include <pciaccess.h>
+#include <limits>
 
 // since _Static_assert introduced in c11
 #define _Static_assert static_assert
@@ -35,6 +37,7 @@ extern "C" {
 #endif
 
 #include "include/atomic.h"
+#include "include/interval_set.h"
 #include "include/utime.h"
 #include "common/Mutex.h"
 #include "BlockDevice.h"
@@ -81,6 +84,16 @@ class NVMEDevice : public BlockDevice {
   Cond queue_cond;
   std::queue<Task*> task_queue;
 
+  void queue_task(Task *t, uint64_t ops = 1) {
+    inflight_ops.add(ops);
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+    if (queue_empty.read()) {
+      queue_empty.dec();
+      queue_cond.Signal();
+    }
+  }
+
   struct AioCompletionThread : public Thread {
     NVMEDevice *dev;
     AioCompletionThread(NVMEDevice *b) : dev(b) {}
@@ -96,7 +109,156 @@ class NVMEDevice : public BlockDevice {
   Cond flush_cond;
   atomic_t flush_waiters;
 
+  struct BufferedExtents {
+    struct Extent {
+      uint64_t x_len;
+      uint64_t x_off;
+      const char *data;
+      uint64_t data_len;
+    };
+    using Offset = uint64_t;
+    map<Offset, Extent> buffered_extents;
+    uint64_t left_edge = std::numeric_limits<uint64_t>::max();
+    uint64_t right_edge = 0;
+
+    void verify() {
+      interval_set<uint64_t> m;
+      for (auto && it : buffered_extents) {
+        assert(!m.intersects(it.first, it.second.x_len));
+        m.insert(it.first, it.second.x_len);
+      }
+    }
+
+    void insert(uint64_t off, uint64_t len, const char *data) {
+      auto it = buffered_extents.lower_bound(off);
+      if (it != buffered_extents.begin()) {
+        --it;
+        if (it->first + it->second.x_len <= off)
+          ++it;
+      }
+      uint64_t end = off + len;
+      if (off < left_edge)
+        left_edge = off;
+      if (end > right_edge)
+        right_edge = end;
+      while (it != buffered_extents.end()) {
+        if (it->first >= end)
+          break;
+        uint64_t extent_it_end = it->first + it->second.x_len;
+        assert(extent_it_end >= off);
+        if (it->first <= off) {
+          if (extent_it_end > end) {
+            //         <-     data    ->
+            // <-            it           ->
+            it->second.x_len -= (extent_it_end - off);
+            buffered_extents[end] = Extent{
+                extent_it_end - end, it->second.x_off + it->second.x_len + len, it->second.data, it->second.data_len};
+          } else {
+            //         <-     data    ->
+            // <-     it    ->
+            assert(extent_it_end <= end);
+            it->second.x_len -= (extent_it_end - off);
+          }
+          ++it;
+        } else {
+          assert(it->first > off) ;
+          if (extent_it_end > end) {
+            //  <-     data    ->
+            //      <-           it          ->
+            uint64_t overlap = end - it->first;
+            buffered_extents[end] = Extent{
+                it->second.x_len - overlap, it->second.x_off + overlap, it->second.data, it->second.data_len};
+          } else {
+            //  <-     data    ->
+            //      <- it ->
+          }
+          buffered_extents.erase(it++);
+        }
+      }
+      buffered_extents[off] = Extent{
+          len, 0, data, len};
+
+      if (0)
+        verify();
+    }
+
+    void memcpy_check(char *dst, uint64_t dst_raw_len, uint64_t dst_off,
+                      map<Offset, Extent>::iterator &it, uint64_t src_off, uint64_t copylen) {
+      if (0) {
+        assert(dst_off + copylen <= dst_raw_len);
+        assert(it->second.x_off + src_off + copylen <= it->second.data_len);
+      }
+      memcpy(dst + dst_off, it->second.data + it->second.x_off + src_off, copylen);
+    }
+
+    uint64_t read_overlap(uint64_t off, uint64_t len, char *buf) {
+      uint64_t end = off + len;
+      if (end <= left_edge || off >= right_edge)
+        return 0;
+
+      uint64_t copied = 0;
+      auto it = buffered_extents.lower_bound(off);
+      if (it != buffered_extents.begin()) {
+        --it;
+        if (it->first + it->second.x_len <= off)
+          ++it;
+      }
+      uint64_t copy_len;
+      while (it != buffered_extents.end()) {
+        if (it->first >= end)
+          break;
+        uint64_t extent_it_end = it->first + it->second.x_len;
+        assert(extent_it_end >= off);
+        if (it->first >= off) {
+          if (extent_it_end > end) {
+            //  <-     data    ->
+            //      <-           it          ->
+            copy_len = len - (it->first - off);
+            memcpy_check(buf, len, it->first - off, it, 0, copy_len);
+          } else {
+            //  <-     data    ->
+            //      <- it ->
+            copy_len = it->second.x_len;
+            memcpy_check(buf, len, it->first - off, it, 0, copy_len);
+          }
+        } else {
+          if (extent_it_end > end) {
+            //         <-     data    ->
+            // <-           it          ->
+            copy_len = len;
+            memcpy_check(buf, len, 0, it, off - it->first, copy_len);
+          } else {
+            //         <-     data    ->
+            // <-     it    ->
+            assert(extent_it_end <= end);
+            copy_len = it->first + it->second.x_len - off;
+            memcpy_check(buf, len, 0, it, off - it->first, copy_len);
+          }
+        }
+        copied += copy_len;
+        ++it;
+      }
+      return copied;
+    }
+
+    void clear() {
+      buffered_extents.clear();
+      left_edge = std::numeric_limits<uint64_t>::max();
+      right_edge = 0;
+    }
+  };
+  Mutex buffer_lock;
+  BufferedExtents buffered_extents;
+  Task *buffered_task_head = nullptr;
+
   static void init();
+ public:
+  void queue_buffer_task(Task *t) {
+    Mutex::Locker l(buffer_lock);
+    assert(t->next == nullptr);
+    t->next = buffered_task_head;
+    buffered_task_head = t;
+  }
 
  public:
   PerfCounters *logger;