]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: call notification under different thread context
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 24 Apr 2012 22:51:14 +0000 (15:51 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 25 Apr 2012 21:14:29 +0000 (14:14 -0700)
This fixes #2342. We shouldn't call notify on the dispatcher
context. We should also make sure that we don't hold
the client lock while waiting for the responses.
Also, pushed the client_lock locking into the
ctx->notify().

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/msg/Message.h

index 83faef7d322f10be5e099a2cf251a991568b09ba..d90d204adfd5148a3c1ca6aa8506807b6c60b7d7 100644 (file)
@@ -1594,13 +1594,16 @@ librados::WatchContext::~WatchContext()
   io_ctx_impl->put();
 }
 
-void librados::WatchContext::notify(uint8_t opcode,
+void librados::WatchContext::notify(Mutex *client_lock,
+                                    uint8_t opcode,
                                    uint64_t ver,
                                    uint64_t notify_id,
                                    bufferlist& payload)
 {
   ctx->notify(opcode, ver, payload);
   if (opcode != WATCH_NOTIFY_COMPLETE) {
+    client_lock->Lock();
     io_ctx_impl->_notify_ack(oid, notify_id, ver);
+    client_lock->Unlock();
   }
 }
index d0e99d2a71be478b14c9fc1e40d18dad7f91b451..f2372ff745bd4105490d88767bfc0c4e550888e0 100644 (file)
@@ -200,7 +200,7 @@ struct librados::IoCtxImpl {
 };
 
 namespace librados {
-struct WatchContext {
+struct WatchContext : public RefCountedWaitObject {
   IoCtxImpl *io_ctx_impl;
   const object_t oid;
   uint64_t cookie;
@@ -212,7 +212,7 @@ struct WatchContext {
               const object_t& _oc,
               librados::WatchCtx *_ctx);
   ~WatchContext();
-  void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id,
+  void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
              bufferlist& payload);
 };
 }
index 755a9b7e8245c0bb11b6d71bb460b5af97bcb08f..8016199dc01c282405da7468c795d39daeff1bb2 100644 (file)
@@ -447,11 +447,36 @@ void librados::RadosClient::unregister_watcher(uint64_t cookie)
     WatchContext *ctx = iter->second;
     if (ctx->linger_id)
       objecter->unregister_linger(ctx->linger_id);
-    delete ctx;
+
     watchers.erase(iter);
+    lock.Unlock();
+    ldout(cct, 10) << "unregister_watcher, dropping reference, waiting ctx=" << (void *)ctx << dendl;
+    ctx->put_wait();
+    ldout(cct, 10) << "unregister_watcher, done ctx=" << (void *)ctx << dendl;
+    lock.Lock();
   }
 }
 
+
+class C_WatchNotify : public Context {
+  librados::WatchContext *ctx;
+  Mutex *client_lock;
+  uint8_t opcode;
+  uint64_t ver;
+  uint64_t notify_id;
+  bufferlist bl;
+
+public:
+  C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
+                uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) : 
+                ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}
+
+  void finish(int r) {
+    ctx->notify(client_lock, opcode, ver, notify_id, bl);
+    ctx->put();
+  }
+};
+
 void librados::RadosClient::watch_notify(MWatchNotify *m)
 {
   assert(lock.is_locked());
@@ -463,7 +488,7 @@ void librados::RadosClient::watch_notify(MWatchNotify *m)
   if (!wc)
     return;
 
-  wc->notify(m->opcode, m->ver, m->notify_id, m->bl);
-
+  wc->get();
+  finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
   m->put();
 }
index 476b93f407926cfe0065d11fe1d80dc276ee3649..865ea268b917b9ab14b7521c91871234bff7d242 100644 (file)
@@ -168,6 +168,69 @@ struct RefCountedObject {
   }
 };
 
+struct RefCountedCond : public RefCountedObject {
+  Mutex lock;
+  Cond cond;
+  atomic_t complete;
+
+  RefCountedCond() : lock("RefCountedCond") {}
+
+  void wait() {
+    Mutex::Locker l(lock);
+    while (!complete.read())
+      cond.Wait(lock);
+  }
+
+  void done() {
+    Mutex::Locker l(lock);
+    cond.SignalAll();
+    complete.set(1);
+  }
+};
+
+struct RefCountedWaitObject {
+  atomic_t nref;
+  RefCountedCond *c;
+
+  RefCountedWaitObject() : nref(1) {
+    c = new RefCountedCond;
+  }
+  virtual ~RefCountedWaitObject() {
+    c->put();
+  }
+
+  RefCountedWaitObject *get() {
+    nref.inc();
+    return this;
+  }
+
+  bool put() {
+    bool ret = false;
+    RefCountedCond *cond = c;
+    cond->get();
+    if (nref.dec() == 0) {
+      cond->done();
+      delete this;
+      ret = true;
+    }
+    cond->put();
+    return ret;
+  }
+
+  void put_wait() {
+    RefCountedCond *cond = c;
+
+    cond->get();
+    if (nref.dec() == 0) {
+      cond->done();
+      delete this;
+    } else {
+      cond->wait();
+    }
+    cond->put();
+  }
+};
+
 struct Connection : public RefCountedObject {
   Mutex lock;
   RefCountedObject *priv;