git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: move watch/notify into the Objecter
author Sage Weil <sage@redhat.com>
Sun, 16 Nov 2014 21:44:08 +0000 (13:44 -0800)
committer Sage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:34:05 +0000 (10:34 -0800)
Several things here:
 - we move all of the junk from librados' RadosClient into Objecter
 - we use fast-dispatch to schedule the watch-notify events (this will
   keep them ordered wrt ping)
 - we use the LingerOp * as the librados-exposed handle so that we can
   avoid any lookups for watch-check.

Signed-off-by: Sage Weil <sage@redhat.com>
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 08d71d049cd7c0b04e80425e39bc322193b7256e..9ebd545c6532539b6a96ca005e5842f3faf708c0 100644 (file)
@@ -1032,46 +1032,89 @@ void librados::IoCtxImpl::set_sync_op_version(version_t ver)
   last_objver = ver;
 }
 
+struct WatchInfo : public Objecter::WatchContext {
+  librados::IoCtxImpl *ioctx;
+  uint64_t user_cookie;
+  object_t oid;
+  librados::WatchCtx *ctx;
+  librados::WatchCtx2 *ctx2;
+
+  WatchInfo(librados::IoCtxImpl *io, uint64_t uc, object_t o,
+           librados::WatchCtx *c, librados::WatchCtx2 *c2)
+    : ioctx(io), user_cookie(uc), oid(o), ctx(c), ctx2(c2) {}
+
+  void handle_notify(uint64_t notify_id,
+                    uint64_t cookie,
+                    uint64_t notifier_id,
+                    bufferlist& bl) {
+    ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
+                                 << " cookie " << user_cookie
+                                 << " notifier_id " << notifier_id
+                                 << " len " << bl.length()
+                                 << dendl;
+
+    if (ctx2)
+      ctx2->handle_notify(notify_id, user_cookie, notifier_id, bl);
+    if (ctx) {
+      ctx->notify(0, 0, bl);
+
+      // send ACK back to OSD if using legacy protocol
+      bufferlist empty;
+      ioctx->notify_ack(oid, notify_id, user_cookie, empty);
+    }
+  }
+  void handle_failed_notify(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id) {
+    ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
+                                 << " cookie " << user_cookie
+                                 << " notifier_id " << notifier_id
+                                 << dendl;
+    if (ctx2)
+      ctx2->handle_failed_notify(notify_id, user_cookie, notifier_id);
+  }
+  void handle_error(uint64_t cookie, int err) {
+    ldout(ioctx->client->cct, 10) << __func__ << " cookie " << user_cookie
+                                 << " err " << err
+                                 << dendl;
+    if (ctx2)
+      ctx2->handle_error(user_cookie, err);
+  }
+};
+
 int librados::IoCtxImpl::watch(const object_t& oid,
-                              uint64_t *cookie,
+                              uint64_t *handle,
                               librados::WatchCtx *ctx,
                               librados::WatchCtx2 *ctx2)
 {
   ::ObjectOperation wr;
-  Mutex mylock("IoCtxImpl::watch::mylock");
-  Cond cond;
-  bool done;
-  int r;
-  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
   version_t objver;
+  C_SaferCond onfinish;
 
   lock->Lock();
 
-  WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
-  wc->watch_ctx = ctx;
-  wc->watch_ctx2 = ctx2;
-  wc->linger_op = objecter->linger_register(oid, oloc, 0, cookie);
-  client->register_watch_notify_callback(wc, *cookie);
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+  *handle = reinterpret_cast<uint64_t>(linger_op);
+  linger_op->watch_context = new WatchInfo(this,
+                                          reinterpret_cast<uint64_t>(linger_op),
+                                          oid, ctx, ctx2);
+
   prepare_assert_ops(&wr);
-  wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH);
+  wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_WATCH);
   bufferlist bl;
-  objecter->linger_watch(wc->linger_op, wr,
+  objecter->linger_watch(linger_op, wr,
                         snapc, ceph_clock_now(NULL), bl,
-                        *cookie,
-                        NULL, onfinish, &wc->on_error,
+                        NULL, &onfinish,
                         &objver);
   lock->Unlock();
 
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
+  int r = onfinish.wait();
 
   set_sync_op_version(objver);
 
   if (r < 0) {
     lock->Lock();
-    client->unregister_watch_notify_callback(*cookie, NULL); // destroys wc
+    objecter->linger_cancel(linger_op);
     lock->Unlock();
   }
 
@@ -1087,8 +1130,9 @@ int librados::IoCtxImpl::notify_ack(
 {
   Mutex::Locker l(*lock);
   ::ObjectOperation rd;
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
   prepare_assert_ops(&rd);
-  rd.notify_ack(notify_id, cookie, bl);
+  rd.notify_ack(notify_id, linger_op->linger_id, bl);
   objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
   return 0;
 }
@@ -1096,41 +1140,28 @@ int librados::IoCtxImpl::notify_ack(
 int librados::IoCtxImpl::watch_check(uint64_t cookie)
 {
   Mutex::Locker l(*lock);
-  return client->watch_check(cookie);
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  return objecter->linger_check(linger_op);
 }
 
 int librados::IoCtxImpl::unwatch(uint64_t cookie)
 {
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
   bufferlist inbl, outbl;
+  C_SaferCond onfinish;
+  version_t ver = 0;
 
-  Mutex mylock("IoCtxImpl::unwatch::mylock");
-  Cond cond;
-  bool done;
-  int r;
-  Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
-  version_t ver;
   lock->Lock();
-
-  object_t oid;
-  r = client->unregister_watch_notify_callback(cookie, &oid);
-  if (r < 0) {
-    lock->Unlock();
-    return r;
-  }
-
   ::ObjectOperation wr;
   prepare_assert_ops(&wr);
-  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
-  objecter->mutate(oid, oloc, wr, snapc, ceph_clock_now(client->cct), 0, NULL, oncommit, &ver);
+  wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_UNWATCH);
+  objecter->mutate(linger_op->target.base_oid, oloc, wr,
+                  snapc, ceph_clock_now(client->cct), 0, NULL, &onfinish, &ver);
+  objecter->linger_cancel(linger_op);
   lock->Unlock();
 
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-
+  int r = onfinish.wait();
   set_sync_op_version(ver);
-
   return r;
 }
 
@@ -1139,28 +1170,42 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                                bufferlist *preply_bl,
                                char **preply_buf, size_t *preply_buf_len)
 {
-  bufferlist inbl, outbl;
-
-  // Construct WatchNotifyInfo
-  Cond cond_all;
-  Mutex mylock_all("IoCtxImpl::notify::mylock_all");
-  bool done_all = false;
-  int r_notify = 0;
-  WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
-  wc->notify_done = &done_all;
-  wc->notify_lock = &mylock_all;
-  wc->notify_cond = &cond_all;
-  wc->notify_rval = &r_notify;
-  wc->notify_reply_bl = preply_bl;
-  wc->notify_reply_buf = preply_buf;
-  wc->notify_reply_buf_len = preply_buf_len;
+  bufferlist inbl;
+
+  struct C_NotifyFinish : public Context {
+    Cond cond;
+    Mutex lock;
+    bool done;
+    int result;
+    bufferlist reply_bl;
+
+    C_NotifyFinish()
+      : lock("IoCtxImpl::notify::C_NotifyFinish::lock"),
+       done(false),
+       result(0) { }
+
+    void finish(int r) {}
+    void complete(int r) {
+      lock.Lock();
+      done = true;
+      result = r;
+      cond.Signal();
+      lock.Unlock();
+    }
+    void wait() {
+      lock.Lock();
+      while (!done)
+       cond.Wait(lock);
+      lock.Unlock();
+    }
+  } notify_private;
 
   lock->Lock();
 
-  // Acquire cookie
-  uint64_t cookie;
-  wc->linger_op = objecter->linger_register(oid, oloc, 0, &cookie);
-  client->register_watch_notify_callback(wc, cookie);
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+  linger_op->on_notify_finish = &notify_private;
+  linger_op->notify_result_bl = &notify_private.reply_bl;
+
   uint32_t prot_ver = 1;
   uint32_t timeout = notify_timeout;
   if (timeout_ms)
@@ -1172,36 +1217,49 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
   // Construct RADOS op
   ::ObjectOperation rd;
   prepare_assert_ops(&rd);
-  rd.notify(cookie, inbl);
+  rd.notify(linger_op->linger_id, inbl);
 
   // Issue RADOS op
   C_SaferCond onack;
   version_t objver;
-  objecter->linger_notify(wc->linger_op,
+  objecter->linger_notify(linger_op,
                          rd, snap_seq, inbl, NULL,
                          &onack, &objver);
   lock->Unlock();
 
-  ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl;
+  ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
   int r_issue = onack.wait();
-  ldout(client->cct, 10) << __func__ << " linger op " << wc->linger_op << " acked (" << r_issue << ")" << dendl;
+  ldout(client->cct, 10) << __func__ << " linger op " << linger_op
+                        << " acked (" << r_issue << ")" << dendl;
 
   if (r_issue == 0) {
-  ldout(client->cct, 10) << __func__ << "waiting for watch_notify message for linger op " << wc->linger_op << dendl;
-    mylock_all.Lock();
-    while (!done_all)
-      cond_all.Wait(mylock_all);
-    mylock_all.Unlock();
+    ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
+                          << linger_op << dendl;
+    notify_private.wait();
+
+    // pass result back to user
+    if (notify_private.result >= 0) {
+      if (preply_buf) {
+       *preply_buf = (char*)malloc(notify_private.reply_bl.length());
+       memcpy(*preply_buf, notify_private.reply_bl.c_str(),
+              notify_private.reply_bl.length());
+      }
+      if (preply_buf_len)
+       *preply_buf_len = notify_private.reply_bl.length();
+      if (preply_bl)
+       preply_bl->claim(notify_private.reply_bl);
+    }
   }
-  ldout(client->cct, 10) << __func__ << " completed notify (linger op " << wc->linger_op << "), unregistering" << dendl;
+  ldout(client->cct, 10) << __func__ << " completed notify (linger op "
+                        << linger_op << "), unregistering" << dendl;
 
   lock->Lock();
-  client->unregister_watch_notify_callback(cookie, NULL);   // destroys wc
+  objecter->linger_cancel(linger_op);
   lock->Unlock();
 
   set_sync_op_version(objver);
 
-  return r_issue == 0 ? r_notify : r_issue;
+  return r_issue ? r_issue : notify_private.result;
 }
 
 int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
index 3ac48d217e259005ec0518778f9c000bad1436ed..a5c0874e2af48f12e8827fd47e707e384652c423 100644 (file)
@@ -217,62 +217,4 @@ struct librados::IoCtxImpl {
 
 };
 
-namespace librados {
-
-/**
- * watch/notify info
- *
- * Capture state about a watch or an in-progress notify
- */
-struct WatchNotifyInfo : public RefCountedWaitObject {
-  IoCtxImpl *io_ctx_impl;  // parent
-  const object_t oid;      // the object
-  class Objecter::LingerOp *linger_op;     // we use this to unlinger when we are done
-  uint64_t cookie;         // callback cookie
-  int err;
-
-  // watcher.  only one of these will be defined.
-  librados::WatchCtx *watch_ctx;
-  librados::WatchCtx2 *watch_ctx2;
-
-  // notify that we initiated
-  Mutex *notify_lock;
-  Cond *notify_cond;
-  bool *notify_done;
-  bufferlist *notify_reply_bl;
-  char **notify_reply_buf;
-  size_t *notify_reply_buf_len;
-  int *notify_rval;
-
-  struct OnError : public Context {
-    WatchNotifyInfo *info;
-    void finish(int r) { assert(0); }
-    void complete(int r);
-  } on_error;
-
-  WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
-                 const object_t& _oc)
-    : io_ctx_impl(io_ctx_impl_),
-      oid(_oc),
-      linger_op(NULL),
-      cookie(0),
-      err(0),
-      watch_ctx(NULL),
-      watch_ctx2(NULL),
-      notify_lock(NULL),
-      notify_cond(NULL),
-      notify_done(NULL),
-      notify_reply_bl(NULL),
-      notify_reply_buf(NULL),
-      notify_reply_buf_len(NULL),
-      notify_rval(NULL) {
-    io_ctx_impl->get();
-    on_error.info = this;
-  }
-
-  ~WatchNotifyInfo() {
-    io_ctx_impl->put();
-  }
-};
-}
 #endif
index a6b292e516aed73c03b8a21c3fe8eb73690475b4..ae06368b1fad5bb13084ac9cc1f2f7a00a88fb81 100644 (file)
@@ -28,7 +28,6 @@
 #include "include/buffer.h"
 #include "include/stringify.h"
 
-#include "messages/MWatchNotify.h"
 #include "messages/MLog.h"
 #include "msg/Messenger.h"
 
@@ -386,10 +385,6 @@ bool librados::RadosClient::_dispatch(Message *m)
   case CEPH_MSG_MDS_MAP:
     break;
 
-  case CEPH_MSG_WATCH_NOTIFY:
-    handle_watch_notify(static_cast<MWatchNotify *>(m));
-    break;
-
   case MSG_LOG:
     handle_log(static_cast<MLog *>(m));
     break;
@@ -639,219 +634,6 @@ void librados::RadosClient::blacklist_self(bool set) {
   objecter->blacklist_self(set);
 }
 
-
-// -----------
-// watch/notify
-
-void librados::RadosClient::register_watch_notify_callback(
-  WatchNotifyInfo *wc,
-  uint64_t cookie)
-{
-  assert(lock.is_locked_by_me());
-  wc->cookie = cookie;
-  ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl;
-  watch_notify_info[wc->cookie] = wc;
-}
-
-int librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie,
-                                                           object_t *poid)
-{
-  ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
-  assert(lock.is_locked_by_me());
-  map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
-  if (iter == watch_notify_info.end())
-    return -EBADF;
-
-  WatchNotifyInfo *ctx = iter->second;
-  if (poid)
-    *poid = ctx->oid;
-  if (ctx->linger_op) {
-    objecter->linger_cancel(ctx->linger_op);
-    ctx->linger_op = NULL;
-  }
-
-  watch_notify_info.erase(iter);
-  lock.Unlock();
-  ldout(cct, 10) << __func__ << " dropping reference, waiting ctx="
-                << (void *)ctx << dendl;
-  ctx->put_wait();
-  ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl;
-  lock.Lock();
-  return 0;
-}
-
-int librados::RadosClient::watch_check(uint64_t cookie)
-{
-  ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
-  map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
-  if (iter == watch_notify_info.end())
-    return -EBADF;
-  WatchNotifyInfo *ctx = iter->second;
-  if (ctx->err)
-    return ctx->err;
-  return objecter->linger_check(ctx->linger_op);
-}
-
-struct C_DoWatchNotify : public Context {
-  librados::RadosClient *rados;
-  MWatchNotify *m;
-  C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {}
-  void finish(int r) {
-    rados->do_watch_notify(m);
-  }
-};
-
-struct C_DoWatchError : public Context {
-  librados::RadosClient *rados;
-  uint64_t cookie;
-  int err;
-  C_DoWatchError(librados::RadosClient *r, uint64_t cookie, int err)
-    : rados(r), cookie(cookie), err(err) {}
-  void finish(int r) {
-    rados->do_watch_error(cookie, err);
-  }
-};
-
-void librados::WatchNotifyInfo::OnError::complete(int r)
-{
-  RadosClient *client = info->io_ctx_impl->client;
-  client->finisher.queue(new C_DoWatchError(client, info->cookie, r));
-}
-
-void librados::RadosClient::do_watch_error(uint64_t cookie, int err)
-{
-  Mutex::Locker l(lock);
-  map<uint64_t, WatchNotifyInfo *>::iterator iter =
-    watch_notify_info.find(cookie);
-  if (iter != watch_notify_info.end()) {
-    WatchNotifyInfo *wc = iter->second;
-    assert(wc);
-    wc->err = err;
-    if (wc->watch_ctx2) {
-      wc->get();
-      ldout(cct,10) << __func__ << " cookie " << cookie
-                   << " handle_error " << err << dendl;
-      lock.Unlock();
-      wc->watch_ctx2->handle_error(cookie, err);
-      lock.Lock();
-      ldout(cct,10) << __func__ << " cookie " << cookie
-                   << " handle_error " << err << " done" << dendl;
-      wc->put();
-    }
-  } else {
-    ldout(cct,10) << __func__ << " cookie " << cookie << " not found" << dendl;
-  }
-}
-
-void librados::RadosClient::handle_watch_notify(MWatchNotify *m)
-{
-  Mutex::Locker l(lock);
-
-  if (watch_notify_info.count(m->cookie)) {
-    ldout(cct,10) << __func__ << " queueing async " << *m << dendl;
-    // deliver this async via a finisher thread
-    finisher.queue(new C_DoWatchNotify(this, m));
-  } else {
-    // drop it on the floor
-    ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl;
-    m->put();
-  }
-}
-
-void librados::RadosClient::do_watch_notify(MWatchNotify *m)
-{
-  Mutex::Locker l(lock);
-  map<uint64_t, WatchNotifyInfo *>::iterator iter =
-    watch_notify_info.find(m->cookie);
-  if (iter != watch_notify_info.end()) {
-    WatchNotifyInfo *wc = iter->second;
-    assert(wc);
-    if (wc->notify_lock) {
-      // we sent a notify and it completed (or failed)
-      // NOTE: opcode may be either NOTIFY (older OSDs) or NOTIFY_COMPLETE
-      // (newer OSDs).  In practice it doesn't matter because completion is the
-      // only kind of event we get on notify cookies.
-      ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
-      wc->notify_lock->Lock();
-      *wc->notify_done = true;
-      *wc->notify_rval = m->return_code;
-      if (wc->notify_reply_bl) {
-       wc->notify_reply_bl->claim(m->get_data());
-      }
-      if (wc->notify_reply_buf) {
-       *wc->notify_reply_buf = (char*)malloc(m->get_data().length());
-       memcpy(*wc->notify_reply_buf, m->get_data().c_str(),
-              m->get_data().length());
-      }
-      if (wc->notify_reply_buf_len) {
-       *wc->notify_reply_buf_len = m->get_data().length();
-      }
-      wc->notify_cond->Signal();
-      wc->notify_lock->Unlock();
-    } else if (m->opcode == CEPH_WATCH_EVENT_NOTIFY) {
-      // we are watcher and got a notify
-      ldout(cct,10) << __func__ << " got notify " << *m << dendl;
-      wc->get();
-
-      // trigger the callback
-      assert(!!wc->watch_ctx ^ !!wc->watch_ctx2);  // only one is defined
-      lock.Unlock();
-      if (wc->watch_ctx) {
-       wc->watch_ctx->notify(CEPH_WATCH_EVENT_NOTIFY, m->ver, m->bl);
-       // send ACK back to the OSD
-       bufferlist empty;
-       wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty);
-      } else if (wc->watch_ctx2) {
-       wc->watch_ctx2->handle_notify(m->notify_id, m->cookie,
-                                     m->notifier_gid, m->bl);
-       // user needs to explicitly ack (and may have already!)
-      }
-      lock.Lock();
-      ldout(cct,10) << __func__ << " notify done" << dendl;
-      wc->put();
-    } else if (m->opcode == CEPH_WATCH_EVENT_FAILED_NOTIFY) {
-      // we are watcher and failed to ack a notify in time, causing it to time
-      // out.
-      ldout(cct,10) << __func__ << " failed notify " << *m << dendl;
-      wc->get();
-      // trigger the callback
-      assert(!!wc->watch_ctx ^ !!wc->watch_ctx2);  // only one is defined
-      lock.Unlock();
-      if (wc->watch_ctx2) {
-       wc->watch_ctx2->handle_failed_notify(m->notify_id, m->cookie,
-                                            m->notifier_gid);
-      }
-      lock.Lock();
-      ldout(cct,10) << __func__ << " failed notify done" << dendl;
-      wc->put();
-    } else if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
-      // we failed to ping or reconnect and our watch was canceled.
-      ldout(cct,10) << __func__ << " disconnect " << *m << dendl;
-      wc->err = -ENOTCONN;
-      if (wc->watch_ctx2) {
-       wc->get();
-       // trigger the callback
-       lock.Unlock();
-       wc->watch_ctx2->handle_error(m->cookie, -ENOTCONN);
-       lock.Lock();
-       ldout(cct,10) << __func__ << " disconnect done" << dendl;
-       wc->put();
-      } else {
-       lderr(cct) << __func__ << " watch disconnect on "
-                  << wc->oid << " on old API user, silently ignoring"
-                  << dendl;
-      }
-    } else {
-      lderr(cct) << __func__ << " got unknown event " << m->opcode
-                << " " << ceph_watch_event_name(m->opcode) << dendl;
-    }
-  } else {
-    ldout(cct, 4) << __func__ << " unknown cookie " << m->cookie << dendl;
-  }
-  m->put();
-}
-
-
 int librados::RadosClient::mon_command(const vector<string>& cmd,
                                       const bufferlist &inbl,
                                       bufferlist *outbl, string *outs)
index 299ffa288ed80b0503724a7c5a9bfb8723bba58a..b3aa1e168d8061511a4a717953eda4f0e2dc2bc1 100644 (file)
@@ -30,10 +30,8 @@ class CephContext;
 struct Connection;
 struct md_config_t;
 class Message;
-class MWatchNotify;
 class MLog;
 class Messenger;
-struct WatchNotifyInfo;
 
 class librados::RadosClient : public Dispatcher
 {
@@ -108,17 +106,6 @@ public:
 
   int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
 
-  // watch/notify
-  map<uint64_t, librados::WatchNotifyInfo *> watch_notify_info;
-
-  void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
-                                     uint64_t cookie);
-  int unregister_watch_notify_callback(uint64_t cookie, object_t *poid);
-  int watch_check(uint64_t cookie);
-  void handle_watch_notify(MWatchNotify *m);
-  void do_watch_notify(MWatchNotify *m);
-  void do_watch_error(uint64_t cookie, int err);
-
   int mon_command(const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
   int mon_command(int rank,
index 6833dc51c4ab96aff170b5b0e0ad914989d7c97c..a27147f4b3d29b0c8ddc1a81e227dc3b3c03acef 100644 (file)
@@ -40,6 +40,8 @@
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 
+#include "messages/MWatchNotify.h"
+
 #include <errno.h>
 
 #include "common/config.h"
@@ -421,7 +423,7 @@ void Objecter::_send_linger(LingerOp *info)
     onack = new C_Linger_Reconnect(this, info);
     opv.push_back(OSDOp());
     opv.back().op.op = CEPH_OSD_OP_WATCH;
-    opv.back().op.watch.cookie = info->cookie;
+    opv.back().op.watch.cookie = info->linger_id;
     opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
   } else {
     ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;
@@ -484,6 +486,18 @@ void Objecter::_linger_commit(LingerOp *info, int r)
   info->pobjver = NULL;
 }
 
+struct C_DoWatchError : public Context {
+  Objecter::LingerOp *info;
+  int err;
+  C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
+    info->get();
+  }
+  void finish(int r) {
+    info->watch_context->handle_error(info->linger_id, err);
+    info->put();
+  }
+};
+
 void Objecter::_linger_reconnect(LingerOp *info, int r)
 {
   ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
@@ -492,8 +506,8 @@ void Objecter::_linger_reconnect(LingerOp *info, int r)
     info->watch_lock.Lock();
     info->last_error = r;
     info->watch_cond.Signal();
-    if (info->on_error)
-      info->on_error->complete(r);
+    if (info->watch_context)
+      finisher->queue(new C_DoWatchError(info, r));
     info->watch_lock.Unlock();
   }
 }
@@ -519,7 +533,7 @@ void Objecter::_send_linger_ping(LingerOp *info)
 
   vector<OSDOp> opv(1);
   opv[0].op.op = CEPH_OSD_OP_WATCH;
-  opv[0].op.watch.cookie = info->cookie;
+  opv[0].op.watch.cookie = info->linger_id;
   opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
   C_Linger_Ping *onack = new C_Linger_Ping(this, info);
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
@@ -548,8 +562,8 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent)
     info->watch_valid_thru = sent;
   } else if (r < 0) {
     info->last_error = r;
-    if (info->on_error)
-      info->on_error->complete(r);
+    if (info->watch_context)
+      finisher->queue(new C_DoWatchError(info, r));
   }
   info->watch_cond.SignalAll();
   info->watch_lock.Unlock();
@@ -596,8 +610,7 @@ void Objecter::_linger_cancel(LingerOp *info)
 
 Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
-                                             int flags,
-                                             uint64_t *cookie)
+                                             int flags)
 {
   LingerOp *info = new LingerOp;
   info->target.base_oid = oid;
@@ -611,7 +624,6 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
 
   // Acquire linger ID
   info->linger_id = ++max_linger_id;
-  *cookie = info->linger_id;
   ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id << dendl;
   linger_ops[info->linger_id] = info;
@@ -623,22 +635,20 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
 ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc, utime_t mtime,
-                                 bufferlist& inbl, uint64_t cookie,
+                                 bufferlist& inbl,
                                  Context *onack, Context *oncommit,
-                                 Context *onerror,
                                  version_t *objver)
 {
+  info->is_watch = true;
   info->snapc = snapc;
   info->mtime = mtime;
   info->target.flags |= CEPH_OSD_FLAG_WRITE;
   info->ops = op.ops;
-  info->cookie = cookie;
   info->inbl = inbl;
   info->poutbl = NULL;
   info->pobjver = objver;
   info->on_reg_ack = onack;
   info->on_reg_commit = oncommit;
-  info->on_error = onerror;
 
   RWLock::WLocker wl(rwlock);
   _linger_submit(info);
@@ -691,6 +701,93 @@ void Objecter::_linger_submit(LingerOp *info)
   _send_linger(info);
 }
 
+struct C_DoWatchNotify : public Context {
+  Objecter *objecter;
+  Objecter::LingerOp *info;
+  MWatchNotify *msg;
+  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
+    : objecter(o), info(i), msg(m) {
+    info->get();
+    msg->get();
+  }
+  void finish(int r) {
+    objecter->_do_watch_notify(info, msg);
+  }
+};
+
+void Objecter::handle_watch_notify(MWatchNotify *m)
+{
+  RWLock::RLocker l(rwlock);
+  if (!initialized.read()) {
+    return;
+  }
+
+  map<uint64_t,LingerOp*>::iterator p = linger_ops.find(m->cookie);
+  if (p == linger_ops.end()) {
+    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
+    return;
+  }
+
+  LingerOp *info = p->second;
+  finisher->queue(new C_DoWatchNotify(this, info, m));
+}
+
+void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
+{
+  ldout(cct, 10) << __func__ << " " << *m << dendl;
+
+  rwlock.get_read();
+  if (!initialized.read()) {
+    rwlock.put_read();
+    goto out;
+  }
+
+  if (info->canceled) {
+    rwlock.put_read();
+    goto out;
+  }
+
+  // notify completion?
+  if (!info->is_watch) {
+    assert(info->on_notify_finish);
+    info->notify_result_bl->claim(m->get_data());
+    rwlock.put_read();
+    info->on_notify_finish->complete(m->return_code);
+    goto out;
+  }
+
+  assert(info->is_watch);
+  assert(info->watch_context);
+
+  switch (m->opcode) {
+  case CEPH_WATCH_EVENT_NOTIFY:
+    rwlock.put_read();
+    info->watch_context->handle_notify(m->notify_id, m->cookie,
+                                      m->notifier_gid, m->bl);
+    break;
+
+  case CEPH_WATCH_EVENT_FAILED_NOTIFY:
+    rwlock.put_read();
+    info->watch_context->handle_failed_notify(m->notify_id, m->cookie,
+                                             m->notifier_gid);
+    break;
+
+  case CEPH_WATCH_EVENT_DISCONNECT:
+    info->last_error = -ENOTCONN;
+    rwlock.put_read();
+    info->watch_context->handle_error(m->cookie, -ENOTCONN);
+    break;
+
+  default:
+    rwlock.put_read();
+    break;
+  }
+
+ out:
+  info->put();
+  m->put();
+}
+
 bool Objecter::ms_dispatch(Message *m)
 {
   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
@@ -703,6 +800,11 @@ bool Objecter::ms_dispatch(Message *m)
     handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
     return true;
 
+  case CEPH_MSG_WATCH_NOTIFY:
+    handle_watch_notify(static_cast<MWatchNotify*>(m));
+    m->put();
+    return true;
+
   case MSG_COMMAND_REPLY:
     if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
       handle_command_reply(static_cast<MCommandReply*>(m));
@@ -1715,7 +1817,7 @@ void Objecter::tick()
         assert(op->session);
         ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
         toping.insert(op->session);
-       if (op->cookie && !op->last_error)
+       if (op->is_watch && !op->last_error)
          _send_linger_ping(op);
       }
       for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
@@ -2651,7 +2753,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     m->put();
     return;
   }
-
   RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
 
   map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
index 8e207ab8e3e72fab74113c5c9cd9e3504e195e11..d07103b60454733b9bd2d084b11bf991bb963361 100644 (file)
@@ -44,6 +44,7 @@ class MPoolOpReply;
 class MGetPoolStatsReply;
 class MStatfsReply;
 class MCommandReply;
+class MWatchNotify;
 
 class PerfCounters;
 
@@ -1446,6 +1447,19 @@ public:
 
   // -- lingering ops --
 
+  struct WatchContext {
+    // this simply mirrors librados WatchCtx2
+    virtual void handle_notify(uint64_t notify_id,
+                              uint64_t cookie,
+                              uint64_t notifier_id,
+                              bufferlist& bl) = 0;
+    virtual void handle_failed_notify(uint64_t notify_id,
+                                     uint64_t cookie,
+                                     uint64_t notifier_id) = 0;
+    virtual void handle_error(uint64_t cookie, int err) = 0;
+    virtual ~WatchContext() {}
+  };
+
   struct LingerOp : public RefCountedObject {
     uint64_t linger_id;
 
@@ -1460,7 +1474,7 @@ public:
     bufferlist *poutbl;
     version_t *pobjver;
 
-    uint64_t cookie;   ///< non-zero if this is a watch
+    bool is_watch;
     utime_t watch_valid_thru; ///< send time for last acked ping
     int last_error;  ///< error from last failed ping|reconnect, if any
     Mutex watch_lock;
@@ -1468,7 +1482,13 @@ public:
 
     bool registered;
     bool canceled;
-    Context *on_reg_ack, *on_reg_commit, *on_error;
+    Context *on_reg_ack, *on_reg_commit;
+
+    // we trigger these from an async finisher
+    Context *on_notify_finish;
+    bufferlist *notify_result_bl;
+
+    WatchContext *watch_context;
 
     OSDSession *session;
 
@@ -1480,13 +1500,15 @@ public:
                 target(object_t(), object_locator_t(), 0),
                 snap(CEPH_NOSNAP),
                 poutbl(NULL), pobjver(NULL),
-                cookie(0),
+                is_watch(false),
                 last_error(0),
                 watch_lock("Objecter::LingerOp::watch_lock"),
                 registered(false),
                 canceled(false),
                 on_reg_ack(NULL), on_reg_commit(NULL),
-                on_error(NULL),
+                on_notify_finish(NULL),
+                notify_result_bl(NULL),
+                watch_context(NULL),
                 session(NULL),
                 register_tid(0),
                 ping_tid(0),
@@ -1803,6 +1825,7 @@ public:
   bool ms_can_fast_dispatch(Message *m) const {
     switch (m->get_type()) {
     case CEPH_MSG_OSD_OPREPLY:
+    case CEPH_MSG_WATCH_NOTIFY:
       return true;
     default:
       return false;
@@ -1813,6 +1836,7 @@ public:
   }
 
   void handle_osd_op_reply(class MOSDOpReply *m);
+  void handle_watch_notify(class MWatchNotify *m);
   void handle_osd_map(class MOSDMap *m);
   void wait_for_osd_map();
 
@@ -1968,12 +1992,12 @@ public:
 
   // caller owns a ref
   LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
-                           int flags, uint64_t *pcookie);
+                           int flags);
   ceph_tid_t linger_watch(LingerOp *info,
                          ObjectOperation& op,
                          const SnapContext& snapc, utime_t mtime,
-                         bufferlist& inbl, uint64_t cookie,
-                         Context *onack, Context *onfinish, Context *onerror,
+                         bufferlist& inbl,
+                         Context *onack, Context *onfinish,
                          version_t *objver);
   ceph_tid_t linger_notify(LingerOp *info,
                           ObjectOperation& op,
@@ -1985,6 +2009,8 @@ public:
   void linger_cancel(LingerOp *info);  // releases a reference
   void _linger_cancel(LingerOp *info);
 
+  void _do_watch_notify(LingerOp *info, MWatchNotify *m);
+
   /**
    * set up initial ops in the op vector, and allocate a final op slot.
    *