git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: first pass at aio interface
author Sage Weil <sage@newdream.net>
Mon, 1 Jun 2009 21:36:14 +0000 (14:36 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 1 Jun 2009 21:36:44 +0000 (14:36 -0700)
src/include/librados.h
src/librados.cc

index 8f4f4d4dfc69c4b142cd039ca8f6f3e7d7cc7bed..76d5727635ba9c107b5d636336d4fb173a185bc6 100644 (file)
@@ -33,10 +33,14 @@ int rados_exec(rados_pool_t pool, struct ceph_object *o, const char *cls, const
               const char *in_buf, size_t in_len, char *buf, size_t out_len);
 
 /* async io */
-typedef struct {
-  int done;
-  int rval;
-} rados_completion_t;
+/*
+ * Opaque handle to an asynchronous operation.  It is written through the
+ * `completion` out-parameter of rados_aio_read()/rados_aio_write() and is
+ * only meaningful when passed back to the rados_aio_* calls below.
+ */
+typedef void *rados_completion_t;
+
+/* Block until the operation has been acknowledged.  Returns 0. */
+int rados_aio_wait_for_complete(rados_completion_t c);
+/*
+ * Block until the operation has been reported safe.  Returns 0.
+ * NOTE(review): in this change only the write path registers a callback
+ * that sets the safe flag -- confirm the behavior for read completions.
+ */
+int rados_aio_wait_for_safe(rados_completion_t c);
+/* Non-blocking poll: non-zero once the operation has been acknowledged. */
+int rados_aio_is_complete(rados_completion_t c);
+/* Non-blocking poll: non-zero once the operation has been reported safe. */
+int rados_aio_is_safe(rados_completion_t c);
+/* Return the result code currently recorded for the operation. */
+int rados_aio_get_return_value(rados_completion_t c);
+/* Drop the caller's reference; the handle must not be used afterwards. */
+void rados_aio_free(rados_completion_t c);
 
 int rados_aio_write(rados_pool_t pool, struct ceph_object *oid, off_t off, const char *buf, size_t len, rados_completion_t *completion);
 int rados_aio_read(rados_pool_t pool, struct ceph_object *oid, off_t off, char *buf, size_t len, rados_completion_t *completion);
index 585bb117a61f6b68759baa25b3825383f19c184e..642534b617dca214fcd916cd44c8257051acbb22 100644 (file)
@@ -59,6 +59,7 @@ class RadosClient : public Dispatcher
   Mutex lock;
   Cond cond;
 
 public:
   RadosClient() : messenger(NULL), mc(NULL), lock("radosclient") {}
   ~RadosClient();
@@ -73,6 +74,115 @@ public:
   int remove(int pool, object_t& oid);
 
   int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+
+
+  // --- aio ---
+  // State shared between the caller of an aio operation and the callbacks
+  // that finish it.  All flag/result/refcount accesses go through `lock`;
+  // `cond` is what the wait_for_*() calls sleep on.
+  struct AioCompletion {
+    Mutex lock;
+    Cond cond;
+    int ref, rval;     // reference count (starts at 1), result code
+    bool ack, safe;    // set by the ack / safe callbacks respectively
+
+    // read support: `bl` receives the data, which is then handed to either
+    // `*pbl` or the raw buffer `buf` (at most `maxlen` bytes).
+    bufferlist bl, *pbl;
+    char *buf;
+    unsigned maxlen;
+
+    AioCompletion()
+      : lock("RadosClient::AioCompletion"),
+        ref(1),
+        rval(0),
+        ack(false),
+        safe(false),
+        pbl(0),
+        buf(0),
+        maxlen(0)
+    {}
+
+    // Sleep until the ack flag has been set.  Always returns 0.
+    int wait_for_complete() {
+      lock.Lock();
+      while (!ack) {
+        cond.Wait(lock);
+      }
+      lock.Unlock();
+      return 0;
+    }
+
+    // Sleep until the safe flag has been set.  Always returns 0.
+    int wait_for_safe() {
+      lock.Lock();
+      while (!safe) {
+        cond.Wait(lock);
+      }
+      lock.Unlock();
+      return 0;
+    }
+
+    // Snapshot of the ack flag (0 or 1), read under the lock.
+    int is_complete() {
+      lock.Lock();
+      const int acked = ack;
+      lock.Unlock();
+      return acked;
+    }
+
+    // Snapshot of the safe flag (0 or 1), read under the lock.
+    int is_safe() {
+      lock.Lock();
+      const int is_safe_now = safe;
+      lock.Unlock();
+      return is_safe_now;
+    }
+
+    // Snapshot of the recorded result code, read under the lock.
+    int get_return_value() {
+      lock.Lock();
+      const int result = rval;
+      lock.Unlock();
+      return result;
+    }
+
+    // Release one reference.  The object deletes itself once the count
+    // reaches zero; the lock is released first so it is not destroyed
+    // while held.
+    void put() {
+      lock.Lock();
+      const int remaining = --ref;
+      lock.Unlock();
+      if (remaining == 0) {
+        delete this;
+      }
+    }
+  };
+
+  // Callback run when an aio operation is acknowledged.
+  //
+  // Records the result, marks the completion as acked, wakes waiters and,
+  // for reads, delivers the data from c->bl to the caller's destination
+  // (raw buffer and/or bufferlist).
+  //
+  // Lifetime: the callback holds its own reference on the completion,
+  // taken in the constructor and dropped at the end of finish().  Without
+  // it, finish() would release the caller's only reference (ref starts at
+  // 1) and free the completion while the caller still holds the handle.
+  struct C_aio_Ack : public Context {
+    AioCompletion *c;
+    void finish(int r) {
+      c->lock.Lock();
+      c->rval = r;
+      c->ack = true;
+      // Waiters cannot observe the state until the lock is released below,
+      // so signalling before the data copy is safe.
+      c->cond.Signal();
+
+      if (c->buf && c->bl.length() > 0) {
+        unsigned l = MIN(c->bl.length(), c->maxlen);
+        c->bl.copy(0, l, c->buf);
+        // Report the number of bytes actually placed in the caller's
+        // buffer, never more than it can hold.
+        c->rval = l;
+      }
+      if (c->pbl) {
+        *c->pbl = c->bl;
+      }
+
+      // Drop the callback's reference; free the completion if the caller
+      // has already released its handle.
+      int n = --c->ref;
+      c->lock.Unlock();
+      if (!n)
+        delete c;
+    }
+    C_aio_Ack(AioCompletion *_c) : c(_c) {
+      // Take the reference that finish() will drop.
+      c->lock.Lock();
+      c->ref++;
+      c->lock.Unlock();
+    }
+  };
+  
+  // Callback run when an aio write is reported safe.
+  //
+  // Marks the completion as safe (and as acked, with result r, if no ack
+  // has been recorded yet) and wakes waiters.
+  //
+  // Lifetime: the callback holds its own reference on the completion,
+  // taken in the constructor and dropped at the end of finish(), so that
+  // finish() never releases a reference belonging to the caller.
+  struct C_aio_Safe : public Context {
+    AioCompletion *c;
+    void finish(int r) {
+      c->lock.Lock();
+      if (!c->ack) {
+        // Safe arrived without a prior ack: it stands in for the ack too.
+        c->rval = r;
+        c->ack = true;
+      }
+      c->safe = true;
+      c->cond.Signal();
+      // Drop the callback's reference; free the completion if nobody else
+      // holds it any more.
+      int n = --c->ref;
+      c->lock.Unlock();
+      if (!n)
+        delete c;
+    }
+    C_aio_Safe(AioCompletion *_c) : c(_c) {
+      // Take the reference that finish() will drop.
+      c->lock.Lock();
+      c->ref++;
+      c->lock.Unlock();
+    }
+  };
+
+  // Asynchronous read into a caller-supplied bufferlist (*pbl).  A newly
+  // allocated completion is stored in *pc; the call itself returns 0.
+  int aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
+              AioCompletion **pc);
+  // Asynchronous read into a caller-supplied raw buffer of `len` bytes.
+  // A newly allocated completion is stored in *pc; the call returns 0.
+  int aio_read(int pool, object_t oid, off_t off, char *buf, size_t len,
+              AioCompletion **pc);
+
+  // Asynchronous write of `bl` at offset `off`.  A newly allocated
+  // completion is stored in *pc; the call itself returns 0.
+  int aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len,
+               AioCompletion **pc);
+
 };
 
 bool RadosClient::init()
@@ -210,6 +320,59 @@ int RadosClient::write(int pool, object_t& oid, off_t off, bufferlist& bl, size_
   return len;
 }
 
+// Start an asynchronous read of `len` bytes at offset `off` of object `oid`
+// in `pool`.  The data is read into the completion's internal bufferlist
+// and assigned to *pbl by the ack callback.  The new completion is returned
+// through *pc; the function itself always returns 0.
+int RadosClient::aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
+                         AioCompletion **pc)
+{
+  AioCompletion *completion = new AioCompletion;
+  Context *ack_cb = new C_aio_Ack(completion);
+
+  // Tell the ack callback where to deliver the data.
+  completion->pbl = pbl;
+
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  objecter->read(oid, layout, off, len, CEPH_NOSNAP, &completion->bl, 0, ack_cb);
+
+  *pc = completion;
+  return 0;
+}
+// Start an asynchronous read of `len` bytes at offset `off` of object `oid`
+// in `pool` into the raw buffer `buf`.  The data is read into the
+// completion's internal bufferlist; the ack callback copies at most `len`
+// bytes of it into `buf`.  The new completion is returned through *pc; the
+// function itself always returns 0.
+int RadosClient::aio_read(int pool, object_t oid, off_t off, char *buf, size_t len,
+                         AioCompletion **pc)
+{
+  AioCompletion *completion = new AioCompletion;
+  Context *ack_cb = new C_aio_Ack(completion);
+
+  // Destination and capacity used by the ack callback when copying out.
+  completion->buf = buf;
+  completion->maxlen = len;
+
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  objecter->read(oid, layout, off, len, CEPH_NOSNAP, &completion->bl, 0, ack_cb);
+
+  *pc = completion;
+  return 0;
+}
+
+// Start an asynchronous write of `bl` (`len` bytes) at offset `off` of
+// object `oid` in `pool`, using a default-constructed snap context and the
+// current clock time.  Both an ack and a safe callback are attached to the
+// new completion, which is returned through *pc.  Always returns 0.
+int RadosClient::aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len,
+                          AioCompletion **pc)
+{
+  SnapContext snapc;
+  utime_t now = g_clock.now();
+
+  AioCompletion *completion = new AioCompletion;
+  Context *ack_cb = new C_aio_Ack(completion);
+  Context *safe_cb = new C_aio_Safe(completion);
+
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  objecter->write(oid, layout, off, len, snapc, bl, now, 0, ack_cb, safe_cb);
+
+  *pc = completion;
+  return 0;
+}
+
 int RadosClient::remove(int pool, object_t& oid)
 {
   SnapContext snapc;
@@ -490,3 +653,55 @@ extern "C" int rados_exec(rados_pool_t pool, ceph_object *o, const char *cls, co
   return ret;
 }
 
+
+
+// -------------------------
+// aio
+
+// C entry point: block until the operation behind `c` has been acknowledged.
+extern "C" int rados_aio_wait_for_complete(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  return comp->wait_for_complete();
+}
+
+// C entry point: block until the operation behind `c` has been marked safe.
+extern "C" int rados_aio_wait_for_safe(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  return comp->wait_for_safe();
+}
+
+// C entry point: non-blocking check of the completion's ack flag.
+extern "C" int rados_aio_is_complete(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  return comp->is_complete();
+}
+
+// C entry point: non-blocking check of the completion's safe flag.
+extern "C" int rados_aio_is_safe(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  return comp->is_safe();
+}
+
+// C entry point: fetch the result code recorded on the completion.
+extern "C" int rados_aio_get_return_value(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  return comp->get_return_value();
+}
+
+// C entry point: release the caller's reference on the completion.
+extern "C" void rados_aio_free(rados_completion_t c)
+{
+  RadosClient::AioCompletion *comp = static_cast<RadosClient::AioCompletion *>(c);
+  comp->put();
+}
+
+// C entry point for an asynchronous read into `buf`.  Converts the C object
+// descriptor to an object_t and forwards to the global client's raw-buffer
+// aio_read(); the completion handle is written to *completion.
+extern "C" int rados_aio_read(rados_pool_t pool, struct ceph_object *o,
+                              off_t off, char *buf, size_t len,
+                              rados_completion_t *completion)
+{
+  object_t oid(*o);
+  RadosClient::AioCompletion **pc =
+    reinterpret_cast<RadosClient::AioCompletion **>(completion);
+  return radosp->aio_read(pool, oid, off, buf, len, pc);
+}
+
+// C entry point for an asynchronous write.  Appends the caller's `len`
+// bytes to a local bufferlist, converts the C object descriptor to an
+// object_t and forwards to the global client's aio_write(); the completion
+// handle is written to *completion.
+extern "C" int rados_aio_write(rados_pool_t pool, struct ceph_object *o,
+                              off_t off, const char *buf, size_t len,
+                              rados_completion_t *completion)
+{
+  object_t oid(*o);
+
+  bufferlist data;
+  data.append(buf, len);
+
+  RadosClient::AioCompletion **pc =
+    reinterpret_cast<RadosClient::AioCompletion **>(completion);
+  return radosp->aio_write(pool, oid, off, data, len, pc);
+}