]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: add async flush interface
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 27 Mar 2013 21:48:31 +0000 (14:48 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Tue, 23 Apr 2013 18:33:18 +0000 (11:33 -0700)
Sometimes you don't want flush to block, and can't modify
already scheduled aio_writes. This will be useful for a
librbd async flush interface.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
(cherry picked from commit 7cc0940f89070dadab5b9102b1e78362f762f402)

Conflicts:

src/include/rados/librados.h
src/include/rados/librados.hpp

src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/AioCompletionImpl.h
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc
src/test/librados/aio.cc

index 44d6f712a73072b5b2e9cd529b25c163281ce2a2..fe578d262126ca9bcbee321cedc0caba1918d766 100644 (file)
@@ -1444,6 +1444,17 @@ int rados_aio_read(rados_ioctx_t io, const char *oid,
  */
 int rados_aio_flush(rados_ioctx_t io);
 
+/**
+ * Schedule a callback for when all currently pending
+ * aio writes are safe. This is a non-blocking version of
+ * rados_aio_flush().
+ *
+ * @param io the context to flush
+ * @param completion what to do when the writes are safe
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_aio_flush_async(rados_ioctx_t io, rados_completion_t completion);
+
 /** @} Asynchronous I/O */
 
 /**
index f8d69aa8af4a82c1de6ce456f928b612877606c1..37e0eb75f5941b089fd83c51180a134da3348cb6 100644 (file)
@@ -481,6 +481,16 @@ namespace librados
     
     int aio_flush();
 
+    /**
+     * Schedule a callback for when all currently pending
+     * aio writes are safe. This is a non-blocking version of
+     * aio_flush().
+     *
+     * @param c what to do when the writes are safe
+     * @returns 0 on success, negative error code on failure
+     */
+    int aio_flush_async(AioCompletion *c);
+
     int aio_exec(const std::string& oid, AioCompletion *c, const char *cls, const char *method,
                 bufferlist& inbl, bufferlist *outbl);
 
index 66270c92737b1d357de19303079b7443ece62384..619e7471605d4218fe9800dcfdbf505447a7c663 100644 (file)
@@ -196,6 +196,42 @@ struct C_AioSafe : public Context {
   }
 };
 
+/**
+  * Fills in all completed request data, and calls both
+  * complete and safe callbacks if they exist.
+  *
+  * Not useful for usual I/O, but for special things like
+  * flush where we only want to wait for things to be safe,
+  * but allow users to specify any of the callbacks.
+  */
+struct C_AioCompleteAndSafe : public Context {
+  AioCompletionImpl *c;
+
+  C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
+    c->ref++;
+  }
+
+  void finish(int r) {
+    c->rval = r;
+    c->ack = true;
+    c->safe = true;
+    rados_callback_t cb_complete = c->callback_complete;
+    void *cb_arg = c->callback_arg;
+    if (cb_complete)
+      cb_complete(c, cb_arg);
+
+    rados_callback_t cb_safe = c->callback_safe;
+    if (cb_safe)
+      cb_safe(c, cb_arg);
+
+    c->lock.Lock();
+    c->callback_complete = NULL;
+    c->callback_safe = NULL;
+    c->cond.Signal();
+    c->put_unlock();
+  }
+};
+
 }
 
 #endif
index 6cd102be1e4253a98c9ca7781c98b534ceb97d31..0c339caedc501575d0343c2acb86ac0ac77314a8 100644 (file)
@@ -70,22 +70,58 @@ void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
   aio_write_list_lock.Lock();
   assert(c->io == this);
   c->aio_write_seq = ++aio_write_seq;
+  ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
+                        << " write_seq " << aio_write_seq << dendl;
   aio_write_list.push_back(&c->aio_write_list_item);
   aio_write_list_lock.Unlock();
 }
 
 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
 {
+  ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
   aio_write_list_lock.Lock();
   assert(c->io == this);
   c->aio_write_list_item.remove_myself();
+  // queue async flush waiters
+  map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
+    aio_write_waiters.find(c->aio_write_seq);
+  if (waiters != aio_write_waiters.end()) {
+    ldout(client->cct, 20) << "found " << waiters->second.size()
+                          << " waiters" << dendl;
+    for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
+        it != waiters->second.end(); ++it) {
+      client->finisher.queue(new C_AioCompleteAndSafe(*it));
+      (*it)->put();
+    }
+    aio_write_waiters.erase(waiters);
+  } else {
+    ldout(client->cct, 20) << "found no waiters for tid "
+                          << c->aio_write_seq << dendl;
+  }
   aio_write_cond.Signal();
   aio_write_list_lock.Unlock();
   put();
 }
 
+void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
+{
+  ldout(client->cct, 20) << "flush_aio_writes_async " << this
+                        << " completion " << c << dendl;
+  Mutex::Locker l(aio_write_list_lock);
+  tid_t seq = aio_write_seq;
+  ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
+                        << seq << dendl;
+  if (aio_write_list.empty()) {
+    client->finisher.queue(new C_AioCompleteAndSafe(c));
+  } else {
+    c->get();
+    aio_write_waiters[seq].push_back(c);
+  }
+}
+
 void librados::IoCtxImpl::flush_aio_writes()
 {
+  ldout(client->cct, 20) << "flush_aio_writes" << dendl;
   aio_write_list_lock.Lock();
   tid_t seq = aio_write_seq;
   while (!aio_write_list.empty() &&
index feea0e8b1960c45b303d6537bf3815731b37cf07..b7c385d3cbd680580dd5a00c5d1348d25b980261 100644 (file)
@@ -46,6 +46,7 @@ struct librados::IoCtxImpl {
   tid_t aio_write_seq;
   Cond aio_write_cond;
   xlist<AioCompletionImpl*> aio_write_list;
+  map<tid_t, std::list<AioCompletionImpl*> > aio_write_waiters;
 
   Mutex *lock;
   Objecter *objecter;
@@ -84,6 +85,7 @@ struct librados::IoCtxImpl {
 
   void queue_aio_write(struct AioCompletionImpl *c);
   void complete_aio_write(struct AioCompletionImpl *c);
+  void flush_aio_writes_async(AioCompletionImpl *c);
   void flush_aio_writes();
 
   int64_t get_id() {
index c31b82ae34b6d4f3cb6ae96d5ccf8cc474ab8aef..fa8d8e2b2be160e494e8bd628ceec45eaa07b1f9 100644 (file)
@@ -972,6 +972,12 @@ int librados::IoCtx::aio_remove(const std::string& oid, librados::AioCompletion
   return io_ctx_impl->aio_remove(oid, c->pc);
 }
 
+int librados::IoCtx::aio_flush_async(librados::AioCompletion *c)
+{
+  io_ctx_impl->flush_aio_writes_async(c->pc);
+  return 0;
+}
+
 int librados::IoCtx::aio_flush()
 {
   io_ctx_impl->flush_aio_writes();
@@ -2166,6 +2172,14 @@ extern "C" int rados_aio_remove(rados_ioctx_t io, const char *o,
   return ctx->aio_remove(oid, (librados::AioCompletionImpl*)completion);
 }
 
+extern "C" int rados_aio_flush_async(rados_ioctx_t io,
+                                    rados_completion_t completion)
+{
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  ctx->flush_aio_writes_async((librados::AioCompletionImpl*)completion);
+  return 0;
+}
+
 extern "C" int rados_aio_flush(rados_ioctx_t io)
 {
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
index 4983fee7bc79459439db493b33382d3f55dd697c..dde97ed816e0a9a73ed4c88287166164f4814f53 100644 (file)
@@ -677,6 +677,86 @@ TEST(LibRadosAio, FlushPP) {
   delete my_completion2;
 }
 
+TEST(LibRadosAio, FlushAsync) {
+  AioTestData test_data;
+  rados_completion_t my_completion;
+  ASSERT_EQ("", test_data.init());
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion));
+  rados_completion_t flush_completion;
+  ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion));
+  char buf[128];
+  memset(buf, 0xee, sizeof(buf));
+  ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
+                              my_completion, buf, sizeof(buf), 0));
+  ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion));
+    ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion));
+  }
+  ASSERT_EQ(1, rados_aio_is_complete(my_completion));
+  ASSERT_EQ(1, rados_aio_is_safe(my_completion));
+  ASSERT_EQ(1, rados_aio_is_complete(flush_completion));
+  ASSERT_EQ(1, rados_aio_is_safe(flush_completion));
+  char buf2[128];
+  memset(buf2, 0, sizeof(buf2));
+  rados_completion_t my_completion2;
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion2));
+  ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
+                             my_completion2, buf2, sizeof(buf2), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
+  }
+  ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
+  rados_aio_release(my_completion);
+  rados_aio_release(my_completion2);
+  rados_aio_release(flush_completion);
+}
+
+TEST(LibRadosAio, FlushAsyncPP) {
+  AioTestDataPP test_data;
+  ASSERT_EQ("", test_data.init());
+  AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  AioCompletion *flush_completion =
+      test_data.m_cluster.aio_create_completion(NULL, NULL, NULL);
+  AioCompletion *my_completion_null = NULL;
+  ASSERT_NE(my_completion, my_completion_null);
+  char buf[128];
+  memset(buf, 0xee, sizeof(buf));
+  bufferlist bl1;
+  bl1.append(buf, sizeof(buf));
+  ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
+                                          bl1, sizeof(buf), 0));
+  ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion));
+  {
+      TestAlarm alarm;
+      ASSERT_EQ(0, flush_completion->wait_for_complete());
+      ASSERT_EQ(0, flush_completion->wait_for_safe());
+  }
+  ASSERT_EQ(1, my_completion->is_complete());
+  ASSERT_EQ(1, my_completion->is_safe());
+  ASSERT_EQ(1, flush_completion->is_complete());
+  ASSERT_EQ(1, flush_completion->is_safe());
+  bufferlist bl2;
+  AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  ASSERT_NE(my_completion2, my_completion_null);
+  ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2,
+                                         &bl2, sizeof(buf), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, my_completion2->wait_for_complete());
+  }
+  ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
+  delete my_completion;
+  delete my_completion2;
+  delete flush_completion;
+}
+
 TEST(LibRadosAio, RoundTripWriteFull) {
   AioTestData test_data;
   rados_completion_t my_completion, my_completion2, my_completion3;