From e3ca1e607e3772cd9883683ffbe32c854deac254 Mon Sep 17 00:00:00 2001 From: Ryne Li Date: Mon, 10 Oct 2016 15:47:32 -0400 Subject: [PATCH] librados: add timeout parameter to rados_watch Signed-off-by: Ryne Li --- src/include/rados/librados.h | 65 +++++++++++++++++++++++---- src/include/rados/librados.hpp | 4 ++ src/librados/IoCtxImpl.cc | 23 +++++++++- src/librados/IoCtxImpl.h | 5 +++ src/librados/librados.cc | 46 ++++++++++++++++--- src/test/librados/watch_notify.cc | 73 +++++++++++++++++++++++++++++++ src/tracing/librados.tp | 12 +++-- 7 files changed, 207 insertions(+), 21 deletions(-) diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 4c196424b63..bbfb95cb7b3 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -2137,7 +2137,6 @@ typedef void (*rados_watchcb2_t)(void *arg, * after 30 seconds. Watches are automatically reestablished when a new * connection is made, or a placement group switches OSDs. * - * @note BUG: watch timeout should be configurable * @note BUG: librados should provide a way for watchers to notice connection resets * @note BUG: the ver parameter does not work, and -ERANGE will never be returned * (See URL tracker.ceph.com/issues/2592) @@ -2163,13 +2162,12 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, * A watch operation registers the client as being interested in * notifications on an object. OSDs keep track of watches on * persistent storage, so they are preserved across cluster changes by - * the normal recovery process. If the client loses its connection to - * the primary OSD for a watched object, the watch will be removed - * after 30 seconds. Watches are automatically reestablished when a new + * the normal recovery process. If the client loses its connection to the + * primary OSD for a watched object, the watch will be removed after + * a timeout configured with osd_client_watch_timeout. + * Watches are automatically reestablished when a new * connection is made, or a placement group switches OSDs. * - * @note BUG: watch timeout should be configurable - * * @param io the pool the object is in * @param o the object to watch * @param cookie where to store the internal id assigned to this watch @@ -2183,6 +2181,30 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki rados_watcherrcb_t watcherrcb, void *arg); +/** + * Register an interest in an object + * + * A watch operation registers the client as being interested in + * notifications on an object. OSDs keep track of watches on + * persistent storage, so they are preserved across cluster changes by + * the normal recovery process. Watches are automatically reestablished when a new + * connection is made, or a placement group switches OSDs. + * + * @param io the pool the object is in + * @param o the object to watch + * @param cookie where to store the internal id assigned to this watch + * @param watchcb what to do when a notify is received on this object + * @param watcherrcb what to do when the watch session encounters an error + * @param timeout how many seconds the connection will keep after disconnection + * @param arg opaque value to pass to the callback + * @returns 0 on success, negative error code on failure + */ +CEPH_RADOS_API int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, + rados_watchcb2_t watchcb, + rados_watcherrcb_t watcherrcb, + uint32_t timeout, + void *arg); + /** * Asynchronous register an interest in an object * @@ -2194,8 +2216,6 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki * after 30 seconds. Watches are automatically reestablished when a new * connection is made, or a placement group switches OSDs. * - * @note BUG: watch timeout should be configurable - * * @param io the pool the object is in * @param o the object to watch * @param completion what to do when operation has been attempted @@ -2211,6 +2231,35 @@ CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o, rados_watcherrcb_t watcherrcb, void *arg); +/** + * Asynchronous register an interest in an object + * + * A watch operation registers the client as being interested in + * notifications on an object. OSDs keep track of watches on + * persistent storage, so they are preserved across cluster changes by + * the normal recovery process. If the client loses its connection to + * the primary OSD for a watched object, the watch will be removed + * after the number of seconds that configured in timeout parameter. + * Watches are automatically reestablished when a new + * connection is made, or a placement group switches OSDs. + * + * @param io the pool the object is in + * @param o the object to watch + * @param completion what to do when operation has been attempted + * @param handle where to store the internal id assigned to this watch + * @param watchcb what to do when a notify is received on this object + * @param watcherrcb what to do when the watch session encounters an error + * @param timeout how many seconds the connection will keep after disconnection + * @param arg opaque value to pass to the callback + * @returns 0 on success, negative error code on failure + */ +CEPH_RADOS_API int rados_aio_watch2(rados_ioctx_t io, const char *o, + rados_completion_t completion, uint64_t *handle, + rados_watchcb2_t watchcb, + rados_watcherrcb_t watcherrcb, + uint32_t timeout, + void *arg); + /** * Check on the status of a watch * diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 82a2daa54ba..901139aa4df 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -1019,8 +1019,12 @@ namespace librados // watch/notify int watch2(const std::string& o, uint64_t *handle, librados::WatchCtx2 *ctx); + int watch3(const std::string& o, uint64_t *handle, + librados::WatchCtx2 *ctx, uint32_t timeout); int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx); + int aio_watch2(const std::string& o, AioCompletion *c, uint64_t *handle, + librados::WatchCtx2 *ctx, uint32_t timeout); int unwatch2(uint64_t handle); int aio_unwatch(uint64_t handle, AioCompletion *c); /** diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 1b29d1eeea8..7ee165229cc 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1419,6 +1419,15 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, bool internal) +{ + return watch(oid, handle, ctx, ctx2, 0, internal); +} + +int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, + librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2, + uint32_t timeout, + bool internal) { ::ObjectOperation wr; version_t objver; @@ -1430,7 +1439,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, oid, ctx, ctx2, internal); prepare_assert_ops(&wr); - wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH); + wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); bufferlist bl; objecter->linger_watch(linger_op, wr, snapc, ceph::real_clock::now(), bl, @@ -1454,6 +1463,16 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, + bool internal) { + return aio_watch(oid, c, handle, ctx, ctx2, 0, internal); +} + +int librados::IoCtxImpl::aio_watch(const object_t& oid, + AioCompletionImpl *c, + uint64_t *handle, + librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2, + uint32_t timeout, bool internal) { Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); @@ -1465,7 +1484,7 @@ int librados::IoCtxImpl::aio_watch(const object_t& oid, linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal); prepare_assert_ops(&wr); - wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH); + wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); bufferlist bl; objecter->linger_watch(linger_op, wr, snapc, ceph::real_clock::now(), bl, diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 19fd0b5b6b7..050b52ddd55 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -234,9 +234,14 @@ struct librados::IoCtxImpl { void set_sync_op_version(version_t ver); int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, bool internal = false); + int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal = false); int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, bool internal = false); + int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie, + librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, + uint32_t timeout, bool internal = false); int watch_check(uint64_t cookie); int unwatch(uint64_t cookie); int aio_unwatch(uint64_t cookie, AioCompletionImpl *c); diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 944d0b3acd5..3d2c8577d63 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1878,6 +1878,13 @@ int librados::IoCtx::watch2(const string& oid, uint64_t *cookie, return io_ctx_impl->watch(obj, cookie, NULL, ctx2); } +int librados::IoCtx::watch3(const string& oid, uint64_t *cookie, + librados::WatchCtx2 *ctx2, uint32_t timeout) +{ + object_t obj(oid); + return io_ctx_impl->watch(obj, cookie, NULL, ctx2, timeout); +} + int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c, uint64_t *cookie, librados::WatchCtx2 *ctx2) @@ -1886,6 +1893,14 @@ int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c, return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2); } +int librados::IoCtx::aio_watch2(const string& oid, AioCompletion *c, + uint64_t *cookie, + librados::WatchCtx2 *ctx2, + uint32_t timeout) +{ + object_t obj(oid); + return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2, timeout); +} int librados::IoCtx::unwatch(const string& oid, uint64_t handle) { @@ -4520,9 +4535,17 @@ struct C_WatchCB2 : public librados::WatchCtx2 { extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle, rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, + void *arg) { + return rados_watch3(io, o, handle, watchcb, watcherrcb, 0, arg); +} + +extern "C" int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *handle, + rados_watchcb2_t watchcb, + rados_watcherrcb_t watcherrcb, + uint32_t timeout, void *arg) { - tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg); + tracepoint(librados, rados_watch3_enter, io, o, handle, watchcb, timeout, arg); int ret; if (!watchcb || !o || !handle) { ret = -EINVAL; @@ -4531,9 +4554,9 @@ extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle, librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg); - ret = ctx->watch(oid, cookie, NULL, wc, true); + ret = ctx->watch(oid, cookie, NULL, wc, timeout, true); } - tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0); + tracepoint(librados, rados_watch3_exit, ret, handle ? *handle : 0); return ret; } @@ -4541,9 +4564,18 @@ extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o, rados_completion_t completion, uint64_t *handle, rados_watchcb2_t watchcb, - rados_watcherrcb_t watcherrcb, void *arg) + rados_watcherrcb_t watcherrcb, void *arg) { + return rados_aio_watch2(io, o, completion, handle, watchcb, watcherrcb, 0, arg); +} + +extern "C" int rados_aio_watch2(rados_ioctx_t io, const char *o, + rados_completion_t completion, + uint64_t *handle, + rados_watchcb2_t watchcb, + rados_watcherrcb_t watcherrcb, + uint32_t timeout, void *arg) { - tracepoint(librados, rados_aio_watch_enter, io, o, completion, handle, watchcb, arg); + tracepoint(librados, rados_aio_watch2_enter, io, o, completion, handle, watchcb, timeout, arg); int ret; if (!completion || !watchcb || !o || !handle) { ret = -EINVAL; @@ -4554,9 +4586,9 @@ extern "C" int rados_aio_watch(rados_ioctx_t io, const char *o, librados::AioCompletionImpl *c = reinterpret_cast(completion); C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg); - ret = ctx->aio_watch(oid, c, cookie, NULL, wc, true); + ret = ctx->aio_watch(oid, c, cookie, NULL, wc, timeout, true); } - tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0); + tracepoint(librados, rados_aio_watch2_exit, ret, handle ? *handle : 0); return ret; } diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc index dda8fec3785..07370c97410 100644 --- a/src/test/librados/watch_notify.cc +++ b/src/test/librados/watch_notify.cc @@ -770,6 +770,79 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) { comp->release(); } +TEST_P(LibRadosWatchNotifyPP, WatchNotify3) { + notify_oid = "foo"; + notify_ioctx = &ioctx; + notify_cookies.clear(); + uint32_t timeout = 12; // configured timeout + char buf[128]; + memset(buf, 0xcc, sizeof(buf)); + bufferlist bl1; + bl1.append(buf, sizeof(buf)); + ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0)); + uint64_t handle; + WatchNotifyTestCtx2 ctx; + ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout)); + ASSERT_GT(ioctx.watch_check(handle), 0); + std::list watches; + ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches)); + ASSERT_EQ(watches.size(), 1u); + std::cout << "List watches" << std::endl; + for (std::list::iterator it = watches.begin(); + it != watches.end(); ++it) { + ASSERT_EQ(it->timeout_seconds, timeout); + } + bufferlist bl2, bl_reply; + ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply)); + bufferlist::iterator p = bl_reply.begin(); + std::map,bufferlist> reply_map; + std::set > missed_map; + ::decode(reply_map, p); + ::decode(missed_map, p); + ASSERT_EQ(1u, notify_cookies.size()); + ASSERT_EQ(1u, notify_cookies.count(handle)); + ASSERT_EQ(1u, reply_map.size()); + ASSERT_EQ(5u, reply_map.begin()->second.length()); + ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5)); + ASSERT_EQ(0u, missed_map.size()); + ASSERT_GT(ioctx.watch_check(handle), 0); + ioctx.unwatch2(handle); +} + +TEST_F(LibRadosWatchNotify, AioWatchDelete2) { + notify_io = ioctx; + notify_oid = "foo"; + notify_err = 0; + char buf[128]; + uint32_t timeout = 3; + memset(buf, 0xcc, sizeof(buf)); + ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0)); + + + rados_completion_t comp; + uint64_t handle; + ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp)); + rados_aio_watch2(ioctx, notify_oid, comp, &handle, + watch_notify2_test_cb, watch_notify2_test_errcb, timeout, NULL); + ASSERT_EQ(0, rados_aio_wait_for_complete(comp)); + ASSERT_EQ(0, rados_aio_get_return_value(comp)); + rados_aio_release(comp); + ASSERT_EQ(0, rados_remove(ioctx, notify_oid)); + int left = 30; + std::cout << "waiting up to " << left << " for disconnect notification ..." + << std::endl; + while (notify_err == 0 && --left) { + sleep(1); + } + ASSERT_TRUE(left > 0); + ASSERT_EQ(-ENOTCONN, notify_err); + ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle)); + ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp)); + rados_aio_unwatch(ioctx, handle, comp); + ASSERT_EQ(0, rados_aio_wait_for_complete(comp)); + ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp)); + rados_aio_release(comp); +} // -- INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP, diff --git a/src/tracing/librados.tp b/src/tracing/librados.tp index c4aaf4cedc6..dab66020065 100644 --- a/src/tracing/librados.tp +++ b/src/tracing/librados.tp @@ -2301,23 +2301,25 @@ TRACEPOINT_EVENT(librados, rados_watch_exit, ) ) -TRACEPOINT_EVENT(librados, rados_watch2_enter, +TRACEPOINT_EVENT(librados, rados_watch3_enter, TP_ARGS( rados_ioctx_t, ioctx, const char*, oid, uint64_t*, phandle, rados_watchcb2_t, callback, + uint32_t, timeout, void*, arg), TP_FIELDS( ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) ctf_string(oid, oid) ctf_integer_hex(uint64_t, phandle, phandle) ctf_integer_hex(rados_watchcb2_t, callback, callback) + ctf_integer(uint32_t, timeout, timeout) ctf_integer_hex(void*, arg, arg) ) ) -TRACEPOINT_EVENT(librados, rados_watch2_exit, +TRACEPOINT_EVENT(librados, rados_watch3_exit, TP_ARGS( int, retval, uint64_t, handle), @@ -2327,13 +2329,14 @@ TRACEPOINT_EVENT(librados, rados_watch2_exit, ) ) -TRACEPOINT_EVENT(librados, rados_aio_watch_enter, +TRACEPOINT_EVENT(librados, rados_aio_watch2_enter, TP_ARGS( rados_ioctx_t, ioctx, const char*, oid, rados_completion_t, completion, uint64_t*, phandle, rados_watchcb2_t, callback, + uint32_t, timeout, void*, arg), TP_FIELDS( ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) @@ -2341,11 +2344,12 @@ TRACEPOINT_EVENT(librados, rados_aio_watch_enter, ctf_integer_hex(rados_completion_t, completion, completion) ctf_integer_hex(uint64_t, phandle, phandle) ctf_integer_hex(rados_watchcb2_t, callback, callback) + ctf_integer(uint32_t, timeout, timeout) ctf_integer_hex(void*, arg, arg) ) ) -TRACEPOINT_EVENT(librados, rados_aio_watch_exit, +TRACEPOINT_EVENT(librados, rados_aio_watch2_exit, TP_ARGS( int, retval, uint64_t, handle), -- 2.39.5