]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: allow multiple watchers/notify replies per client
authorSage Weil <sage@redhat.com>
Wed, 8 Oct 2014 15:25:45 +0000 (08:25 -0700)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:37 +0000 (10:32 -0800)
It is perfectly valid to register multiple watchers via the same
librados client.  Make the notify_replies a multimap to capture all
of the acks.

Signed-off-by: Sage Weil <sage@redhat.com>
src/include/rados/librados.h
src/osd/Watch.cc
src/osd/Watch.h
src/test/librados/watch_notify.cc

index 0e8047827dad080e6dd22298f6325e1fe9be2555..9cf888c25f570aec5936accc829e46d3aa9f53e3 100644 (file)
@@ -1996,8 +1996,11 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
  *      u8 * buflen  payload
  *    } * num_acks
  *
- * Note that this buffer must be released with rados_buffer_free()
- * when the user is done with it.
+ * Note: There may be multiple instances of the same gid if there are
+ * multiple watchers registered via the same client.
+ *
+ * Note: The buffer must be released with rados_buffer_free() when the
+ * user is done with it.
  *
  * @param io the pool the object is in
  * @param o the name of the object
index 125a93b20fb2d8cfd86735a4c4ed12ac591b2be3..4e07d6e66a9cf6a656fa4d3153f7c471a30c76d8 100644 (file)
@@ -163,7 +163,7 @@ void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
   assert(in_progress_watchers > 0);
   watchers.erase(watch);
   --in_progress_watchers;
-  notify_replies[watch->get_watcher_gid()].claim(reply_bl);
+  notify_replies.insert(make_pair(watch->get_watcher_gid(), reply_bl));
   maybe_complete_notify();
 }
 
index b6142a59d628b8f4e3a9ef74284ce4984da17a5b..dee3e1e2f59c6bcb70e174bb79bffa3a967c0496 100644 (file)
@@ -73,7 +73,7 @@ class Notify {
   Mutex lock;
 
   /// gid -> reply_bl for everyone who acked the notify
-  map<uint64_t,bufferlist> notify_replies;
+  multimap<uint64_t,bufferlist> notify_replies;
 
   /// true if this notify is being discarded
   bool is_discarded() {
index 3ed93676739f4f0ec138ab0646f34f1dd0c3ef88..d41552492fcaf943d3ab4497bc1eba7fde203278 100644 (file)
@@ -107,7 +107,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Test) {
                             &reply_buf, &reply_buf_len));
   bufferlist reply;
   reply.append(reply_buf, reply_buf_len);
-  std::map<uint64_t, bufferlist> reply_map;
+  std::multimap<uint64_t, bufferlist> reply_map;
   bufferlist::iterator reply_p = reply.begin();
   ::decode(reply_map, reply_p);
   ASSERT_EQ(1, reply_map.size());
@@ -116,6 +116,35 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Test) {
   rados_unwatch(ioctx, notify_oid, handle);
 }
 
+TEST_F(LibRadosWatchNotify, WatchNotify2MultiTest) {
+  notify_io = ioctx;
+  notify_oid = "foo";
+  char buf[128];
+  memset(buf, 0xcc, sizeof(buf));
+  ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
+  uint64_t handle1, handle2;
+  ASSERT_EQ(0,
+      rados_watch2(ioctx, notify_oid, &handle1, watch_notify2_test_cb, NULL));
+  ASSERT_EQ(0,
+      rados_watch2(ioctx, notify_oid, &handle2, watch_notify2_test_cb, NULL));
+  ASSERT_NE(handle1, handle2);
+  char *reply_buf;
+  size_t reply_buf_len;
+  ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
+                            "notify", 6, 0,
+                            &reply_buf, &reply_buf_len));
+  bufferlist reply;
+  reply.append(reply_buf, reply_buf_len);
+  std::multimap<uint64_t, bufferlist> reply_map;
+  bufferlist::iterator reply_p = reply.begin();
+  ::decode(reply_map, reply_p);
+  ASSERT_EQ(2, reply_map.size());
+  ASSERT_EQ(5, reply_map.begin()->second.length());
+  ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  rados_unwatch(ioctx, notify_oid, handle1);
+  rados_unwatch(ioctx, notify_oid, handle2);
+}
+
 TEST_F(LibRadosWatchNotify, WatchNotify2TimeoutTest) {
   notify_io = ioctx;
   notify_oid = "foo";
@@ -172,7 +201,7 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2TestPP) {
   bufferlist bl2, bl_reply;
   ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 0, &bl_reply));
   bufferlist::iterator p = bl_reply.begin();
-  std::map<uint64_t,bufferlist> reply_map;
+  std::multimap<uint64_t,bufferlist> reply_map;
   ::decode(reply_map, p);
   ASSERT_EQ(1u, reply_map.size());
   ASSERT_EQ(5, reply_map.begin()->second.length());