]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: include missed watchers in notify reply data
authorSage Weil <sage@redhat.com>
Mon, 17 Nov 2014 16:02:55 +0000 (08:02 -0800)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:34:05 +0000 (10:34 -0800)
Include a list of who missed their notify in the notify reply.

Signed-off-by: Sage Weil <sage@redhat.com>
src/include/rados/librados.h
src/librados/IoCtxImpl.cc
src/osd/Watch.cc
src/test/librados/watch_notify.cc
src/tools/rados/rados.cc

index 528adddf91ac36f253bb986e22b5e3cc5e77a4f5..0e1fed4c4628bc2e125cf897b4f11137e7705322 100644 (file)
@@ -2064,6 +2064,10 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
  *      le32 buflen  length of reply message buffer
  *      u8 * buflen  payload
  *    } * num_acks
+ *    le32 num_timeouts
+ *    {
+ *      le64 gid     global id for the client
+ *    } * num_timeouts
  *
  * Note: There may be multiple instances of the same gid if there are
  * multiple watchers registered via the same client.
@@ -2071,6 +2075,10 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
  * Note: The buffer must be released with rados_buffer_free() when the
  * user is done with it.
  *
+ * Note: Since the result buffer includes clients that time out, it
+ * will be set even when rados_notify() returns an error code (like
+ * -ETIMEDOUT).
+ *
  * @param io the pool the object is in
  * @param o the name of the object
  * @param buf data to send to watchers
index f821c2115e1e62eef2e95c1fa4b41a93066b4233..957e8a2e2389a4f53cbf511ff0ef37d1e6100560 100644 (file)
@@ -1237,21 +1237,29 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                           << linger_op << dendl;
     notify_private.wait();
 
-    // pass result back to user
-    if (notify_private.result >= 0) {
-      if (preply_buf) {
-       *preply_buf = (char*)malloc(notify_private.reply_bl.length());
-       memcpy(*preply_buf, notify_private.reply_bl.c_str(),
-              notify_private.reply_bl.length());
-      }
-      if (preply_buf_len)
-       *preply_buf_len = notify_private.reply_bl.length();
-      if (preply_bl)
-       preply_bl->claim(notify_private.reply_bl);
+    ldout(client->cct, 10) << __func__ << " completed notify (linger op "
+                          << linger_op << "), r = " << notify_private.result
+                          << dendl;
+  } else {
+    ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
+                          << r_issue << dendl;
+  }
+
+  // pass result back to user
+  // NOTE: we do this regardless of what error code we return
+  if (preply_buf) {
+    if (notify_private.reply_bl.length()) {
+      *preply_buf = (char*)malloc(notify_private.reply_bl.length());
+      memcpy(*preply_buf, notify_private.reply_bl.c_str(),
+            notify_private.reply_bl.length());
+    } else {
+      *preply_buf = NULL;
     }
   }
-  ldout(client->cct, 10) << __func__ << " completed notify (linger op "
-                        << linger_op << "), unregistering" << dendl;
+  if (preply_buf_len)
+    *preply_buf_len = notify_private.reply_bl.length();
+  if (preply_bl)
+    preply_bl->claim(notify_private.reply_bl);
 
   lock->Lock();
   objecter->linger_cancel(linger_op);
index bccd0f70161ec44d58abaa8fd4c56347f8fc90cd..4c21c83c2124ee3c5c3c180a266d363786b46a08 100644 (file)
@@ -179,8 +179,16 @@ void Notify::maybe_complete_notify()
           << watchers.size()
           << " in progress watchers " << dendl;
   if (watchers.empty() || timed_out) {
+    // prepare reply
     bufferlist bl;
     ::encode(notify_replies, bl);
+    list<uint64_t> missed;
+    for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
+      (*p)->send_failed_notify(this);
+      missed.push_back((*p)->get_entity().num());
+    }
+    ::encode(missed, bl);
+
     bufferlist empty;
     MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
                                         CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
@@ -191,9 +199,6 @@ void Notify::maybe_complete_notify()
     client->send_message(reply);
     unregister_cb();
 
-    for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
-      (*p)->send_failed_notify(this);
-    }
     complete = true;
   }
 }
index aade49c5253a6798b56451422b76fa11eae33688..7dfabe399d36b7308afb8ea02ee11d38eef543f0 100644 (file)
@@ -8,6 +8,7 @@
 #include <semaphore.h>
 #include "gtest/gtest.h"
 #include "include/encoding.h"
+#include <set>
 
 using namespace librados;
 
@@ -258,7 +259,7 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
 
   // a subsequent notify should not reach us
-  char *reply_buf;
+  char *reply_buf = 0;
   size_t reply_buf_len;
   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
                             "notify", 6, 0,
@@ -267,12 +268,16 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
     bufferlist reply;
     reply.append(reply_buf, reply_buf_len);
     std::multimap<uint64_t, bufferlist> reply_map;
+    std::multiset<uint64_t> missed_map;
     bufferlist::iterator reply_p = reply.begin();
     ::decode(reply_map, reply_p);
+    ::decode(missed_map, reply_p);
     ASSERT_EQ(0u, reply_map.size());
+    ASSERT_EQ(0u, missed_map.size());
   }
   ASSERT_EQ(0u, notify_cookies.size());
   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
+  rados_buffer_free(reply_buf);
 
   // re-watch
   rados_unwatch2(ioctx, handle);
@@ -292,9 +297,12 @@ TEST_F(LibRadosWatchNotify, Watch2Timeout) {
     bufferlist reply;
     reply.append(reply_buf, reply_buf_len);
     std::multimap<uint64_t, bufferlist> reply_map;
+    std::multiset<uint64_t> missed_map;
     bufferlist::iterator reply_p = reply.begin();
     ::decode(reply_map, reply_p);
+    ::decode(missed_map, reply_p);
     ASSERT_EQ(1u, reply_map.size());
+    ASSERT_EQ(0, missed_map.size());
     ASSERT_EQ(1, notify_cookies.count(handle));
     ASSERT_EQ(5, reply_map.begin()->second.length());
     ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
@@ -321,7 +329,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2) {
                   watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
-  char *reply_buf;
+  char *reply_buf = 0;
   size_t reply_buf_len;
   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
                             "notify", 6, 0,
@@ -329,14 +337,27 @@ TEST_F(LibRadosWatchNotify, WatchNotify2) {
   bufferlist reply;
   reply.append(reply_buf, reply_buf_len);
   std::multimap<uint64_t, bufferlist> reply_map;
+  std::multiset<uint64_t> missed_map;
   bufferlist::iterator reply_p = reply.begin();
   ::decode(reply_map, reply_p);
+  ::decode(missed_map, reply_p);
   ASSERT_EQ(1u, reply_map.size());
+  ASSERT_EQ(0, missed_map.size());
   ASSERT_EQ(1u, notify_cookies.size());
   ASSERT_EQ(1, notify_cookies.count(handle));
   ASSERT_EQ(5, reply_map.begin()->second.length());
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
   ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
+  rados_buffer_free(reply_buf);
+
+  // try it on a non-existent object ... our buffer pointers
+  // should get zeroed.
+  ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
+                                  "notify", 6, 0,
+                                  &reply_buf, &reply_buf_len));
+  ASSERT_EQ(NULL, reply_buf);
+  ASSERT_EQ(0, reply_buf_len);
+
   rados_unwatch2(ioctx, handle);
 }
 
@@ -360,12 +381,15 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
   ASSERT_EQ(0, ioctx.notify(notify_oid, bl2, 0, &bl_reply));
   bufferlist::iterator p = bl_reply.begin();
   std::multimap<uint64_t,bufferlist> reply_map;
+  std::multiset<uint64_t> missed_map;
   ::decode(reply_map, p);
+  ::decode(missed_map, p);
   ASSERT_EQ(1u, notify_cookies.size());
   ASSERT_EQ(1, notify_cookies.count(handle));
   ASSERT_EQ(1u, reply_map.size());
   ASSERT_EQ(5, reply_map.begin()->second.length());
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_EQ(0, missed_map.size());
   ASSERT_TRUE(ioctx.watch_check(handle) > 0);
   ioctx.unwatch(handle);
 }
@@ -393,7 +417,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
   ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
   ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
   ASSERT_NE(handle1, handle2);
-  char *reply_buf;
+  char *reply_buf = 0;
   size_t reply_buf_len;
   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
                             "notify", 6, 0,
@@ -401,16 +425,20 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
   bufferlist reply;
   reply.append(reply_buf, reply_buf_len);
   std::multimap<uint64_t, bufferlist> reply_map;
+  std::multiset<uint64_t> missed_map;
   bufferlist::iterator reply_p = reply.begin();
   ::decode(reply_map, reply_p);
+  ::decode(missed_map, reply_p);
   ASSERT_EQ(2u, reply_map.size());
   ASSERT_EQ(5, reply_map.begin()->second.length());
+  ASSERT_EQ(0, missed_map.size());
   ASSERT_EQ(2u, notify_cookies.size());
   ASSERT_EQ(1, notify_cookies.count(handle1));
   ASSERT_EQ(1, notify_cookies.count(handle2));
   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
   ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
   ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
+  rados_buffer_free(reply_buf);
   rados_unwatch2(ioctx, handle1);
   rados_unwatch2(ioctx, handle2);
 }
@@ -433,7 +461,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
                   watch_notify2_test_failcb,
                   watch_notify2_test_errcb, NULL));
   ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
-  char *reply_buf;
+  char *reply_buf = 0;
   size_t reply_buf_len;
   ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
                                      "notify", 6, 1000, // 1s
@@ -443,6 +471,18 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
   while (!notify_failed && --wait)
     sleep(1);
   ASSERT_TRUE(notify_failed);
+  {
+    bufferlist reply;
+    reply.append(reply_buf, reply_buf_len);
+    std::multimap<uint64_t, bufferlist> reply_map;
+    std::multiset<uint64_t> missed_map;
+    bufferlist::iterator reply_p = reply.begin();
+    ::decode(reply_map, reply_p);
+    ::decode(missed_map, reply_p);
+    ASSERT_EQ(0, reply_map.size());
+    ASSERT_EQ(1, missed_map.size());
+  }
+  rados_buffer_free(reply_buf);
 
   // we should get the next notify, though!
   notify_failed = false;
index af29f51c0837346a9a8cc497ea3d937fb13401c4..2138aadb8e62595c80e8f9d28931f497efab255a 100644 (file)
@@ -2389,11 +2389,18 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
       ::decode(rm, p);
       for (multimap<uint64_t,bufferlist>::iterator p = rm.begin(); p != rm.end();
           ++p) {
-       cout << "client." << p->first
+       cout << "reply client." << p->first
             << " : " << p->second.length() << " bytes" << std::endl;
        if (p->second.length())
          p->second.hexdump(cout);
       }
+      if (!p.end()) {
+       list<uint64_t> missed;
+       ::decode(missed, p);
+       for (list<uint64_t>::iterator p = missed.begin(); p != missed.end(); ++p) {
+         cout << "timeout client." << *p << std::endl;
+       }
+      }
     }
   } else if (strcmp(nargs[0], "set-alloc-hint") == 0) {
     if (!pool_name || nargs.size() < 4)