* after 30 seconds. Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
- * @note BUG: watch timeout should be configurable
* @note BUG: librados should provide a way for watchers to notice connection resets
* @note BUG: the ver parameter does not work, and -ERANGE will never be returned
* (See URL tracker.ceph.com/issues/2592)
* A watch operation registers the client as being interested in
* notifications on an object. OSDs keep track of watches on
* persistent storage, so they are preserved across cluster changes by
- * the normal recovery process. If the client loses its connection to
- * the primary OSD for a watched object, the watch will be removed
- * after 30 seconds. Watches are automatically reestablished when a new
+ * the normal recovery process. If the client loses its connection to the
+ * primary OSD for a watched object, the watch will be removed after
+ * a timeout configured with osd_client_watch_timeout.
+ * Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
- * @note BUG: watch timeout should be configurable
- *
* @param io the pool the object is in
* @param o the object to watch
* @param cookie where to store the internal id assigned to this watch
rados_watcherrcb_t watcherrcb,
void *arg);
+/**
+ * Register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param cookie where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param timeout how many seconds the connection will keep after disconnection
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie,
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ uint32_t timeout,
+ void *arg);
+
/**
* Asynchronous register an interest in an object
*
* after 30 seconds. Watches are automatically reestablished when a new
* connection is made, or a placement group switches OSDs.
*
- * @note BUG: watch timeout should be configurable
- *
* @param io the pool the object is in
* @param o the object to watch
* @param completion what to do when operation has been attempted
rados_watcherrcb_t watcherrcb,
void *arg);
+/**
+ * Asynchronous register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after the number of seconds that configured in timeout parameter.
+ * Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param completion what to do when operation has been attempted
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param timeout how many seconds the connection will keep after disconnection
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_aio_watch2(rados_ioctx_t io, const char *o,
+ rados_completion_t completion, uint64_t *handle,
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ uint32_t timeout,
+ void *arg);
+
/**
* Check on the status of a watch
*
// watch/notify
int watch2(const std::string& o, uint64_t *handle,
librados::WatchCtx2 *ctx);
+ int watch3(const std::string& o, uint64_t *handle,
+ librados::WatchCtx2 *ctx, uint32_t timeout);
int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
librados::WatchCtx2 *ctx);
+ int aio_watch2(const std::string& o, AioCompletion *c, uint64_t *handle,
+ librados::WatchCtx2 *ctx, uint32_t timeout);
int unwatch2(uint64_t handle);
int aio_unwatch(uint64_t handle, AioCompletion *c);
/**
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
bool internal)
+{
+ return watch(oid, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
+ librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2,
+ uint32_t timeout,
+ bool internal)
{
::ObjectOperation wr;
version_t objver;
oid, ctx, ctx2, internal);
prepare_assert_ops(&wr);
- wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+ wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
snapc, ceph::real_clock::now(), bl,
uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
+ bool internal) {
+ return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+ AioCompletionImpl *c,
+ uint64_t *handle,
+ librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2,
+ uint32_t timeout,
bool internal)
{
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
prepare_assert_ops(&wr);
- wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+ wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
snapc, ceph::real_clock::now(), bl,
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2, bool internal = false);
+ int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal = false);
int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
bool internal = false);
+ int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
+ librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2,
+ uint32_t timeout, bool internal = false);
int watch_check(uint64_t cookie);
int unwatch(uint64_t cookie);
int aio_unwatch(uint64_t cookie, AioCompletionImpl *c);
return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
}
+int librados::IoCtx::watch3(const string& oid, uint64_t *cookie,
+ librados::WatchCtx2 *ctx2, uint32_t timeout)
+{
+ object_t obj(oid);
+ return io_ctx_impl->watch(obj, cookie, NULL, ctx2, timeout);
+}
+
int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
uint64_t *cookie,
librados::WatchCtx2 *ctx2)
return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2);
}
+int librados::IoCtx::aio_watch2(const string& oid, AioCompletion *c,
+ uint64_t *cookie,
+ librados::WatchCtx2 *ctx2,
+ uint32_t timeout)
+{
+ object_t obj(oid);
+ return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2, timeout);
+}
int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
{
extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
rados_watchcb2_t watchcb,
rados_watcherrcb_t watcherrcb,
+ void *arg) {
+ return rados_watch3(io, o, handle, watchcb, watcherrcb, 0, arg);
+}
+
+extern "C" int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *handle,
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ uint32_t timeout,
void *arg)
{
- tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
+ tracepoint(librados, rados_watch3_enter, io, o, handle, watchcb, timeout, arg);
int ret;
if (!watchcb || !o || !handle) {
ret = -EINVAL;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
- ret = ctx->watch(oid, cookie, NULL, wc, true);
+ ret = ctx->watch(oid, cookie, NULL, wc, timeout, true);
}
- tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
+ tracepoint(librados, rados_watch3_exit, ret, handle ? *handle : 0);
return ret;
}
rados_completion_t completion,
uint64_t *handle,
rados_watchcb2_t watchcb,
- rados_watcherrcb_t watcherrcb, void *arg)
+ rados_watcherrcb_t watcherrcb, void *arg) {
+ return rados_aio_watch2(io, o, completion, handle, watchcb, watcherrcb, 0, arg);
+}
+
+extern "C" int rados_aio_watch2(rados_ioctx_t io, const char *o,
+ rados_completion_t completion,
+ uint64_t *handle,
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ uint32_t timeout, void *arg)
{
- tracepoint(librados, rados_aio_watch_enter, io, o, completion, handle, watchcb, arg);
+ tracepoint(librados, rados_aio_watch2_enter, io, o, completion, handle, watchcb, timeout, arg);
int ret;
if (!completion || !watchcb || !o || !handle) {
ret = -EINVAL;
librados::AioCompletionImpl *c =
reinterpret_cast<librados::AioCompletionImpl*>(completion);
C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
- ret = ctx->aio_watch(oid, c, cookie, NULL, wc, true);
+ ret = ctx->aio_watch(oid, c, cookie, NULL, wc, timeout, true);
}
- tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
+ tracepoint(librados, rados_aio_watch2_exit, ret, handle ? *handle : 0);
return ret;
}
comp->release();
}
+TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
+ notify_oid = "foo";
+ notify_ioctx = &ioctx;
+ notify_cookies.clear();
+ uint32_t timeout = 12; // configured timeout
+ char buf[128];
+ memset(buf, 0xcc, sizeof(buf));
+ bufferlist bl1;
+ bl1.append(buf, sizeof(buf));
+ ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
+ uint64_t handle;
+ WatchNotifyTestCtx2 ctx;
+ ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
+ ASSERT_GT(ioctx.watch_check(handle), 0);
+ std::list<obj_watch_t> watches;
+ ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
+ ASSERT_EQ(watches.size(), 1u);
+ std::cout << "List watches" << std::endl;
+ for (std::list<obj_watch_t>::iterator it = watches.begin();
+ it != watches.end(); ++it) {
+ ASSERT_EQ(it->timeout_seconds, timeout);
+ }
+ bufferlist bl2, bl_reply;
+ ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
+ bufferlist::iterator p = bl_reply.begin();
+ std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
+ std::set<std::pair<uint64_t,uint64_t> > missed_map;
+ ::decode(reply_map, p);
+ ::decode(missed_map, p);
+ ASSERT_EQ(1u, notify_cookies.size());
+ ASSERT_EQ(1u, notify_cookies.count(handle));
+ ASSERT_EQ(1u, reply_map.size());
+ ASSERT_EQ(5u, reply_map.begin()->second.length());
+ ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ ASSERT_EQ(0u, missed_map.size());
+ ASSERT_GT(ioctx.watch_check(handle), 0);
+ ioctx.unwatch2(handle);
+}
+
+TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
+ notify_io = ioctx;
+ notify_oid = "foo";
+ notify_err = 0;
+ char buf[128];
+ uint32_t timeout = 3;
+ memset(buf, 0xcc, sizeof(buf));
+ ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
+
+
+ rados_completion_t comp;
+ uint64_t handle;
+ ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
+ rados_aio_watch2(ioctx, notify_oid, comp, &handle,
+ watch_notify2_test_cb, watch_notify2_test_errcb, timeout, NULL);
+ ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
+ ASSERT_EQ(0, rados_aio_get_return_value(comp));
+ rados_aio_release(comp);
+ ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
+ int left = 30;
+ std::cout << "waiting up to " << left << " for disconnect notification ..."
+ << std::endl;
+ while (notify_err == 0 && --left) {
+ sleep(1);
+ }
+ ASSERT_TRUE(left > 0);
+ ASSERT_EQ(-ENOTCONN, notify_err);
+ ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
+ ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
+ rados_aio_unwatch(ioctx, handle, comp);
+ ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
+ ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
+ rados_aio_release(comp);
+}
// --
INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
)
)
-TRACEPOINT_EVENT(librados, rados_watch2_enter,
+TRACEPOINT_EVENT(librados, rados_watch3_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
uint64_t*, phandle,
rados_watchcb2_t, callback,
+ uint32_t, timeout,
void*, arg),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ctf_integer_hex(uint64_t, phandle, phandle)
ctf_integer_hex(rados_watchcb2_t, callback, callback)
+ ctf_integer(uint32_t, timeout, timeout)
ctf_integer_hex(void*, arg, arg)
)
)
-TRACEPOINT_EVENT(librados, rados_watch2_exit,
+TRACEPOINT_EVENT(librados, rados_watch3_exit,
TP_ARGS(
int, retval,
uint64_t, handle),
)
)
-TRACEPOINT_EVENT(librados, rados_aio_watch_enter,
+TRACEPOINT_EVENT(librados, rados_aio_watch2_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
rados_completion_t, completion,
uint64_t*, phandle,
rados_watchcb2_t, callback,
+ uint32_t, timeout,
void*, arg),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_integer_hex(rados_completion_t, completion, completion)
ctf_integer_hex(uint64_t, phandle, phandle)
ctf_integer_hex(rados_watchcb2_t, callback, callback)
+ ctf_integer(uint32_t, timeout, timeout)
ctf_integer_hex(void*, arg, arg)
)
)
-TRACEPOINT_EVENT(librados, rados_aio_watch_exit,
+TRACEPOINT_EVENT(librados, rados_aio_watch2_exit,
TP_ARGS(
int, retval,
uint64_t, handle),