]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: tell watcher if we cause a notify timeout
authorSage Weil <sage@redhat.com>
Sat, 8 Nov 2014 01:12:54 +0000 (17:12 -0800)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:38 +0000 (10:32 -0800)
If we are a watcher and we fail to notify in a timely manner, or
circumstances otherwise conspire to prevent out ack from arriving in time,
initiate a callback.

Signed-off-by: Sage Weil <sage@redhat.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/RadosClient.cc
src/librados/librados.cc
src/test/librados/watch_notify.cc
src/tools/rados/rados.cc

index 4347d0ae1b0efd61e302a46b991a6d160bb860ce..f2b5771599f5e3215f003cd3e73f75307fe0cc2c 100644 (file)
@@ -1893,6 +1893,22 @@ typedef void (*rados_watchcb2_t)(void *arg,
                                 void *data,
                                 size_t data_len);
 
+/**
+ * @typedef rados_watchfailcb_t
+ *
+ * Callback activated when a notify is not acked in a timely manner,
+ * resulting in a timeout for the notifier.
+ *
+ * @param arg opaque user-defined value provided to rados_watch2()
+ * @param notify_id an id for this notify event
+ * @param handle the watcher handle we are notifying
+ * @param notifier_id the unique client id for the notifier
+ */
+typedef void (*rados_watchfailcb_t)(void *arg,
+                                   uint64_t notify_id,
+                                   uint64_t handle,
+                                   uint64_t notifier_id);
+
 /**
  * @typedef rados_watcherrcb_t
  *
@@ -1954,12 +1970,15 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
  * @param o the object to watch
  * @param handle where to store the internal id assigned to this watch
  * @param watchcb2 what to do when a notify is received on this object
+ * @param watchfailcb what to do when a notify is not acked in time
  * @param watcherrcb what to do when the watch session encounters an error
  * @param arg opaque value to pass to the callback
  * @returns 0 on success, negative error code on failure
  */
 int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
-                rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb,
+                rados_watchcb2_t watchcb,
+                rados_watchfailcb_t watchfailcb,
+                rados_watcherrcb_t watcherrcb,
                 void *arg);
 
 /**
index 92780b2c5a63542ec9b4cd5f06f090e6d5fbd0e1..f90632519c223ec34d0e8b0e6fb98095d097b73b 100644 (file)
@@ -167,6 +167,22 @@ namespace librados
                               uint64_t cookie,
                               uint64_t notifier_id,
                               bufferlist& bl) = 0;
+
+    /**
+     * Callback activated when we are too slow to ack a notify
+     *
+     * If we fail to ack a notify or our ack doesn't arrive in time
+     * and a notify timeout is triggered for another client, this
+     * callback will be triggered to let us know about it.
+     *
+     * @param notify_id unique id for this notify event
+     * @param cookie the watcher we are notifying
+     * @param notifier_id the unique client id of the notifier
+     */
+    virtual void handle_failed_notify(uint64_t notify_id,
+                                     uint64_t cookie,
+                                     uint64_t notifier_id) = 0;
+
     /**
      * Callback activated when we encounter an error with the watch.
      *
index f5a4e161d9350332e8838c1fc3ad570017b3602b..69a006b9ae4bbd94a97adb5c07f80c4c8c0bb29f 100644 (file)
@@ -749,6 +749,9 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
     assert(wc);
     if (wc->notify_lock) {
       // we sent a notify and it completed (or failed)
+      // NOTE: opcode may be either NOTIFY (older OSDs) or NOTIFY_COMPLETE
+      // (newer OSDs).  In practice it doesn't matter because completion is the
+      // only kind of event we get on notify cookies.
       ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
       wc->notify_lock->Lock();
       *wc->notify_done = true;
@@ -766,7 +769,7 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
       }
       wc->notify_cond->Signal();
       wc->notify_lock->Unlock();
-    } else {
+    } else if (m->opcode == CEPH_WATCH_EVENT_NOTIFY) {
       // we are watcher and got a notify
       ldout(cct,10) << __func__ << " got notify " << *m << dendl;
       wc->get();
@@ -787,6 +790,24 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
       lock.Lock();
       ldout(cct,10) << __func__ << " notify done" << dendl;
       wc->put();
+    } else if (m->opcode == CEPH_WATCH_EVENT_FAILED_NOTIFY) {
+      // we are watcher and failed to ack a notify in time, causing it to time
+      // out.
+      ldout(cct,10) << __func__ << " failed notify " << *m << dendl;
+      wc->get();
+      // trigger the callback
+      assert(!!wc->watch_ctx ^ !!wc->watch_ctx2);  // only one is defined
+      lock.Unlock();
+      if (wc->watch_ctx2) {
+       wc->watch_ctx2->handle_failed_notify(m->notify_id, m->cookie,
+                                            m->notifier_gid);
+      }
+      lock.Lock();
+      ldout(cct,10) << __func__ << " failed notify done" << dendl;
+      wc->put();
+    } else {
+      lderr(cct) << __func__ << " got unknown event " << m->opcode
+                << " " << ceph_watch_event_name(m->opcode) << dendl;
     }
   } else {
     ldout(cct, 4) << __func__ << " unknown cookie " << m->cookie << dendl;
index 7f7fd10ae5315f2fd3959141ec84b5da428447a3..3da21be13fe1ce7b52ce6a1d639a125f125c67a3 100644 (file)
@@ -3771,17 +3771,25 @@ extern "C" int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
 
 struct C_WatchCB2 : public librados::WatchCtx2 {
   rados_watchcb2_t wcb;
+  rados_watchfailcb_t failcb;
   rados_watcherrcb_t errcb;
   void *arg;
   C_WatchCB2(rados_watchcb2_t _wcb,
+            rados_watchfailcb_t _failcb,
             rados_watcherrcb_t _errcb,
-            void *_arg) : wcb(_wcb), errcb(_errcb), arg(_arg) {}
+            void *_arg) : wcb(_wcb), failcb(_failcb), errcb(_errcb), arg(_arg) {}
   void handle_notify(uint64_t notify_id,
                     uint64_t cookie,
                     uint64_t notifier_gid,
                     bufferlist& bl) {
     wcb(arg, notify_id, cookie, notifier_gid, bl.c_str(), bl.length());
   }
+  void handle_failed_notify(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_gid) {
+    if (failcb)
+      failcb(arg, notify_id, cookie, notifier_gid);
+  }
   void handle_error(uint64_t cookie, int err) {
     if (errcb)
       errcb(arg, cookie, err);
@@ -3790,6 +3798,7 @@ struct C_WatchCB2 : public librados::WatchCtx2 {
 
 extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
                            rados_watchcb2_t watchcb,
+                           rados_watchfailcb_t watchfailcb,
                            rados_watcherrcb_t watcherrcb,
                            void *arg)
 {
@@ -3801,7 +3810,7 @@ extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
     uint64_t *cookie = handle;
     librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
     object_t oid(o);
-    C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
+    C_WatchCB2 *wc = new C_WatchCB2(watchcb, watchfailcb, watcherrcb, arg);
     ret = ctx->watch(oid, cookie, NULL, wc);
   }
   tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
index 245664f243435803e3ba18d93b8a096de261abf4..3b7152de75dc7d08f1844a2f4d8a82116c539bfd 100644 (file)
@@ -43,6 +43,7 @@ std::set<uint64_t> notify_cookies;
 rados_ioctx_t notify_io;
 const char *notify_oid = 0;
 int notify_err = 0;
+bool notify_failed = false;
 
 static void watch_notify2_test_cb(void *arg,
                                  uint64_t notify_id,
@@ -62,6 +63,16 @@ static void watch_notify2_test_cb(void *arg,
   rados_notify_ack(notify_io, notify_oid, notify_id, cookie, "reply", 5);
 }
 
+static void watch_notify2_test_failcb(void *arg,
+                                     uint64_t notify_id,
+                                     uint64_t cookie,
+                                     uint64_t notifier_gid)
+{
+  std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
+           << " cookie " << cookie << std::endl;
+  notify_failed = true;
+}
+
 static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err)
 {
   std::cout << __func__ << " cookie " << cookie << std::endl;
@@ -73,8 +84,7 @@ class WatchNotifyTestCtx2 : public WatchCtx2
 {
 public:
   void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
-                    bufferlist& bl)
-  {
+                    bufferlist& bl) {
     std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
              << " notifier_gid " << notifier_gid << std::endl;
     notify_bl = bl;
@@ -86,6 +96,13 @@ public:
     notify_ioctx->notify_ack(notify_oid, notify_id, cookie, reply);
   }
 
+  void handle_failed_notify(uint64_t notify_id, uint64_t cookie,
+                           uint64_t notifier_gid) {
+    std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
+             << " notifier_gid " << notifier_gid << std::endl;
+    notify_failed = true;
+  }
+
   void handle_error(uint64_t cookie, int err) {
     std::cout << __func__ << " cookie " << cookie << std::endl;
     notify_err = err;
@@ -116,7 +133,9 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Test) {
   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
   uint64_t handle;
   ASSERT_EQ(0,
-      rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+      rados_watch2(ioctx, notify_oid, &handle,
+                  watch_notify2_test_cb,
+                  watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   char *reply_buf;
   size_t reply_buf_len;
@@ -145,10 +164,14 @@ TEST_F(LibRadosWatchNotify, WatchNotify2MultiTest) {
   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
   uint64_t handle1, handle2;
   ASSERT_EQ(0,
-      rados_watch2(ioctx, notify_oid, &handle1, watch_notify2_test_cb,
+      rados_watch2(ioctx, notify_oid, &handle1,
+                  watch_notify2_test_cb,
+                  watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   ASSERT_EQ(0,
-      rados_watch2(ioctx, notify_oid, &handle2, watch_notify2_test_cb,
+      rados_watch2(ioctx, notify_oid, &handle2,
+                  watch_notify2_test_cb,
+                  watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   ASSERT_NE(handle1, handle2);
   char *reply_buf;
@@ -176,12 +199,15 @@ TEST_F(LibRadosWatchNotify, WatchNotify2TimeoutTest) {
   notify_oid = "foo";
   notify_sleep = 3; // 3s
   notify_cookies.clear();
+  notify_failed = false;
   char buf[128];
   memset(buf, 0xcc, sizeof(buf));
   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
   uint64_t handle;
   ASSERT_EQ(0,
-      rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+      rados_watch2(ioctx, notify_oid, &handle,
+                  watch_notify2_test_cb,
+                  watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   char *reply_buf;
   size_t reply_buf_len;
@@ -189,6 +215,10 @@ TEST_F(LibRadosWatchNotify, WatchNotify2TimeoutTest) {
                                      "notify", 6, 1000, // 1s
                                      &reply_buf, &reply_buf_len));
   ASSERT_EQ(1, notify_cookies.size());
+  int wait = 10;
+  while (!notify_failed && --wait)
+    sleep(1);
+    ASSERT_TRUE(notify_failed);
   rados_unwatch(ioctx, notify_oid, handle);
 }
 
@@ -246,6 +276,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2TimeoutTestPP) {
   notify_ioctx = &ioctx;
   notify_sleep = 3;  // 3s
   notify_cookies.clear();
+  notify_failed = true;
   char buf[128];
   memset(buf, 0xcc, sizeof(buf));
   bufferlist bl1;
@@ -260,6 +291,10 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2TimeoutTestPP) {
   ASSERT_EQ(0, notify_cookies.size());
   bufferlist bl2, bl_reply;
   ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */, &bl_reply));
+  int wait = 10;
+  while (!notify_failed && --wait)
+    sleep(1);
+  ASSERT_TRUE(notify_failed);
   ioctx.unwatch(notify_oid, handle);
 }
 
index 449df682b3affda0461d82708c438ce92603c2bd..381d59068828fd8cf33cfebedb8e3ef9ea732af0 100644 (file)
@@ -505,6 +505,15 @@ public:
     bufferlist empty;
     ioctx.notify_ack(name, notify_id, cookie, empty);
   }
+  void handle_failed_notify(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id) {
+    cout << "FAILED_NOTIFY"
+        << " cookie " << cookie
+        << " notify_id " << notify_id
+        << " from " << notifier_id
+        << std::endl;
+  }
   void handle_error(uint64_t cookie, int err) {
     cout << "ERROR"
         << " cookie " << cookie