From: Sage Weil Date: Thu, 28 Apr 2011 04:28:26 +0000 (-0700) Subject: librados: implement aio_flush X-Git-Tag: v0.30~152 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=07c1989a1838654f59140161e5fc3b59b42c755c;p=ceph.git librados: implement aio_flush Implement a per-ioctx flush that blocks until all previously submitted aio operations on the ioctx are safe. Each aio gets a sequence number and is put on a linked list attached to the ioctx. The flush operation records the latest sequence number as a watermark when it is called, then waits until every aio at or below that watermark has been removed from the list. Signed-off-by: Sage Weil --- diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index df10a440d7183..b681cfcbe0c16 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -188,6 +188,8 @@ int rados_aio_read(rados_ioctx_t io, const char *oid, rados_completion_t completion, char *buf, size_t len, uint64_t off); +int rados_aio_flush(rados_ioctx_t io); + /* watch/notify */ typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg); int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index e5e2061edf369..8609777a0a3fa 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -214,6 +214,8 @@ namespace librados int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl, size_t len); int aio_write_full(const std::string& oid, AioCompletion *c, const bufferlist& bl); + + int aio_flush(); // compound object operations int operate(const std::string& oid, ObjectOperation *op, bufferlist *pbl); diff --git a/src/librados.cc b/src/librados.cc index 9e82908846b92..7b3ff3ca043d1 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -88,8 +88,18 @@ struct librados::IoCtxImpl { uint32_t notify_timeout; object_locator_t oloc; - IoCtxImpl() {} - IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s); + 
Mutex aio_write_list_lock; + tid_t aio_write_seq; + Cond aio_write_cond; + xlist<AioCompletionImpl*> aio_write_list; + + IoCtxImpl() : + aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") {} + IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) : + ref_cnt(0), client(c), poolid(pid), + pool_name(pool_name_), snap_seq(s), assert_ver(0), + notify_timeout(g_conf.client_notify_timeout), oloc(pid), + aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) {} void dup(const IoCtxImpl& rhs) { // Copy everything except the ref count @@ -130,6 +140,10 @@ struct librados::IoCtxImpl { if (ref_cnt.dec() == 0) delete this; } + + void queue_aio_write(struct AioCompletionImpl *c); + void complete_aio_write(struct AioCompletionImpl *c); + void flush_aio_writes(); }; @@ -222,10 +236,15 @@ struct librados::AioCompletionImpl { char *buf; unsigned maxlen; + IoCtxImpl *io; + tid_t aio_write_seq; + xlist<AioCompletionImpl*>::item aio_write_list_item; + AioCompletionImpl() : lock("AioCompletionImpl lock"), - ref(1), rval(0), released(false), ack(false), safe(false), - callback_complete(0), callback_safe(0), callback_arg(0), - pbl(0), buf(0), maxlen(0) { } + ref(1), rval(0), released(false), ack(false), safe(false), + callback_complete(0), callback_safe(0), callback_arg(0), + pbl(0), buf(0), maxlen(0), + io(NULL), aio_write_seq(0), aio_write_list_item(this) { } int set_complete_callback(void *cb_arg, rados_callback_t cb) { lock.Lock(); @@ -305,6 +324,36 @@ struct librados::AioCompletionImpl { } }; +void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) +{ + aio_write_list_lock.Lock(); + assert(!c->io); + c->io = this; + c->aio_write_seq = ++aio_write_seq; + aio_write_list.push_back(&c->aio_write_list_item); + aio_write_list_lock.Unlock(); +} + +void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) +{ + aio_write_list_lock.Lock(); + assert(c->io == this); + c->io = NULL; + c->aio_write_list_item.remove_myself(); + aio_write_cond.Signal(); + 
aio_write_list_lock.Unlock(); +} + +void librados::IoCtxImpl::flush_aio_writes() +{ + aio_write_list_lock.Lock(); + tid_t seq = aio_write_seq; + while (!aio_write_list.empty() && + aio_write_list.front()->aio_write_seq <= seq) + aio_write_cond.Wait(aio_write_list_lock); + aio_write_list_lock.Unlock(); +} + struct librados::ObjListCtx { librados::IoCtxImpl *ctx; Objecter::ListContext *lc; @@ -506,6 +555,8 @@ public: c->lock.Lock(); } + c->io->complete_aio_write(c); + c->put_unlock(); } C_aio_Safe(AioCompletionImpl *_c) : c(_c) { @@ -628,13 +679,6 @@ public: } }; -librados::IoCtxImpl:: -IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) - : ref_cnt(0), client(c), poolid(pid), - pool_name(pool_name_), snap_seq(s), assert_ver(0), - notify_timeout(c->conf->client_notify_timeout), oloc(pid) -{} - int librados::RadosClient:: connect() { @@ -1355,6 +1399,7 @@ aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioComplet if (io.snap_seq != CEPH_NOSNAP) return -EINVAL; + io.queue_aio_write(c); objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver); return 0; @@ -1424,6 +1469,8 @@ aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, if (io.snap_seq != CEPH_NOSNAP) return -EROFS; + io.queue_aio_write(c); + Context *onack = new C_aio_Ack(c); Context *onsafe = new C_aio_Safe(c); @@ -1445,6 +1492,8 @@ aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, if (io.snap_seq != CEPH_NOSNAP) return -EROFS; + io.queue_aio_write(c); + Context *onack = new C_aio_Ack(c); Context *onsafe = new C_aio_Safe(c); @@ -1466,6 +1515,8 @@ aio_write_full(IoCtxImpl& io, const object_t &oid, if (io.snap_seq != CEPH_NOSNAP) return -EROFS; + io.queue_aio_write(c); + Context *onack = new C_aio_Ack(c); Context *onsafe = new C_aio_Safe(c); @@ -2563,6 +2614,13 @@ aio_write_full(const std::string& oid, librados::AioCompletion *c, const bufferl return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl); 
} +int librados::IoCtx:: +aio_flush() +{ + io_ctx_impl->flush_aio_writes(); + return 0; +} + int librados::IoCtx:: watch(const string& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx) { @@ -3506,6 +3564,13 @@ extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o, (librados::AioCompletionImpl*)completion, bl); } +extern "C" int rados_aio_flush(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->flush_aio_writes(); + return 0; +} + struct C_WatchCB : public librados::WatchCtx { rados_watchcb_t wcb; void *arg; diff --git a/src/testrados.c b/src/testrados.c index 27942c5d70408..21be41e0426b2 100644 --- a/src/testrados.c +++ b/src/testrados.c @@ -17,6 +17,7 @@ #include #include #include +#include <assert.h> static void do_rados_setxattr(rados_ioctx_t io_ctx, const char *oid, const char *key, const char *val) @@ -250,6 +251,20 @@ int main(int argc, const char **argv) rados_aio_release(a); rados_aio_release(b); + /* test flush */ + printf("testing aio flush\n"); + rados_completion_t c; + rados_aio_create_completion(0, 0, 0, &c); + rados_aio_write(io_ctx, "c", c, buf, 100, 0); + int safe = rados_aio_is_safe(c); + printf("a should not yet be safe and ... %s\n", safe ? "is":"is not"); + assert(!safe); + rados_aio_flush(io_ctx); + safe = rados_aio_is_safe(c); + printf("a should be safe and ... %s\n", safe ? "is":"is not"); + assert(safe); + rados_aio_release(c); + rados_read(io_ctx, "../b/bb_bb_bb\\foo\\bar", buf2, 128, 0); /* list objects */