]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: do aio callbacks in async thread
authorSage Weil <sage.weil@dreamhost.com>
Thu, 5 Apr 2012 22:55:50 +0000 (15:55 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Sat, 14 Apr 2012 03:46:34 +0000 (20:46 -0700)
Call user completions in an async thread.  This allows callers to call back
into librados from the callback, and allows them to take locks in their
callbacks that they hold when queuing requests (making their life much
easier).

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/librados/AioCompletionImpl.h
src/librados/IoCtxImpl.cc
src/librados/RadosClient.cc
src/librados/RadosClient.h

index 02be2f3984d98674800e1c14f0f9c6a4f70d0c69..a6fda7d08c4ad8cd2995e27a63a582ba4ae149d0 100644 (file)
@@ -130,4 +130,37 @@ struct librados::AioCompletionImpl {
   }
 };
 
+namespace librados {
+struct C_AioComplete : public Context {
+  AioCompletionImpl *c;
+
+  C_AioComplete(AioCompletionImpl *cc) : c(cc) {
+    c->ref++;
+  }
+
+  void finish(int r) {
+    rados_callback_t cb = c->callback_complete;
+    void *cb_arg = c->callback_arg;
+    cb(c, cb_arg);
+    c->put();
+  }
+};
+
+struct C_AioSafe : public Context {
+  AioCompletionImpl *c;
+
+  C_AioSafe(AioCompletionImpl *cc) : c(cc) {
+    c->ref++;
+  }
+
+  void finish(int r) {
+    rados_callback_t cb = c->callback_safe;
+    void *cb_arg = c->callback_arg;
+    cb(c, cb_arg);
+    c->put();
+  }
+};
+
+}
+
 #endif
index 66ae5644f61547d45de6b0c3d8748762c8ec4d83..f5dbd2b08fa2bdba888d7a4e40258543557be376 100644 (file)
@@ -1500,11 +1500,7 @@ void librados::IoCtxImpl::C_aio_Ack::finish(int r)
   }
 
   if (c->callback_complete) {
-    rados_callback_t cb = c->callback_complete;
-    void *cb_arg = c->callback_arg;
-    c->lock.Unlock();
-    cb(c, cb_arg);
-    c->lock.Lock();
+    c->io->client->finisher.queue(new C_AioComplete(c));
   }
 
   c->put_unlock();
@@ -1532,11 +1528,7 @@ void librados::IoCtxImpl::C_aio_sparse_read_Ack::finish(int r)
   }
 
   if (c->callback_complete) {
-    rados_callback_t cb = c->callback_complete;
-    void *cb_arg = c->callback_arg;
-    c->lock.Unlock();
-    cb(c, cb_arg);
-    c->lock.Lock();
+    c->io->client->finisher.queue(new C_AioComplete(c));
   }
 
   c->put_unlock();
@@ -1560,11 +1552,7 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)
   c->cond.Signal();
 
   if (c->callback_safe) {
-    rados_callback_t cb = c->callback_safe;
-    void *cb_arg = c->callback_arg;
-    c->lock.Unlock();
-    cb(c, cb_arg);
-    c->lock.Lock();
+    c->io->client->finisher.queue(new C_AioSafe(c));
   }
 
   c->io->complete_aio_write(c);
index 67d882339f235e6eb20c87a57c32e241e70e7c47..cc29c60f5c84d2a3d731ab177e60572001e57ca9 100644 (file)
@@ -53,16 +53,18 @@ bool librados::RadosClient::ms_get_authorizer(int dest_type,
   return *authorizer != NULL;
 }
 
-librados::RadosClient::RadosClient(CephContext *cct_) : Dispatcher(cct_),
-                                                       cct(cct_),
-                                                       conf(cct_->_conf),
-                                                       state(DISCONNECTED),
-                                                       monclient(cct_),
-                                                       messenger(NULL),
-                                                       objecter(NULL),
-                                                       lock("radosclient"),
-                                                       timer(cct, lock),
-                                                       max_watch_cookie(0)
+librados::RadosClient::RadosClient(CephContext *cct_)
+  : Dispatcher(cct_),
+    cct(cct_),
+    conf(cct_->_conf),
+    state(DISCONNECTED),
+    monclient(cct_),
+    messenger(NULL),
+    objecter(NULL),
+    lock("radosclient"),
+    timer(cct, lock),
+    finisher(cct),
+    max_watch_cookie(0)
 {
 }
 
@@ -164,7 +166,11 @@ int librados::RadosClient::connect()
     ldout(cct, 1) << "waiting for osdmap" << dendl;
     cond.Wait(lock);
   }
+
+  finisher.start();
+
   state = CONNECTED;
+
   lock.Unlock();
 
   ldout(cct, 1) << "init done" << dendl;
@@ -183,6 +189,9 @@ void librados::RadosClient::shutdown()
     lock.Unlock();
     return;
   }
+  if (state == CONNECTED) {
+    finisher.stop();
+  }
   monclient.shutdown();
   if (objecter && state == CONNECTED)
     objecter->shutdown();
index 9f92458da121556b827badc57aeb9a1e4cb357e7..0e5d7c455bc199b679de947c491744e811216e87 100644 (file)
@@ -64,6 +64,7 @@ private:
   SafeTimer timer;
 
 public:
+  Finisher finisher;
 
   RadosClient(CephContext *cct_);
   ~RadosClient();