From eaa9a057acaaa14d7bfa380f9a6add1326a8d60b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 1 Jun 2009 14:36:14 -0700 Subject: [PATCH] rados: first pass at aio interface --- src/include/librados.h | 12 ++- src/librados.cc | 215 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/src/include/librados.h b/src/include/librados.h index 8f4f4d4dfc69c..76d5727635ba9 100644 --- a/src/include/librados.h +++ b/src/include/librados.h @@ -33,10 +33,14 @@ int rados_exec(rados_pool_t pool, struct ceph_object *o, const char *cls, const const char *in_buf, size_t in_len, char *buf, size_t out_len); /* async io */ -typedef struct { - int done; - int rval; -} rados_completion_t; +typedef void *rados_completion_t; + +int rados_aio_wait_for_complete(rados_completion_t c); +int rados_aio_wait_for_safe(rados_completion_t c); +int rados_aio_is_complete(rados_completion_t c); +int rados_aio_is_safe(rados_completion_t c); +int rados_aio_get_return_value(rados_completion_t c); +void rados_aio_free(rados_completion_t c); int rados_aio_write(rados_pool_t pool, struct ceph_object *oid, off_t off, const char *buf, size_t len, rados_completion_t *completion); int rados_aio_read(rados_pool_t pool, struct ceph_object *oid, off_t off, char *buf, size_t len, rados_completion_t *completion); diff --git a/src/librados.cc b/src/librados.cc index 585bb117a61f6..642534b617dca 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -59,6 +59,7 @@ class RadosClient : public Dispatcher Mutex lock; Cond cond; + public: RadosClient() : messenger(NULL), mc(NULL), lock("radosclient") {} ~RadosClient(); @@ -73,6 +74,115 @@ public: int remove(int pool, object_t& oid); int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); + + + // --- aio --- + struct AioCompletion { + Mutex lock; + Cond cond; + int ref, rval; + bool ack, safe; + + // for read + bufferlist bl, *pbl; + char *buf; + unsigned maxlen; + + AioCompletion() : lock("RadosClient::AioCompletion"), + ref(1), rval(0), ack(false), safe(false), pbl(0), buf(0), maxlen(0) {} + + int wait_for_complete() { + lock.Lock(); + while (!ack) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int wait_for_safe() { + lock.Lock(); + while (!safe) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = ack; + lock.Unlock(); + return r; + } + int is_safe() { + lock.Lock(); + int r = safe; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + void put() { + lock.Lock(); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } + }; + + struct C_aio_Ack : public Context { + AioCompletion *c; + void finish(int r) { + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + if (c->buf && c->bl.length() > 0) { + unsigned l = MIN(c->bl.length(), c->maxlen); + c->bl.copy(0, l, c->buf); + c->rval = c->bl.length(); + } + if (c->pbl) { + *c->pbl = c->bl; + } + + int n = --c->ref; + c->lock.Unlock(); + if (!n) + delete c; + } + C_aio_Ack(AioCompletion *_c) : c(_c) {} + }; + + struct C_aio_Safe : public Context { + AioCompletion *c; + void finish(int r) { + c->lock.Lock(); + if (!c->ack) { + c->rval = r; + c->ack = true; + } + c->safe = true; + c->cond.Signal(); + int n = --c->ref; + c->lock.Unlock(); + if (!n) + delete c; + } + C_aio_Safe(AioCompletion *_c) : c(_c) {} + }; + + int aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len, + AioCompletion **pc); + int aio_read(int pool, object_t oid, off_t off, char *buf, size_t len, + AioCompletion **pc); + + int aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len, + AioCompletion **pc); + }; bool RadosClient::init() @@ -210,6 +320,59 @@ int RadosClient::write(int pool, object_t& oid, off_t off, bufferlist& bl, size_ return len; } +int RadosClient::aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len, + AioCompletion **pc) +{ + AioCompletion *c = new AioCompletion; + Context *onack = new C_aio_Ack(c); + + c->pbl = pbl; + + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool); + objecter->read(oid, layout, + off, len, CEPH_NOSNAP, &c->bl, 0, + onack); + + *pc = c; + return 0; +} +int RadosClient::aio_read(int pool, object_t oid, off_t off, char *buf, size_t len, + AioCompletion **pc) +{ + AioCompletion *c = new AioCompletion; + Context *onack = new C_aio_Ack(c); + + c->buf = buf; + c->maxlen = len; + + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool); + objecter->read(oid, layout, + off, len, CEPH_NOSNAP, &c->bl, 0, + onack); + + *pc = c; + return 0; +} + +int RadosClient::aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len, + AioCompletion **pc) +{ + SnapContext snapc; + utime_t ut = g_clock.now(); + + AioCompletion *c = new AioCompletion; + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool); + objecter->write(oid, layout, + off, len, snapc, bl, ut, 0, + onack, onsafe); + + *pc = c; + return 0; +} + int RadosClient::remove(int pool, object_t& oid) { SnapContext snapc; @@ -490,3 +653,55 @@ extern "C" int rados_exec(rados_pool_t pool, ceph_object *o, const char *cls, co return ret; } + + +// ------------------------- +// aio + +extern "C" int rados_aio_wait_for_complete(rados_completion_t c) +{ + return ((RadosClient::AioCompletion *)c)->wait_for_complete(); +} + +extern "C" int rados_aio_wait_for_safe(rados_completion_t c) +{ + return ((RadosClient::AioCompletion *)c)->wait_for_safe(); +} + +extern "C" int rados_aio_is_complete(rados_completion_t c) +{ + return ((RadosClient::AioCompletion *)c)->is_complete(); +} + +extern "C" int rados_aio_is_safe(rados_completion_t c) +{ + return ((RadosClient::AioCompletion *)c)->is_safe(); +} + +extern "C" int rados_aio_get_return_value(rados_completion_t c) +{ + return ((RadosClient::AioCompletion *)c)->get_return_value(); +} + +extern "C" void rados_aio_free(rados_completion_t c) +{ + ((RadosClient::AioCompletion *)c)->put(); +} + +extern "C" int rados_aio_read(rados_pool_t pool, struct ceph_object *o, + off_t off, char *buf, size_t len, + rados_completion_t *completion) +{ + object_t oid(*o); + return radosp->aio_read(pool, oid, off, buf, len, (RadosClient::AioCompletion**)completion); +} + +extern "C" int rados_aio_write(rados_pool_t pool, struct ceph_object *o, + off_t off, const char *buf, size_t len, + rados_completion_t *completion) +{ + object_t oid(*o); + bufferlist bl; + bl.append(buf, len); + return radosp->aio_write(pool, oid, off, bl, len, (RadosClient::AioCompletion**)completion); +} -- 2.39.5