]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: add async flush interface
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 27 Mar 2013 21:48:31 +0000 (14:48 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 28 Mar 2013 17:46:59 +0000 (10:46 -0700)
Sometimes you don't want flush to block, and can't modify
already scheduled aio_writes. This will be useful for a
librbd async flush interface.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/AioCompletionImpl.h
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc
src/test/librados/aio.cc

index fa6ff982bb77993bf2347a24b2c3fb23c15ab16f..f2a60add25cf58e3b548899948579c7cdda467c9 100644 (file)
@@ -24,7 +24,7 @@ extern "C" {
 #endif
 
 #define LIBRADOS_VER_MAJOR 0
-#define LIBRADOS_VER_MINOR 50
+#define LIBRADOS_VER_MINOR 51
 #define LIBRADOS_VER_EXTRA 0
 
 #define LIBRADOS_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
@@ -1446,6 +1446,18 @@ int rados_aio_read(rados_ioctx_t io, const char *oid,
 int rados_aio_flush(rados_ioctx_t io);
 
 
+/**
+ * Schedule a callback for when all currently pending
+ * aio writes are safe. This is a non-blocking version of
+ * rados_aio_flush().
+ *
+ * @param io the context to flush
+ * @param completion what to do when the writes are safe
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_aio_flush_async(rados_ioctx_t io, rados_completion_t completion);
+
+
 /**
  * Asynchronously get object stats (size/mtime)
  *
index 887b6aa3e4446c7d0dc6d9a78e1d0b01ec0ed694..cf3b33d5328aad7c0170c0610821f8cacc96d1c6 100644 (file)
@@ -484,6 +484,16 @@ namespace librados
 
     int aio_flush();
 
+    /**
+     * Schedule a callback for when all currently pending
+     * aio writes are safe. This is a non-blocking version of
+     * aio_flush().
+     *
+     * @param c what to do when the writes are safe
+     * @returns 0 on success, negative error code on failure
+     */
+    int aio_flush_async(AioCompletion *c);
+
     int aio_stat(const std::string& oid, AioCompletion *c, uint64_t *psize, time_t *pmtime);
 
     int aio_exec(const std::string& oid, AioCompletion *c, const char *cls, const char *method,
index 66270c92737b1d357de19303079b7443ece62384..619e7471605d4218fe9800dcfdbf505447a7c663 100644 (file)
@@ -196,6 +196,42 @@ struct C_AioSafe : public Context {
   }
 };
 
+/**
+  * Fills in all completed request data, and calls both
+  * complete and safe callbacks if they exist.
+  *
+  * Not useful for usual I/O, but for special things like
+  * flush where we only want to wait for things to be safe,
+  * but allow users to specify any of the callbacks.
+  */
+struct C_AioCompleteAndSafe : public Context {
+  AioCompletionImpl *c;
+
+  C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
+    c->ref++;
+  }
+
+  void finish(int r) {
+    c->rval = r;
+    c->ack = true;
+    c->safe = true;
+    rados_callback_t cb_complete = c->callback_complete;
+    void *cb_arg = c->callback_arg;
+    if (cb_complete)
+      cb_complete(c, cb_arg);
+
+    rados_callback_t cb_safe = c->callback_safe;
+    if (cb_safe)
+      cb_safe(c, cb_arg);
+
+    c->lock.Lock();
+    c->callback_complete = NULL;
+    c->callback_safe = NULL;
+    c->cond.Signal();
+    c->put_unlock();
+  }
+};
+
 }
 
 #endif
index 313e15b23e28e3d416adcab7a9f86d9d3803b002..76daec9c6edfc44fae89bd03a6d2bc3de2dfdf23 100644 (file)
@@ -70,22 +70,58 @@ void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
   aio_write_list_lock.Lock();
   assert(c->io == this);
   c->aio_write_seq = ++aio_write_seq;
+  ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
+                        << " write_seq " << aio_write_seq << dendl;
   aio_write_list.push_back(&c->aio_write_list_item);
   aio_write_list_lock.Unlock();
 }
 
 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
 {
+  ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
   aio_write_list_lock.Lock();
   assert(c->io == this);
   c->aio_write_list_item.remove_myself();
+  // queue async flush waiters
+  map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
+    aio_write_waiters.find(c->aio_write_seq);
+  if (waiters != aio_write_waiters.end()) {
+    ldout(client->cct, 20) << "found " << waiters->second.size()
+                          << " waiters" << dendl;
+    for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
+        it != waiters->second.end(); ++it) {
+      client->finisher.queue(new C_AioCompleteAndSafe(*it));
+      (*it)->put();
+    }
+    aio_write_waiters.erase(waiters);
+  } else {
+    ldout(client->cct, 20) << "found no waiters for tid "
+                          << c->aio_write_seq << dendl;
+  }
   aio_write_cond.Signal();
   aio_write_list_lock.Unlock();
   put();
 }
 
+void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
+{
+  ldout(client->cct, 20) << "flush_aio_writes_async " << this
+                        << " completion " << c << dendl;
+  Mutex::Locker l(aio_write_list_lock);
+  tid_t seq = aio_write_seq;
+  ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
+                        << seq << dendl;
+  if (aio_write_list.empty()) {
+    client->finisher.queue(new C_AioCompleteAndSafe(c));
+  } else {
+    c->get();
+    aio_write_waiters[seq].push_back(c);
+  }
+}
+
 void librados::IoCtxImpl::flush_aio_writes()
 {
+  ldout(client->cct, 20) << "flush_aio_writes" << dendl;
   aio_write_list_lock.Lock();
   tid_t seq = aio_write_seq;
   while (!aio_write_list.empty() &&
index c5b14cab8e36994f2ed267a211b90536c3617795..a87a87250254d775486e2ee3116c018df27fb773 100644 (file)
@@ -46,6 +46,7 @@ struct librados::IoCtxImpl {
   tid_t aio_write_seq;
   Cond aio_write_cond;
   xlist<AioCompletionImpl*> aio_write_list;
+  map<tid_t, std::list<AioCompletionImpl*> > aio_write_waiters;
 
   Mutex *lock;
   Objecter *objecter;
@@ -84,6 +85,7 @@ struct librados::IoCtxImpl {
 
   void queue_aio_write(struct AioCompletionImpl *c);
   void complete_aio_write(struct AioCompletionImpl *c);
+  void flush_aio_writes_async(AioCompletionImpl *c);
   void flush_aio_writes();
 
   int64_t get_id() {
index 0a00cfc6df2dc972e4e3879b9d173be6471e4d47..43e377097c2a1deef52660bc32ce4a39ad893b95 100644 (file)
@@ -1000,6 +1000,12 @@ int librados::IoCtx::aio_remove(const std::string& oid, librados::AioCompletion
   return io_ctx_impl->aio_remove(oid, c->pc);
 }
 
+int librados::IoCtx::aio_flush_async(librados::AioCompletion *c)
+{
+  io_ctx_impl->flush_aio_writes_async(c->pc);
+  return 0;
+}
+
 int librados::IoCtx::aio_flush()
 {
   io_ctx_impl->flush_aio_writes();
@@ -2229,6 +2235,14 @@ extern "C" int rados_aio_remove(rados_ioctx_t io, const char *o,
   return ctx->aio_remove(oid, (librados::AioCompletionImpl*)completion);
 }
 
+extern "C" int rados_aio_flush_async(rados_ioctx_t io,
+                                    rados_completion_t completion)
+{
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  ctx->flush_aio_writes_async((librados::AioCompletionImpl*)completion);
+  return 0;
+}
+
 extern "C" int rados_aio_flush(rados_ioctx_t io)
 {
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
index 5fcc3177f201b03489024d1405cb555575ab3f76..835f28763b4f18a314e72c60d0b7aef4fc840f20 100644 (file)
@@ -677,6 +677,86 @@ TEST(LibRadosAio, FlushPP) {
   delete my_completion2;
 }
 
+TEST(LibRadosAio, FlushAsync) {
+  AioTestData test_data;
+  rados_completion_t my_completion;
+  ASSERT_EQ("", test_data.init());
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion));
+  rados_completion_t flush_completion;
+  ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion));
+  char buf[128];
+  memset(buf, 0xee, sizeof(buf));
+  ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
+                              my_completion, buf, sizeof(buf), 0));
+  ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion));
+    ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion));
+  }
+  ASSERT_EQ(1, rados_aio_is_complete(my_completion));
+  ASSERT_EQ(1, rados_aio_is_safe(my_completion));
+  ASSERT_EQ(1, rados_aio_is_complete(flush_completion));
+  ASSERT_EQ(1, rados_aio_is_safe(flush_completion));
+  char buf2[128];
+  memset(buf2, 0, sizeof(buf2));
+  rados_completion_t my_completion2;
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion2));
+  ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
+                             my_completion2, buf2, sizeof(buf2), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
+  }
+  ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
+  rados_aio_release(my_completion);
+  rados_aio_release(my_completion2);
+  rados_aio_release(flush_completion);
+}
+
+TEST(LibRadosAio, FlushAsyncPP) {
+  AioTestDataPP test_data;
+  ASSERT_EQ("", test_data.init());
+  AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  AioCompletion *flush_completion =
+      test_data.m_cluster.aio_create_completion(NULL, NULL, NULL);
+  AioCompletion *my_completion_null = NULL;
+  ASSERT_NE(my_completion, my_completion_null);
+  char buf[128];
+  memset(buf, 0xee, sizeof(buf));
+  bufferlist bl1;
+  bl1.append(buf, sizeof(buf));
+  ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
+                                          bl1, sizeof(buf), 0));
+  ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion));
+  {
+      TestAlarm alarm;
+      ASSERT_EQ(0, flush_completion->wait_for_complete());
+      ASSERT_EQ(0, flush_completion->wait_for_safe());
+  }
+  ASSERT_EQ(1, my_completion->is_complete());
+  ASSERT_EQ(1, my_completion->is_safe());
+  ASSERT_EQ(1, flush_completion->is_complete());
+  ASSERT_EQ(1, flush_completion->is_safe());
+  bufferlist bl2;
+  AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  ASSERT_NE(my_completion2, my_completion_null);
+  ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2,
+                                         &bl2, sizeof(buf), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, my_completion2->wait_for_complete());
+  }
+  ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
+  delete my_completion;
+  delete my_completion2;
+  delete flush_completion;
+}
+
 TEST(LibRadosAio, RoundTripWriteFull) {
   AioTestData test_data;
   rados_completion_t my_completion, my_completion2, my_completion3;