]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add an async flush
authorJosh Durgin <josh.durgin@inktank.com>
Thu, 21 Mar 2013 23:04:10 +0000 (16:04 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 28 Mar 2013 17:46:59 +0000 (10:46 -0700)
At this point it's a simple wrapper around the ObjectCacher or
librados.

This is needed for QEMU so that its main thread can continue while a
flush is occurring. Since this will be backported, don't update the
librbd version yet, just add a #define that QEMU and others can use to
detect the presence of aio_flush().

Refs: #3737
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/include/rbd/librbd.h
src/include/rbd/librbd.hpp
src/librbd/AioCompletion.h
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/internal.cc
src/librbd/internal.h
src/librbd/librbd.cc
src/test/librbd/test_librbd.cc

index f41641bec7ef4f746a9fd7374b4bddd750ce619f..dcd6d227347794e7b0aeb702cdd30b90f68c87bd 100644 (file)
@@ -38,6 +38,7 @@ extern "C" {
 #define LIBRBD_VERSION_CODE LIBRBD_VERSION(LIBRBD_VER_MAJOR, LIBRBD_VER_MINOR, LIBRBD_VER_EXTRA)
 
 #define LIBRBD_SUPPORTS_WATCH 0
+#define LIBRBD_SUPPORTS_AIO_FLUSH 1
 
 typedef void *rbd_snap_t;
 typedef void *rbd_image_t;
@@ -323,6 +324,15 @@ int rbd_aio_wait_for_complete(rbd_completion_t c);
 ssize_t rbd_aio_get_return_value(rbd_completion_t c);
 void rbd_aio_release(rbd_completion_t c);
 int rbd_flush(rbd_image_t image);
+/**
+ * Start a flush if caching is enabled. Get a callback when
+ * the currently pending writes are on disk.
+ *
+ * @param image the image to flush writes to
+ * @param c what to call when flushing is complete
+ * @returns 0 on success, negative error code on failure
+ */
+int rbd_aio_flush(rbd_image_t image, rbd_completion_t c);
 
 #ifdef __cplusplus
 }
index c256df267f9a9c24035889d7384cbefb4f935ecc..a7bfcf43233c83b7daf08f5217fb8d634f8e348e 100644 (file)
@@ -181,6 +181,15 @@ public:
   int aio_discard(uint64_t off, uint64_t len, RBD::AioCompletion *c);
 
   int flush();
+  /**
+   * Start a flush if caching is enabled. Get a callback when
+   * the currently pending writes are on disk.
+   *
+   * @param image the image to flush writes to
+   * @param c what to call when flushing is complete
+   * @returns 0 on success, negative error code on failure
+   */
+  int aio_flush(RBD::AioCompletion *c);
 
 private:
   friend class RBD;
index 8420d37c8bd6f4cfcd5147126110e3d0d4ffc737..899586d51a7a985203f45b549ded6cf25cef1664 100644 (file)
@@ -24,6 +24,7 @@ namespace librbd {
     AIO_TYPE_READ = 0,
     AIO_TYPE_WRITE,
     AIO_TYPE_DISCARD,
+    AIO_TYPE_FLUSH,
     AIO_TYPE_NONE,
   } aio_type_t;
 
@@ -104,12 +105,14 @@ namespace librbd {
        complete_cb(rbd_comp, complete_arg);
       }
       switch (aio_type) {
-      case AIO_TYPE_READ: 
+      case AIO_TYPE_READ:
        ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
       case AIO_TYPE_WRITE:
        ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
       case AIO_TYPE_DISCARD:
        ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
+      case AIO_TYPE_FLUSH:
+       ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
       default:
        lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
        break;
index cddd0d44ac67cc1325a7cfff4c2bec1f492f0bf9..08c72ef4a60ba67a433cd066e70a2495deb28efa 100644 (file)
@@ -220,6 +220,8 @@ namespace librbd {
     plb.add_u64_counter(l_librbd_aio_discard, "aio_discard");
     plb.add_u64_counter(l_librbd_aio_discard_bytes, "aio_discard_bytes");
     plb.add_time_avg(l_librbd_aio_discard_latency, "aio_discard_latency");
+    plb.add_u64_counter(l_librbd_aio_flush, "aio_flush");
+    plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency");
     plb.add_u64_counter(l_librbd_snap_create, "snap_create");
     plb.add_u64_counter(l_librbd_snap_remove, "snap_remove");
     plb.add_u64_counter(l_librbd_snap_rollback, "snap_rollback");
@@ -522,24 +524,26 @@ namespace librbd {
     }
   }
 
+  void ImageCtx::flush_cache_aio(Context *onfinish) {
+    cache_lock.Lock();
+    object_cacher->flush_set(object_set, onfinish);
+    cache_lock.Unlock();
+  }
+
   int ImageCtx::flush_cache() {
     int r = 0;
     Mutex mylock("librbd::ImageCtx::flush_cache");
     Cond cond;
     bool done;
     Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
-    cache_lock.Lock();
-    bool already_flushed = object_cacher->flush_set(object_set, onfinish);
-    cache_lock.Unlock();
-    if (!already_flushed) {
-      mylock.Lock();
-      while (!done) {
-       ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
-       cond.Wait(mylock);
-      }
-      mylock.Unlock();
-      ldout(cct, 20) << "finished flushing cache" << dendl;
+    flush_cache_aio(onfinish);
+    mylock.Lock();
+    while (!done) {
+      ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+      cond.Wait(mylock);
     }
+    mylock.Unlock();
+    ldout(cct, 20) << "finished flushing cache" << dendl;
     return r;
   }
 
index 550d747524be1343c0cea9a55307a52c30eef54e..fde24fb3e247543d19462234c1f5fefc4b9e8719 100644 (file)
@@ -132,6 +132,7 @@ namespace librbd {
                        Context *onfinish);
     int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off);
     void user_flushed();
+    void flush_cache_aio(Context *onfinish);
     int flush_cache();
     void shutdown_cache();
     void invalidate_cache();
index fcefdc43161e77d5ffc88757465257b1eac7c6fa..09348171eb685e12b702f01213c9aa3ee8d01791 100644 (file)
@@ -2435,6 +2435,12 @@ reprotect_and_return_err:
     req->complete(rados_aio_get_return_value(c));
   }
 
+  void rados_ctx_cb(rados_completion_t c, void *arg)
+  {
+    Context *comp = reinterpret_cast<Context *>(arg);
+    comp->complete(rados_aio_get_return_value(c));
+  }
+
   // validate extent against image size; clip to image size if necessary
   int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
   {
@@ -2463,6 +2469,36 @@ reprotect_and_return_err:
     return 0;
   }
 
+  int aio_flush(ImageCtx *ictx, AioCompletion *c)
+  {
+    CephContext *cct = ictx->cct;
+    ldout(cct, 20) << "aio_flush " << ictx << " completion " << c <<  dendl;
+
+    int r = ictx_check(ictx);
+    if (r < 0)
+      return r;
+
+    ictx->user_flushed();
+
+    c->get();
+    c->add_request();
+    c->init_time(ictx, AIO_TYPE_FLUSH);
+    C_AioWrite *req_comp = new C_AioWrite(cct, c);
+    if (ictx->object_cacher) {
+      ictx->flush_cache_aio(req_comp);
+    } else {
+      librados::AioCompletion *rados_completion =
+       librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+      ictx->data_ctx.aio_flush_async(rados_completion);
+      rados_completion->release();
+    }
+    c->finish_adding_requests(cct);
+    c->put();
+    ictx->perfcounter->inc(l_librbd_aio_flush);
+
+    return 0;
+  }
+
   int flush(ImageCtx *ictx)
   {
     CephContext *cct = ictx->cct;
index 7c06bf9e7682fe61575224fcc3b7049d5b74e8d9..f1392f690a257e2873a7ab232e5311986a3b034c 100644 (file)
@@ -37,6 +37,8 @@ enum {
   l_librbd_aio_discard,
   l_librbd_aio_discard_bytes,
   l_librbd_aio_discard_latency,
+  l_librbd_aio_flush,
+  l_librbd_aio_flush_latency,
 
   l_librbd_snap_create,
   l_librbd_snap_remove,
@@ -177,6 +179,7 @@ namespace librbd {
               char *buf, bufferlist *pbl, AioCompletion *c);
   int aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
               char *buf, bufferlist *pbl, AioCompletion *c);
+  int aio_flush(ImageCtx *ictx, AioCompletion *c);
   int flush(ImageCtx *ictx);
   int _flush(ImageCtx *ictx);
 
index a155abb03764da779f241b7f4a3a28b99094cd2c..02d8fbf3211672a0cbe196ab58aacd73f3159e16 100644 (file)
@@ -481,6 +481,12 @@ namespace librbd {
     return librbd::flush(ictx);
   }
 
+  int Image::aio_flush(RBD::AioCompletion *c)
+  {
+    ImageCtx *ictx = (ImageCtx *)ctx;
+    return librbd::aio_flush(ictx, (librbd::AioCompletion *)c->pc);
+  }
+
 } // namespace librbd
 
 extern "C" void rbd_version(int *major, int *minor, int *extra)
@@ -1066,6 +1072,13 @@ extern "C" int rbd_flush(rbd_image_t image)
   return librbd::flush(ictx);
 }
 
+extern "C" int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)
+{
+  librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+  librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
+  return librbd::aio_flush(ictx, (librbd::AioCompletion *)comp->pc);
+}
+
 extern "C" int rbd_aio_is_complete(rbd_completion_t c)
 {
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
index 9cde09dcdbe79b1de810c277266b0aa10256cbf1..e38d317485fc2e571fec0c08c32f1a882fa708a9 100644 (file)
@@ -1391,3 +1391,106 @@ TEST(LibRBD, LockingPP)
   ioctx.close();
   ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
 }
+
+TEST(LibRBD, FlushAio)
+{
+  rados_t cluster;
+  rados_ioctx_t ioctx;
+  string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool(pool_name, &cluster));
+  rados_ioctx_create(cluster, pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  const char *name = "testimg";
+  uint64_t size = 2 << 20;
+  size_t num_aios = 256;
+
+  ASSERT_EQ(0, create_image(ioctx, name, size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
+
+  char test_data[TEST_IO_SIZE + 1];
+  size_t i;
+  for (i = 0; i < TEST_IO_SIZE; ++i) {
+    test_data[i] = (char) (rand() % (126 - 33) + 33);
+  }
+
+  rbd_completion_t write_comps[num_aios];
+  for (i = 0; i < num_aios; ++i) {
+    ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &write_comps[i]));
+    uint64_t offset = rand() % (size - TEST_IO_SIZE);
+    ASSERT_EQ(0, rbd_aio_write(image, offset, TEST_IO_SIZE, test_data,
+                              write_comps[i]));
+  }
+
+  rbd_completion_t flush_comp;
+  ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &flush_comp));
+  ASSERT_EQ(0, rbd_aio_flush(image, flush_comp));
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(flush_comp));
+  ASSERT_EQ(1, rbd_aio_is_complete(flush_comp));
+  rbd_aio_release(flush_comp);
+
+  for (i = 0; i < num_aios; ++i) {
+    ASSERT_EQ(1, rbd_aio_is_complete(write_comps[i]));
+    rbd_aio_release(write_comps[i]);
+  }
+
+  ASSERT_EQ(0, rbd_close(image));
+  ASSERT_EQ(0, rbd_remove(ioctx, name));
+  rados_ioctx_destroy(ioctx);
+  ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
+}
+
+TEST(LibRBD, FlushAioPP)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    const char *name = "testimg";
+    uint64_t size = 2 << 20;
+    size_t num_aios = 256;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
+
+    char test_data[TEST_IO_SIZE + 1];
+    size_t i;
+    for (i = 0; i < TEST_IO_SIZE; ++i) {
+      test_data[i] = (char) (rand() % (126 - 33) + 33);
+    }
+
+    librbd::RBD::AioCompletion *write_comps[num_aios];
+    for (i = 0; i < num_aios; ++i) {
+      ceph::bufferlist bl;
+      bl.append(test_data, strlen(test_data));
+      write_comps[i] = new librbd::RBD::AioCompletion(NULL, NULL);
+      uint64_t offset = rand() % (size - TEST_IO_SIZE);
+      ASSERT_EQ(0, image.aio_write(offset, TEST_IO_SIZE, bl,
+                                  write_comps[i]));
+    }
+
+    librbd::RBD::AioCompletion *flush_comp =
+      new librbd::RBD::AioCompletion(NULL, NULL);
+    ASSERT_EQ(0, image.aio_flush(flush_comp));
+    ASSERT_EQ(0, flush_comp->wait_for_complete());
+    ASSERT_EQ(1, flush_comp->is_complete());
+    delete flush_comp;
+
+    for (i = 0; i < num_aios; ++i) {
+      librbd::RBD::AioCompletion *comp = write_comps[i];
+      ASSERT_EQ(1, comp->is_complete());
+      delete comp;
+    }
+  }
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}