]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: implement aio_flush
authorSage Weil <sage.weil@dreamhost.com>
Thu, 28 Apr 2011 04:28:26 +0000 (21:28 -0700)
committerSage Weil <sage@newdream.net>
Tue, 31 May 2011 20:58:19 +0000 (13:58 -0700)
Implement a per-ioctx flush that blocks until all previously submitted
aio operations on the ioctx are safe.  Each aio gets a sequence number and
is put on a linked list attached to the ioctx.  The flush operation waits
for it to drain to the watermark set when flush is first called.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados.cc
src/testrados.c

index df10a440d7183f1f133c3814ac49760d16d5a132..b681cfcbe0c16e2bf7e7f83b699d154b2cab05b7 100644 (file)
@@ -188,6 +188,8 @@ int rados_aio_read(rados_ioctx_t io, const char *oid,
                   rados_completion_t completion,
                   char *buf, size_t len, uint64_t off);
 
+int rados_aio_flush(rados_ioctx_t io);
+
 /* watch/notify */
 typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
 int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
index e5e2061edf3698f9ec5aecfdcddc348f96a57083..8609777a0a3fadf2e6ba0d2dae6c7d228359b97e 100644 (file)
@@ -214,6 +214,8 @@ namespace librados
     int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl,
                  size_t len);
     int aio_write_full(const std::string& oid, AioCompletion *c, const bufferlist& bl);
+    
+    int aio_flush();
 
     // compound object operations
     int operate(const std::string& oid, ObjectOperation *op, bufferlist *pbl);
index 9e82908846b92178392a64764e7521ba4a4b0075..7b3ff3ca043d1d1a6b9466a995df10243f004eb6 100644 (file)
@@ -88,8 +88,18 @@ struct librados::IoCtxImpl {
   uint32_t notify_timeout;
   object_locator_t oloc;
 
-  IoCtxImpl() {}
-  IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s);
+  Mutex aio_write_list_lock;
+  tid_t aio_write_seq;
+  Cond aio_write_cond;
+  xlist<AioCompletionImpl*> aio_write_list;
+
+  IoCtxImpl() :
+    aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") {}
+  IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) :
+    ref_cnt(0), client(c), poolid(pid),
+    pool_name(pool_name_), snap_seq(s), assert_ver(0),
+    notify_timeout(g_conf.client_notify_timeout), oloc(pid),
+    aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) {}
 
   void dup(const IoCtxImpl& rhs) {
     // Copy everything except the ref count
@@ -130,6 +140,10 @@ struct librados::IoCtxImpl {
     if (ref_cnt.dec() == 0)
       delete this;
   }
+
+  void queue_aio_write(struct AioCompletionImpl *c);
+  void complete_aio_write(struct AioCompletionImpl *c);
+  void flush_aio_writes();
 };
 
 
@@ -222,10 +236,15 @@ struct librados::AioCompletionImpl {
   char *buf;
   unsigned maxlen;
 
+  IoCtxImpl *io;
+  tid_t aio_write_seq;
+  xlist<AioCompletionImpl*>::item aio_write_list_item;
+
   AioCompletionImpl() : lock("AioCompletionImpl lock"),
-                   ref(1), rval(0), released(false), ack(false), safe(false),
-                   callback_complete(0), callback_safe(0), callback_arg(0),
-                   pbl(0), buf(0), maxlen(0) { }
+                       ref(1), rval(0), released(false), ack(false), safe(false),
+                       callback_complete(0), callback_safe(0), callback_arg(0),
+                       pbl(0), buf(0), maxlen(0),
+                       io(NULL), aio_write_seq(0), aio_write_list_item(this) { }
 
   int set_complete_callback(void *cb_arg, rados_callback_t cb) {
     lock.Lock();
@@ -305,6 +324,36 @@ struct librados::AioCompletionImpl {
   }
 };
 
+void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
+{
+  aio_write_list_lock.Lock();
+  assert(!c->io);
+  c->io = this;
+  c->aio_write_seq = ++aio_write_seq;
+  aio_write_list.push_back(&c->aio_write_list_item);
+  aio_write_list_lock.Unlock();
+}
+
+void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
+{
+  aio_write_list_lock.Lock();
+  assert(c->io == this);
+  c->io = NULL;
+  c->aio_write_list_item.remove_myself();
+  aio_write_cond.Signal();
+  aio_write_list_lock.Unlock();
+}
+
+void librados::IoCtxImpl::flush_aio_writes()
+{
+  aio_write_list_lock.Lock();
+  tid_t seq = aio_write_seq;
+  while (!aio_write_list.empty() &&
+        aio_write_list.front()->aio_write_seq <= seq)
+    aio_write_cond.Wait(aio_write_list_lock);
+  aio_write_list_lock.Unlock();
+}
+
 struct librados::ObjListCtx {
   librados::IoCtxImpl *ctx;
   Objecter::ListContext *lc;
@@ -506,6 +555,8 @@ public:
        c->lock.Lock();
       }
 
+      c->io->complete_aio_write(c);
+
       c->put_unlock();
     }
     C_aio_Safe(AioCompletionImpl *_c) : c(_c) {
@@ -628,13 +679,6 @@ public:
   }
 };
 
-librados::IoCtxImpl::
-IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s)
-  : ref_cnt(0), client(c), poolid(pid),
-    pool_name(pool_name_), snap_seq(s), assert_ver(0),
-    notify_timeout(c->conf->client_notify_timeout), oloc(pid)
-{}
-
 int librados::RadosClient::
 connect()
 {
@@ -1355,6 +1399,7 @@ aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioComplet
   if (io.snap_seq != CEPH_NOSNAP)
     return -EINVAL;
 
+  io.queue_aio_write(c);
   objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver);
 
   return 0;
@@ -1424,6 +1469,8 @@ aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
   if (io.snap_seq != CEPH_NOSNAP)
     return -EROFS;
 
+  io.queue_aio_write(c);
+
   Context *onack = new C_aio_Ack(c);
   Context *onsafe = new C_aio_Safe(c);
 
@@ -1445,6 +1492,8 @@ aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
   if (io.snap_seq != CEPH_NOSNAP)
     return -EROFS;
 
+  io.queue_aio_write(c);
+
   Context *onack = new C_aio_Ack(c);
   Context *onsafe = new C_aio_Safe(c);
 
@@ -1466,6 +1515,8 @@ aio_write_full(IoCtxImpl& io, const object_t &oid,
   if (io.snap_seq != CEPH_NOSNAP)
     return -EROFS;
 
+  io.queue_aio_write(c);
+
   Context *onack = new C_aio_Ack(c);
   Context *onsafe = new C_aio_Safe(c);
 
@@ -2563,6 +2614,13 @@ aio_write_full(const std::string& oid, librados::AioCompletion *c, const bufferl
   return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl);
 }
 
+int librados::IoCtx::
+aio_flush()
+{
+  io_ctx_impl->flush_aio_writes();
+  return 0;
+}
+
 int librados::IoCtx::
 watch(const string& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx)
 {
@@ -3506,6 +3564,13 @@ extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o,
              (librados::AioCompletionImpl*)completion, bl);
 }
 
+extern "C" int rados_aio_flush(rados_ioctx_t io)
+{
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  ctx->flush_aio_writes();
+  return 0;
+}
+
 struct C_WatchCB : public librados::WatchCtx {
   rados_watchcb_t wcb;
   void *arg;
index 27942c5d7040860f888ab21e6cd891d6f0906b59..21be41e0426b2c5ef1d304b1a9dc1a287be6e151 100644 (file)
@@ -17,6 +17,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <time.h>
+#include <assert.h>
 
 static void do_rados_setxattr(rados_ioctx_t io_ctx, const char *oid,
                        const char *key, const char *val)
@@ -250,6 +251,20 @@ int main(int argc, const char **argv)
        rados_aio_release(a);
        rados_aio_release(b);
 
+       /* test flush */
+       printf("testing aio flush\n");
+       rados_completion_t c;
+       rados_aio_create_completion(0, 0, 0, &c);
+       rados_aio_write(io_ctx, "c", c, buf, 100, 0);
+       int safe = rados_aio_is_safe(c);
+       printf("a should not yet be safe and ... %s\n", safe ? "is":"is not");
+       assert(!safe);
+       rados_aio_flush(io_ctx);
+       safe = rados_aio_is_safe(c);
+       printf("a should be safe and ... %s\n", safe ? "is":"is not");
+       assert(safe);
+       rados_aio_release(c);
+       
        rados_read(io_ctx, "../b/bb_bb_bb\\foo\\bar", buf2, 128, 0);
 
        /* list objects */