]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: refactor watch/notify; return notify error code
authorSage Weil <sage@redhat.com>
Wed, 13 Aug 2014 00:40:36 +0000 (17:40 -0700)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:19 +0000 (01:34 +0100)
Get rid of a level of intermediate classes with confusing names and put
the notify and notify finish logic in a single place so that it is easier
to follow and understand.

Pass the return value from the notify completion message to the caller.

Signed-off-by: Sage Weil <sage@redhat.com>
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/test/librados/watch_notify.cc

index d766c40dac2fc1940580723d6c9991bdac4581bb..d02a7de7c24e4bdbfc3c0d4f36569d85a555abff 100644 (file)
@@ -1013,7 +1013,8 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
 
   lock->Lock();
 
-  WatchContext *wc = new WatchContext(this, oid, ctx);
+  WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
+  wc->watch_ctx = ctx;
   client->register_watch_notify_callback(wc, cookie);
   prepare_assert_ops(&wr);
   wr.watch(*cookie, ver, 1);
@@ -1033,7 +1034,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
 
   if (r < 0) {
     lock->Lock();
-    client->unregister_watch_notify_callback(*cookie);
+    client->unregister_watch_notify_callback(*cookie); // destroys wc
     lock->Unlock();
   }
 
@@ -1051,7 +1052,6 @@ int librados::IoCtxImpl::_notify_ack(
   prepare_assert_ops(&rd);
   rd.notify_ack(notify_id, ver, cookie);
   objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
-
   return 0;
 }
 
@@ -1097,13 +1097,16 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
   Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
   version_t objver;
   uint64_t cookie;
-  C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
 
   ::ObjectOperation rd;
   prepare_assert_ops(&rd);
 
   lock->Lock();
-  WatchContext *wc = new WatchContext(this, oid, ctx);
+  WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
+  wc->notify_done = &done_all;
+  wc->notify_lock = &mylock_all;
+  wc->notify_cond = &cond_all;
+  wc->notify_rval = &r;
   client->register_watch_notify_callback(wc, &cookie);
   uint32_t prot_ver = 1;
   uint32_t timeout = notify_timeout;
@@ -1120,19 +1123,18 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
     cond.Wait(mylock);
   mylock.Unlock();
 
-  mylock_all.Lock();
   if (r == 0) {
+    mylock_all.Lock();
     while (!done_all)
       cond_all.Wait(mylock_all);
+    mylock_all.Unlock();
   }
-  mylock_all.Unlock();
 
   lock->Lock();
-  client->unregister_watch_notify_callback(cookie);
+  client->unregister_watch_notify_callback(cookie);   // destroys wc
   lock->Unlock();
 
   set_sync_op_version(objver);
-  delete ctx;
 
   return r;
 }
@@ -1250,51 +1252,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)
   c->put_unlock();
 }
 
-///////////////////////// C_NotifyComplete /////////////////////////////
-
-librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l,
-                                                       Cond *_c,
-                                                       bool *_d)
-  : lock(_l), cond(_c), done(_d)
-{
-  *done = false;
-}
-
-void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
-                                                  uint64_t ver,
-                                                  bufferlist& bl)
-{
-  lock->Lock();
-  *done = true;
-  cond->Signal();
-  lock->Unlock();
-}
-
-/////////////////////////// WatchContext ///////////////////////////////
-
-librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
-                                    const object_t& _oc,
-                                    librados::WatchCtx *_ctx)
-  : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0)
-{
-  io_ctx_impl->get();
-}
-
-librados::WatchContext::~WatchContext()
-{
-  io_ctx_impl->put();
-}
-
-void librados::WatchContext::notify(Mutex *client_lock,
-                                    uint8_t opcode,
-                                   uint64_t ver,
-                                   uint64_t notify_id,
-                                   bufferlist& payload)
-{
-  ctx->notify(opcode, ver, payload);
-  if (opcode != WATCH_NOTIFY_COMPLETE) {
-    client_lock->Lock();
-    io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie);
-    client_lock->Unlock();
-  }
-}
index 21e64441a4dcc22db1e6a894bc57320b5db2fb45..4d79e7b16f9f57d20b65f949aaa79357bec6b1a9 100644 (file)
@@ -212,30 +212,51 @@ struct librados::IoCtxImpl {
   void set_assert_src_version(const object_t& oid, uint64_t ver);
   void set_notify_timeout(uint32_t timeout);
 
-  struct C_NotifyComplete : public librados::WatchCtx {
-    Mutex *lock;
-    Cond *cond;
-    bool *done;
-
-    C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d);
-    void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
-  };
 };
 
 namespace librados {
-struct WatchContext : public RefCountedWaitObject {
-  IoCtxImpl *io_ctx_impl;
-  const object_t oid;
-  librados::WatchCtx *ctx;
-  uint64_t linger_id;
-  uint64_t cookie;
-
-  WatchContext(IoCtxImpl *io_ctx_impl_,
-              const object_t& _oc,
-              librados::WatchCtx *_ctx);
-  ~WatchContext();
+
+  /**
+   * watch/notify info
+   *
+   * Capture state about a watch or an in-progress notify
+   */
+struct WatchNotifyInfo : public RefCountedWaitObject {
+  IoCtxImpl *io_ctx_impl;  // parent
+  const object_t oid;      // the object
+  uint64_t linger_id;      // we use this to unlinger when we are done
+  uint64_t cookie;         // callback cookie
+
+  // watcher
+  librados::WatchCtx *watch_ctx;
+
+  // notify that we initiated
+  Mutex *notify_lock;
+  Cond *notify_cond;
+  bool *notify_done;
+  int *notify_rval;
+
+  WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
+                 const object_t& _oc)
+    : io_ctx_impl(io_ctx_impl_),
+      oid(_oc),
+      linger_id(0),
+      cookie(0),
+      watch_ctx(NULL),
+      notify_lock(NULL),
+      notify_cond(NULL),
+      notify_done(NULL),
+      notify_rval(NULL) {
+    io_ctx_impl->get();
+  }
+
+  ~WatchNotifyInfo() {
+    io_ctx_impl->put();
+  }
+
   void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
-             bufferlist& payload);
+             bufferlist& payload,
+             int return_code);
 };
 }
 #endif
index 956cc2c1428c544cf66c9af56a3ea1f4ed203118..faf8ea7a9c4c45957bfb0ac24eb4b3b0ba1198bc 100644 (file)
@@ -79,7 +79,7 @@ librados::RadosClient::RadosClient(CephContext *cct_)
     refcnt(1),
     log_last_version(0), log_cb(NULL), log_cb_arg(NULL),
     finisher(cct),
-    max_watch_cookie(0)
+    max_watch_notify_cookie(0)
 {
 }
 
@@ -607,70 +607,107 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti
   return r;
 }
 
+void librados::RadosClient::blacklist_self(bool set) {
+  Mutex::Locker l(lock);
+  objecter->blacklist_self(set);
+}
+
+
+// -----------
+// watch/notify
+
 void librados::RadosClient::register_watch_notify_callback(
-  WatchContext *wc,
+  WatchNotifyInfo *wc,
   uint64_t *cookie)
 {
   assert(lock.is_locked_by_me());
-  wc->cookie = *cookie = ++max_watch_cookie;
-  watchers[wc->cookie] = wc;
+  wc->cookie = *cookie = ++max_watch_notify_cookie;
+  ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl;
+  watch_notify_info[wc->cookie] = wc;
 }
 
 void librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie)
 {
+  ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
   assert(lock.is_locked_by_me());
-  map<uint64_t, WatchContext *>::iterator iter = watchers.find(cookie);
-  if (iter != watchers.end()) {
-    WatchContext *ctx = iter->second;
+  map<uint64_t, WatchNotifyInfo *>::iterator iter =
+    watch_notify_info.find(cookie);
+  if (iter != watch_notify_info.end()) {
+    WatchNotifyInfo *ctx = iter->second;
     if (ctx->linger_id)
       objecter->unregister_linger(ctx->linger_id);
 
-    watchers.erase(iter);
+    watch_notify_info.erase(iter);
     lock.Unlock();
-    ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" << (void *)ctx << dendl;
+    ldout(cct, 10) << __func__ << " dropping reference, waiting ctx="
+                  << (void *)ctx << dendl;
     ctx->put_wait();
     ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl;
     lock.Lock();
   }
 }
 
-void librados::RadosClient::blacklist_self(bool set) {
-  Mutex::Locker l(lock);
-  objecter->blacklist_self(set);
-}
-
-class C_WatchNotify : public Context {
-  librados::WatchContext *ctx;
-  Mutex *client_lock;
-  uint8_t opcode;
-  uint64_t ver;
-  uint64_t notify_id;
-  bufferlist bl;
-
-public:
-  C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
-                uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) : 
-                ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}
-
+struct C_DoWatchNotify : public Context {
+  librados::RadosClient *rados;
+  MWatchNotify *m;
+  C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {}
   void finish(int r) {
-    ctx->notify(client_lock, opcode, ver, notify_id, bl);
-    ctx->put();
+    rados->do_watch_notify(m);
   }
 };
 
 void librados::RadosClient::handle_watch_notify(MWatchNotify *m)
 {
   Mutex::Locker l(lock);
-  map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie);
-  if (iter != watchers.end()) {
-    WatchContext *wc = iter->second;
+
+  if (watch_notify_info.count(m->cookie)) {
+    ldout(cct,10) << __func__ << " queueing async " << *m << dendl;
+    // deliver this async via a finisher thread
+    finisher.queue(new C_DoWatchNotify(this, m));
+  } else {
+    // drop it on the floor
+    ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl;
+    m->put();
+  }
+}
+
+void librados::RadosClient::do_watch_notify(MWatchNotify *m)
+{
+  Mutex::Locker l(lock);
+  map<uint64_t, WatchNotifyInfo *>::iterator iter =
+    watch_notify_info.find(m->cookie);
+  if (iter != watch_notify_info.end()) {
+    WatchNotifyInfo *wc = iter->second;
     assert(wc);
-    wc->get();
-    finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
+    if (wc->notify_lock) {
+      // we sent a notify and it completed (or failed)
+      ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
+      wc->notify_lock->Lock();
+      *wc->notify_done = true;
+      *wc->notify_rval = m->return_code;
+      wc->notify_cond->Signal();
+      wc->notify_lock->Unlock();
+    } else {
+      // we are watcher and got a notify
+      ldout(cct,10) << __func__ << " got notify " << *m << dendl;
+      wc->get();
+
+      // trigger the callback
+      lock.Unlock();
+      wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+      lock.Lock();
+
+      // send ACK back to the OSD
+      wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->ver, m->cookie);
+
+      ldout(cct,10) << __func__ << " notify done" << dendl;
+      wc->put();
+    }
   }
   m->put();
 }
 
+
 int librados::RadosClient::mon_command(const vector<string>& cmd,
                                       const bufferlist &inbl,
                                       bufferlist *outbl, string *outs)
index b2e44cadaa11cd67e6e39e0008b65ea037c0699f..9a394b3d5b70313bbe16f13e0077a742238056c1 100755 (executable)
@@ -107,13 +107,15 @@ public:
   int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
 
   // watch/notify
-  uint64_t max_watch_cookie;
-  map<uint64_t, librados::WatchContext *> watchers;
+  uint64_t max_watch_notify_cookie;
+  map<uint64_t, librados::WatchNotifyInfo *> watch_notify_info;
 
-  void register_watch_notify_callback(librados::WatchContext *wc,
+  void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
                                      uint64_t *cookie);
   void unregister_watch_notify_callback(uint64_t cookie);
   void handle_watch_notify(MWatchNotify *m);
+  void do_watch_notify(MWatchNotify *m);
+
   int mon_command(const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
   int mon_command(int rank,
index d92c1b8ebe6e31463ec18bfe4f379dbcd4cfe946..41ca711e6d2c8766f159e7d88d2f40033fe243c3 100644 (file)
@@ -19,6 +19,7 @@ static sem_t sem;
 
 static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
 {
+  std::cout << __func__ << std::endl;
   sem_post(&sem);
 }
 
@@ -27,6 +28,7 @@ class WatchNotifyTestCtx : public WatchCtx
 public:
     void notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
     {
+      std::cout << __func__ << std::endl;
       sem_post(&sem);
     }
 };