]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: watch_check() to check on status of watch
authorSage Weil <sage@redhat.com>
Thu, 13 Nov 2014 23:19:05 +0000 (15:19 -0800)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:34:04 +0000 (10:34 -0800)
Return either an accumulated error code, or the time since the watch was
last confirmed.

Signed-off-by: Sage Weil <sage@redhat.com>
12 files changed:
src/include/rados/librados.h
src/include/rados/librados.hpp
src/include/utime.h
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/librados/librados.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/librados/watch_notify.cc
src/tracing/librados.tp

index 770c3742470a0a9bf4f5bd933af5b4541eac014b..528adddf91ac36f253bb986e22b5e3cc5e77a4f5 100644 (file)
@@ -1983,6 +1983,22 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handl
                                rados_watcherrcb_t watcherrcb,
                                void *arg);
 
+/**
+ * Check on the status of a watch
+ *
+ * Return the number of milliseconds since the watch was last confirmed.
+ * Or, if there has been an error, return that.
+ *
+ * If there is an error, the watch is no longer valid, and should be
+ * destroyed with rados_unwatch2().  The the user is still interested
+ * in the object, a new watch should be created with rados_watch2().
+ *
+ * @param io the pool the object is in
+ * @param handle the watch handle
+ * @returns ms since last confirmed on success, negative error code on failure
+ */
+int rados_watch_check(rados_ioctx_t io, uint64_t handle);
+
 /**
  * Unregister an interest in an object
  *
index bbd92f37afa902a3cdd3b37cd86af466e0497199..9730ecd95237f8556c4311489900dbab53ff2848 100644 (file)
@@ -956,6 +956,22 @@ namespace librados
                    uint64_t handle,      ///< our watch handle
                    bufferlist& bl);      ///< optional reply payload
 
+    /***
+     * check on watch validity
+     *
+     * Check if a watch is valid.  If so, return the number of
+     * milliseconds since we last confirmed its liveness.  If there is
+     * a known error, return it.
+     *
+     * If there is an error, the watch is no longer valid, and should
+     * be destroyed with unwatch().  The the user is still interested
+     * in the object, a new watch should be created with watch().
+     *
+     * @param handle watch handle
+     * @returns ms since last confirmed valid, or error
+     */
+    int watch_check(uint64_t handle);
+
     // old, deprecated versions
     int watch(const std::string& o, uint64_t ver, uint64_t *handle,
              librados::WatchCtx *ctx) __attribute__ ((deprecated));
index 055b16504d2a4fd0bdc4ea650ece265e6dba1de5..032c7d04ab1909f560582b0a0cb85ead401d57fc 100644 (file)
@@ -82,6 +82,9 @@ public:
   uint64_t to_nsec() const {
     return (uint64_t)tv.tv_nsec + (uint64_t)tv.tv_sec * 1000000000ull;
   }
+  uint64_t to_msec() const {
+    return (uint64_t)tv.tv_nsec / 1000000ull + (uint64_t)tv.tv_sec * 1000ull;
+  }
 
   void copy_to_timeval(struct timeval *v) const {
     v->tv_sec = tv.tv_sec;
index 0d2a89552e2c85dd56a8d9b0ad41a13751667069..d960f8e393e442678134f28d2cb13f0816751fed 100644 (file)
@@ -1094,7 +1094,7 @@ int librados::IoCtxImpl::notify_ack(
 
 int librados::IoCtxImpl::watch_check(uint64_t cookie)
 {
-  Mutex::Locker(*lock);
+  Mutex::Locker l(*lock);
   return client->watch_check(cookie);
 }
 
index e786b4f4b9c22321f9683992ebb01451d8a9cc8f..927a19f826e6fa7755c844cccc34084345e2c21d 100644 (file)
@@ -199,6 +199,7 @@ struct librados::IoCtxImpl {
   void set_sync_op_version(version_t ver);
   int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
            librados::WatchCtx2 *ctx2);
+  int watch_check(uint64_t cookie);
   int unwatch(uint64_t cookie);
   int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
             bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
@@ -228,6 +229,7 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
   const object_t oid;      // the object
   uint64_t linger_id;      // we use this to unlinger when we are done
   uint64_t cookie;         // callback cookie
+  int err;
 
   // watcher.  only one of these will be defined.
   librados::WatchCtx *watch_ctx;
@@ -254,6 +256,7 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
       oid(_oc),
       linger_id(0),
       cookie(0),
+      err(0),
       watch_ctx(NULL),
       watch_ctx2(NULL),
       notify_lock(NULL),
index 0fcc3a444ac1b5717e3014dc2bc65d9b9f0e4ed0..6512c51f904a8f7fac9d398596871d69a374a670 100644 (file)
@@ -677,6 +677,17 @@ int librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie,
   lock.Lock();
   return 0;
 }
+
+int librados::RadosClient::watch_check(uint64_t cookie)
+{
+  ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
+  map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
+  if (iter == watch_notify_info.end())
+    return -EBADF;
+  WatchNotifyInfo *ctx = iter->second;
+  if (ctx->err)
+    return ctx->err;
+  return objecter->linger_check(ctx->linger_id);
 }
 
 struct C_DoWatchNotify : public Context {
@@ -713,6 +724,7 @@ void librados::RadosClient::do_watch_error(uint64_t cookie, int err)
   if (iter != watch_notify_info.end()) {
     WatchNotifyInfo *wc = iter->second;
     assert(wc);
+    wc->err = err;
     if (wc->watch_ctx2) {
       wc->get();
       ldout(cct,10) << __func__ << " cookie " << cookie
@@ -813,6 +825,7 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
     } else if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
       // we failed to ping or reconnect and our watch was canceled.
       ldout(cct,10) << __func__ << " disconnect " << *m << dendl;
+      wc->err = -ENOTCONN;
       if (wc->watch_ctx2) {
        wc->get();
        // trigger the callback
index f9e126f328a6a5373c846dccbfcff5c4c9d1de77..3e57ae4f9fb632283dbe6bc90f3be9e9da3159b2 100644 (file)
@@ -115,6 +115,7 @@ public:
   void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
                                      uint64_t *cookie);
   int unregister_watch_notify_callback(uint64_t cookie, object_t *poid);
+  int watch_check(uint64_t cookie);
   void handle_watch_notify(MWatchNotify *m);
   void do_watch_notify(MWatchNotify *m);
   void do_watch_error(uint64_t cookie, int err);
index b396f708952252505c3d69fd61ea3d68f565f118..8402475e5d70280c8a0e6d10d27663b1af3f217f 100644 (file)
@@ -1684,6 +1684,11 @@ int librados::IoCtx::unwatch(uint64_t handle)
   return io_ctx_impl->unwatch(handle);
 }
 
+int librados::IoCtx::watch_check(uint64_t handle)
+{
+  return io_ctx_impl->watch_check(handle);
+}
+
 int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
 {
   object_t obj(oid);
@@ -3841,6 +3846,16 @@ extern "C" int rados_unwatch2(rados_ioctx_t io, uint64_t handle)
   return retval;
 }
 
+extern "C" int rados_watch_check(rados_ioctx_t io, uint64_t handle)
+{
+  tracepoint(librados, rados_watch_check_enter, io, handle);
+  uint64_t cookie = handle;
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  int retval = ctx->watch_check(cookie);
+  tracepoint(librados, rados_watch_check_exit, retval);
+  return retval;
+}
+
 extern "C" int rados_notify(rados_ioctx_t io, const char *o,
                            uint64_t ver, const char *buf, int buf_len)
 {
index 79d15b41dd35b06dcf82b0cec86f1560270eb262..1f40bc576fd596f27d717edc4a5932debb995771 100644 (file)
@@ -554,6 +554,25 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent)
   info->watch_lock.Unlock();
 }
 
+int Objecter::linger_check(uint64_t linger_id)
+{
+  RWLock::WLocker wl(rwlock);
+  map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
+  if (iter == linger_ops.end()) {
+    ldout(cct, 10) << __func__ << " " << linger_id << " dne" << dendl;
+    return -EBADF;
+  }
+
+  LingerOp *info = iter->second;
+  utime_t age = ceph_clock_now(NULL) - info->watch_valid_thru;
+  ldout(cct, 10) << __func__ << " " << linger_id
+                << " err " << info->last_error
+                << " age " << age << dendl;
+  if (info->last_error)
+    return info->last_error;
+  return age.to_msec();
+}
+
 void Objecter::unregister_linger(uint64_t linger_id)
 {
   RWLock::WLocker wl(rwlock);
@@ -605,6 +624,7 @@ ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t&
   info->on_reg_ack = onack;
   info->on_reg_commit = oncommit;
   info->on_error = onerror;
+  info->watch_valid_thru = ceph_clock_now(NULL);
 
   RWLock::WLocker wl(rwlock);
   _linger_submit(info);
index 6d886b91a7fe8fee58e0e8e4f580c75aeca696d3..63f956ee20dab48580dd6b1232377c06a6db3ec0 100644 (file)
@@ -1973,6 +1973,7 @@ public:
                    snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
                    Context *onack,
                    version_t *objver);
+  int linger_check(uint64_t linger_id);
   void unregister_linger(uint64_t linger_id);
   void _unregister_linger(uint64_t linger_id);
 
index eb5ca096b8f94f58e2e5566bda74f89e12622a2c..aade49c5253a6798b56451422b76fa11eae33688 100644 (file)
@@ -235,19 +235,27 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
   memset(buf, 0xcc, sizeof(buf));
   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
   uint64_t handle;
+  time_t start = time(0);
   ASSERT_EQ(0,
            rados_watch2(ioctx, notify_oid, &handle,
                         watch_notify2_test_cb,
                         watch_notify2_test_failcb,
                         watch_notify2_test_errcb, NULL));
+  int age = rados_watch_check(ioctx, handle);
+  time_t age_bound = time(0) + 1 - start;
+  ASSERT_LT(age, age_bound * 1000);
+  ASSERT_GT(age, 0);
   rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
   int left = 180;
+  std::cout << "waiting up to " << left << " for osd to time us out ..."
+           << std::endl;
   while (notify_err == 0 && --left) {
     sleep(1);
   }
   ASSERT_TRUE(left > 0);
   rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
   ASSERT_EQ(-ENOTCONN, notify_err);
+  ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
 
   // a subsequent notify should not reach us
   char *reply_buf;
@@ -264,6 +272,7 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
     ASSERT_EQ(0u, reply_map.size());
   }
   ASSERT_EQ(0u, notify_cookies.size());
+  ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
 
   // re-watch
   rados_unwatch2(ioctx, handle);
@@ -273,6 +282,8 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
                         watch_notify2_test_cb,
                         watch_notify2_test_failcb,
                         watch_notify2_test_errcb, NULL));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
+
   // and now a notify will work.
   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
                             "notify", 6, 0,
@@ -289,6 +300,7 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
     ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
   }
   ASSERT_EQ(1u, notify_cookies.size());
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
 
   rados_unwatch2(ioctx, handle);
 }
@@ -308,6 +320,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2) {
                   watch_notify2_test_cb,
                   watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
   char *reply_buf;
   size_t reply_buf_len;
   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
@@ -323,6 +336,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2) {
   ASSERT_EQ(1, notify_cookies.count(handle));
   ASSERT_EQ(5, reply_map.begin()->second.length());
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
   rados_unwatch2(ioctx, handle);
 }
 
@@ -338,6 +352,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
   uint64_t handle;
   WatchNotifyTestCtx2 ctx;
   ASSERT_EQ(0, ioctx.watch(notify_oid, &handle, &ctx));
+  ASSERT_TRUE(ioctx.watch_check(handle) > 0);
   std::list<obj_watch_t> watches;
   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
   ASSERT_EQ(watches.size(), 1u);
@@ -351,6 +366,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
   ASSERT_EQ(1u, reply_map.size());
   ASSERT_EQ(5, reply_map.begin()->second.length());
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_TRUE(ioctx.watch_check(handle) > 0);
   ioctx.unwatch(handle);
 }
 
@@ -374,6 +390,8 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
                   watch_notify2_test_cb,
                   watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
+  ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
   ASSERT_NE(handle1, handle2);
   char *reply_buf;
   size_t reply_buf_len;
@@ -391,6 +409,8 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
   ASSERT_EQ(1, notify_cookies.count(handle1));
   ASSERT_EQ(1, notify_cookies.count(handle2));
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
+  ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
   rados_unwatch2(ioctx, handle1);
   rados_unwatch2(ioctx, handle2);
 }
@@ -412,6 +432,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
                   watch_notify2_test_cb,
                   watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
   char *reply_buf;
   size_t reply_buf_len;
   ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
@@ -431,6 +452,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
                             "notify", 6, 30000, // 30s
                             &reply_buf, &reply_buf_len));
   ASSERT_EQ(1u, notify_cookies.size());
+  ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
 
   rados_unwatch2(ioctx, handle);
 }
@@ -449,6 +471,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
   uint64_t handle;
   WatchNotifyTestCtx2 ctx;
   ASSERT_EQ(0, ioctx.watch(notify_oid, &handle, &ctx));
+  ASSERT_TRUE(ioctx.watch_check(handle) > 0);
   std::list<obj_watch_t> watches;
   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
   ASSERT_EQ(watches.size(), 1u);
@@ -460,6 +483,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
   while (!notify_failed && --wait)
     sleep(1);
   ASSERT_TRUE(notify_failed);
+  ASSERT_TRUE(ioctx.watch_check(handle) > 0);
   ioctx.unwatch(handle);
 }
 
index 49a526119a041c8093642f7be04161c729f9d1bc..1aeee2baa63ea59b370a6ca00d933f5330e6a90c 100644 (file)
@@ -2254,6 +2254,24 @@ TRACEPOINT_EVENT(librados, rados_unwatch2_exit,
     )
 )
 
+TRACEPOINT_EVENT(librados, rados_watch_check_enter,
+    TP_ARGS(
+        rados_ioctx_t, ioctx,
+        uint64_t, handle),
+    TP_FIELDS(
+        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+        ctf_integer(uint64_t, handle, handle)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch_check_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)
+
 TRACEPOINT_EVENT(librados, rados_notify_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,