]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librados: add timeout parameter to rados_watch
authorRyne Li <lizhenqiangsnake@gmail.com>
Mon, 10 Oct 2016 19:47:32 +0000 (15:47 -0400)
committerRyne Li <lizhenqiangsnake@gmail.com>
Sun, 13 Nov 2016 12:21:54 +0000 (07:21 -0500)
Signed-off-by: Ryne Li <lizhenqiangsnake@gmail.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc
src/test/librados/watch_notify.cc
src/tracing/librados.tp

index 4c196424b63f7badcf8f4317f0d9ad060dbefba1..bbfb95cb7b365b14aafdbe99bb9f50328062a3b4 100644 (file)
@@ -2137,7 +2137,6 @@ typedef void (*rados_watchcb2_t)(void *arg,
  * after 30 seconds. Watches are automatically reestablished when a new
  * connection is made, or a placement group switches OSDs.
  *
- * @note BUG: watch timeout should be configurable
  * @note BUG: librados should provide a way for watchers to notice connection resets
  * @note BUG: the ver parameter does not work, and -ERANGE will never be returned
  *            (See URL tracker.ceph.com/issues/2592)
@@ -2163,13 +2162,12 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
  * A watch operation registers the client as being interested in
  * notifications on an object. OSDs keep track of watches on
  * persistent storage, so they are preserved across cluster changes by
- * the normal recovery process. If the client loses its connection to
- * the primary OSD for a watched object, the watch will be removed
- * after 30 seconds. Watches are automatically reestablished when a new
+ * the normal recovery process. If the client loses its connection to the
+ * primary OSD for a watched object, the watch will be removed after
+ * a timeout configured with osd_client_watch_timeout.
+ * Watches are automatically reestablished when a new
  * connection is made, or a placement group switches OSDs.
  *
- * @note BUG: watch timeout should be configurable
- *
  * @param io the pool the object is in
  * @param o the object to watch
  * @param cookie where to store the internal id assigned to this watch
@@ -2183,6 +2181,30 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki
                                rados_watcherrcb_t watcherrcb,
                                void *arg);
 
+/**
+ * Register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param cookie where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param timeout how many seconds the connection will keep after disconnection
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie,
+        rados_watchcb2_t watchcb,
+        rados_watcherrcb_t watcherrcb,
+        uint32_t timeout,
+        void *arg);
+
 /**
  * Asynchronous register an interest in an object
  *
@@ -2194,8 +2216,6 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki
  * after 30 seconds. Watches are automatically reestablished when a new
  * connection is made, or a placement group switches OSDs.
  *
- * @note BUG: watch timeout should be configurable
- *
  * @param io the pool the object is in
  * @param o the object to watch
  * @param completion what to do when operation has been attempted
@@ -2211,6 +2231,35 @@ CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o,
                                   rados_watcherrcb_t watcherrcb,
                                   void *arg);
 
+/**
+ * Asynchronous register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after the number of seconds that configured in timeout parameter.
+ * Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param completion what to do when operation has been attempted
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param timeout how many seconds the connection will keep after disconnection
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_aio_watch2(rados_ioctx_t io, const char *o,
+           rados_completion_t completion, uint64_t *handle,
+           rados_watchcb2_t watchcb,
+           rados_watcherrcb_t watcherrcb,
+           uint32_t timeout,
+           void *arg);
+
 /**
  * Check on the status of a watch
  *
index 82a2daa54ba34b2786368983a9d97c6a023e089b..901139aa4df2927d064decc05e6ada9a6b5cce44 100644 (file)
@@ -1019,8 +1019,12 @@ namespace librados
     // watch/notify
     int watch2(const std::string& o, uint64_t *handle,
               librados::WatchCtx2 *ctx);
+    int watch3(const std::string& o, uint64_t *handle,
+              librados::WatchCtx2 *ctx, uint32_t timeout);
     int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
               librados::WatchCtx2 *ctx);
+    int aio_watch2(const std::string& o, AioCompletion *c, uint64_t *handle,
+              librados::WatchCtx2 *ctx, uint32_t timeout);
     int unwatch2(uint64_t handle);
     int aio_unwatch(uint64_t handle, AioCompletion *c);
     /**
index 1b29d1eeea87894960c6ad2f558da702d19a5bbd..7ee165229cc3f5c05e521b490601d17f3d274b5c 100644 (file)
@@ -1419,6 +1419,15 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
                                librados::WatchCtx *ctx,
                                librados::WatchCtx2 *ctx2,
                                bool internal)
+{
+  return watch(oid, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
+                               librados::WatchCtx *ctx,
+                               librados::WatchCtx2 *ctx2,
+                               uint32_t timeout,
+                               bool internal)
 {
   ::ObjectOperation wr;
   version_t objver;
@@ -1430,7 +1439,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
                                           oid, ctx, ctx2, internal);
 
   prepare_assert_ops(&wr);
-  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
   bufferlist bl;
   objecter->linger_watch(linger_op, wr,
                         snapc, ceph::real_clock::now(), bl,
@@ -1454,6 +1463,16 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid,
                                    uint64_t *handle,
                                    librados::WatchCtx *ctx,
                                    librados::WatchCtx2 *ctx2,
+                                   bool internal) {
+  return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   uint64_t *handle,
+                                   librados::WatchCtx *ctx,
+                                   librados::WatchCtx2 *ctx2,
+                                   uint32_t timeout,
                                    bool internal)
 {
   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
@@ -1465,7 +1484,7 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid,
   linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
 
   prepare_assert_ops(&wr);
-  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
   bufferlist bl;
   objecter->linger_watch(linger_op, wr,
                          snapc, ceph::real_clock::now(), bl,
index 19fd0b5b6b7e7366812acc3ff7ba6e6ce4ece77a..050b52ddd5595d11f6e7693500319bab9374f35b 100644 (file)
@@ -234,9 +234,14 @@ struct librados::IoCtxImpl {
   void set_sync_op_version(version_t ver);
   int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
            librados::WatchCtx2 *ctx2, bool internal = false);
+  int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
+           librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal = false);
   int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
                 librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
                 bool internal = false);
+  int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
+                librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
+                uint32_t timeout, bool internal = false);
   int watch_check(uint64_t cookie);
   int unwatch(uint64_t cookie);
   int aio_unwatch(uint64_t cookie, AioCompletionImpl *c);
index 944d0b3acd5740fb3bd4f66f99ec7f1336a10270..3d2c8577d63dda640d4ffd4aa25d22eedf356c92 100644 (file)
@@ -1878,6 +1878,13 @@ int librados::IoCtx::watch2(const string& oid, uint64_t *cookie,
   return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
 }
 
+int librados::IoCtx::watch3(const string& oid, uint64_t *cookie,
+          librados::WatchCtx2 *ctx2, uint32_t timeout)
+{
+  object_t obj(oid);
+  return io_ctx_impl->watch(obj, cookie, NULL, ctx2, timeout);
+}
+
 int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
                                uint64_t *cookie,
                                librados::WatchCtx2 *ctx2)
@@ -1886,6 +1893,14 @@ int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
   return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2);
 }
 
+int librados::IoCtx::aio_watch2(const string& oid, AioCompletion *c,
+                                uint64_t *cookie,
+                                librados::WatchCtx2 *ctx2,
+                                uint32_t timeout)
+{
+  object_t obj(oid);
+  return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2, timeout);
+}
 
 int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
 {
@@ -4520,9 +4535,17 @@ struct C_WatchCB2 : public librados::WatchCtx2 {
 extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
                            rados_watchcb2_t watchcb,
                            rados_watcherrcb_t watcherrcb,
+                           void *arg) {
+  return rados_watch3(io, o, handle, watchcb, watcherrcb, 0, arg);
+}
+
+extern "C" int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *handle,
+                           rados_watchcb2_t watchcb,
+                           rados_watcherrcb_t watcherrcb,
+                           uint32_t timeout,
                            void *arg)
 {
-  tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
+  tracepoint(librados, rados_watch3_enter, io, o, handle, watchcb, timeout, arg);
   int ret;
   if (!watchcb || !o || !handle) {
     ret = -EINVAL;
@@ -4531,9 +4554,9 @@ extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
     librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
     object_t oid(o);
     C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
-    ret = ctx->watch(oid, cookie, NULL, wc, true);
+    ret = ctx->watch(oid, cookie, NULL, wc, timeout, true);
   }
-  tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
+  tracepoint(librados, rados_watch3_exit, ret, handle ? *handle : 0);
   return ret;
 }
 
@@ -4541,9 +4564,18 @@ extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o,
                                rados_completion_t completion,
                                uint64_t *handle,
                                rados_watchcb2_t watchcb,
-                               rados_watcherrcb_t watcherrcb, void *arg)
+                               rados_watcherrcb_t watcherrcb, void *arg) {
+  return rados_aio_watch2(io, o, completion, handle, watchcb, watcherrcb, 0, arg);
+}
+
+extern "C" int rados_aio_watch2(rados_ioctx_t io, const char *o,
+                                rados_completion_t completion,
+                                uint64_t *handle,
+                                rados_watchcb2_t watchcb,
+                                rados_watcherrcb_t watcherrcb,
+                                uint32_t timeout, void *arg)
 {
-  tracepoint(librados, rados_aio_watch_enter, io, o, completion, handle, watchcb, arg);
+  tracepoint(librados, rados_aio_watch2_enter, io, o, completion, handle, watchcb, timeout, arg);
   int ret;
   if (!completion || !watchcb || !o || !handle) {
     ret = -EINVAL;
@@ -4554,9 +4586,9 @@ extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o,
     librados::AioCompletionImpl *c =
       reinterpret_cast<librados::AioCompletionImpl*>(completion);
     C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
-    ret = ctx->aio_watch(oid, c, cookie, NULL, wc, true);
+    ret = ctx->aio_watch(oid, c, cookie, NULL, wc, timeout, true);
   }
-  tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
+  tracepoint(librados, rados_aio_watch2_exit, ret, handle ? *handle : 0);
   return ret;
 }
 
index dda8fec3785f6de2c2cb7e8a5c2e581c5b706046..07370c974106d14f18a1fdb4e4454f8bfe8d9ceb 100644 (file)
@@ -770,6 +770,79 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
   comp->release();
 }
 
+TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
+  notify_oid = "foo";
+  notify_ioctx = &ioctx;
+  notify_cookies.clear();
+  uint32_t timeout = 12; // configured timeout
+  char buf[128];
+  memset(buf, 0xcc, sizeof(buf));
+  bufferlist bl1;
+  bl1.append(buf, sizeof(buf));
+  ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
+  uint64_t handle;
+  WatchNotifyTestCtx2 ctx;
+  ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
+  ASSERT_GT(ioctx.watch_check(handle), 0);
+  std::list<obj_watch_t> watches;
+  ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
+  ASSERT_EQ(watches.size(), 1u);
+  std::cout << "List watches" << std::endl;
+  for (std::list<obj_watch_t>::iterator it = watches.begin();
+    it != watches.end(); ++it) {
+    ASSERT_EQ(it->timeout_seconds, timeout);
+  }
+  bufferlist bl2, bl_reply;
+  ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
+  bufferlist::iterator p = bl_reply.begin();
+  std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
+  std::set<std::pair<uint64_t,uint64_t> > missed_map;
+  ::decode(reply_map, p);
+  ::decode(missed_map, p);
+  ASSERT_EQ(1u, notify_cookies.size());
+  ASSERT_EQ(1u, notify_cookies.count(handle));
+  ASSERT_EQ(1u, reply_map.size());
+  ASSERT_EQ(5u, reply_map.begin()->second.length());
+  ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+  ASSERT_EQ(0u, missed_map.size());
+  ASSERT_GT(ioctx.watch_check(handle), 0);
+  ioctx.unwatch2(handle);
+}
+
+TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
+  notify_io = ioctx;
+  notify_oid = "foo";
+  notify_err = 0;
+  char buf[128];
+  uint32_t timeout = 3;
+  memset(buf, 0xcc, sizeof(buf));
+  ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
+
+
+  rados_completion_t comp;
+  uint64_t handle;
+  ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
+  rados_aio_watch2(ioctx, notify_oid, comp, &handle,
+                  watch_notify2_test_cb, watch_notify2_test_errcb, timeout, NULL);
+  ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
+  ASSERT_EQ(0, rados_aio_get_return_value(comp));
+  rados_aio_release(comp);
+  ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
+  int left = 30;
+  std::cout << "waiting up to " << left << " for disconnect notification ..."
+      << std::endl;
+  while (notify_err == 0 && --left) {
+    sleep(1);
+  }
+  ASSERT_TRUE(left > 0);
+  ASSERT_EQ(-ENOTCONN, notify_err);
+  ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
+  ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
+  rados_aio_unwatch(ioctx, handle, comp);
+  ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
+  ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
+  rados_aio_release(comp);
+}
 // --
 
 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
index c4aaf4cedc65ee99cf7e5911eaae0f05a25a0cf4..dab66020065ee8cd8ce834196509deb592405ea7 100644 (file)
@@ -2301,23 +2301,25 @@ TRACEPOINT_EVENT(librados, rados_watch_exit,
     )
 )
 
-TRACEPOINT_EVENT(librados, rados_watch2_enter,
+TRACEPOINT_EVENT(librados, rados_watch3_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,
         const char*, oid,
         uint64_t*, phandle,
         rados_watchcb2_t, callback,
+        uint32_t, timeout,
         void*, arg),
     TP_FIELDS(
         ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
         ctf_string(oid, oid)
         ctf_integer_hex(uint64_t, phandle, phandle)
         ctf_integer_hex(rados_watchcb2_t, callback, callback)
+        ctf_integer(uint32_t, timeout, timeout)
         ctf_integer_hex(void*, arg, arg)
     )
 )
 
-TRACEPOINT_EVENT(librados, rados_watch2_exit,
+TRACEPOINT_EVENT(librados, rados_watch3_exit,
     TP_ARGS(
         int, retval,
         uint64_t, handle),
@@ -2327,13 +2329,14 @@ TRACEPOINT_EVENT(librados, rados_watch2_exit,
     )
 )
 
-TRACEPOINT_EVENT(librados, rados_aio_watch_enter,
+TRACEPOINT_EVENT(librados, rados_aio_watch2_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,
         const char*, oid,
         rados_completion_t, completion,
         uint64_t*, phandle,
         rados_watchcb2_t, callback,
+        uint32_t, timeout,
         void*, arg),
     TP_FIELDS(
         ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
@@ -2341,11 +2344,12 @@ TRACEPOINT_EVENT(librados, rados_aio_watch_enter,
         ctf_integer_hex(rados_completion_t, completion, completion)
         ctf_integer_hex(uint64_t, phandle, phandle)
         ctf_integer_hex(rados_watchcb2_t, callback, callback)
+        ctf_integer(uint32_t, timeout, timeout)
         ctf_integer_hex(void*, arg, arg)
     )
 )
 
-TRACEPOINT_EVENT(librados, rados_aio_watch_exit,
+TRACEPOINT_EVENT(librados, rados_aio_watch2_exit,
     TP_ARGS(
         int, retval,
         uint64_t, handle),