From 7e0c4efcd494224baf1e2f50aec020c46415bd9b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Aug 2014 14:32:48 -0700 Subject: [PATCH] librados: define updated watch/notify interface - new notify callback with the correct values: - notify_id - watch handle - payload - new notify_ack call - not implicit when the callback returns (for new api only) - optional payload - new watch2 call - that provides the new callback - new notify2 call - with the right arguments, and optional timeout A couple refactors in here: - IoCtx notify_ack is now called unlocked (Note: this will soon change with pending Objecter locking changes) - Objecter notify_ack takes a buffer TODO: - no timeout on the individual watch, yet... Signed-off-by: Sage Weil --- src/include/rados/librados.h | 73 +++++++++++++++++++++++ src/include/rados/librados.hpp | 25 ++++++++ src/librados/IoCtxImpl.cc | 25 +++++--- src/librados/IoCtxImpl.h | 12 ++-- src/librados/RadosClient.cc | 15 +++-- src/librados/librados.cc | 104 ++++++++++++++++++++++++++++++--- src/osdc/Objecter.h | 6 +- src/tracing/librados.tp | 74 +++++++++++++++++++++++ 8 files changed, 306 insertions(+), 28 deletions(-) diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 9c4a5732ff696..703cc728b2f77 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -1871,6 +1871,23 @@ CEPH_RADOS_API int rados_aio_cancel(rados_ioctx_t io, */ typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg); +/** + * @typedef rados_watchcb2_t + * + * Callback activated when a notify is received on a watched + * object. 
Parameters are: + * - arg opaque user-defined value provided to rados_watch2() + * - notify_id an id for this notify event + * - handle the watcher handle we are notifying + * - data payload from the notifier + * - data_len length of payload buffer + */ +typedef void (*rados_watchcb2_t)(void *arg, + uint64_t notify_id, + uint64_t handle, + void *data, + size_t data_len); + /** * Register an interest in an object * @@ -1900,6 +1917,30 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, rados_watchcb_t watchcb, void *arg); + +/** + * Register an interest in an object + * + * A watch operation registers the client as being interested in + * notifications on an object. OSDs keep track of watches on + * persistent storage, so they are preserved across cluster changes by + * the normal recovery process. If the client loses its connection to + * the primary OSD for a watched object, the watch will be removed + * after 30 seconds. Watches are automatically reestablished when a new + * connection is made, or a placement group switches OSDs.
+ * + * @note BUG: watch timeout should be configurable + * + * @param io the pool the object is in + * @param o the object to watch + * @param handle where to store the internal id assigned to this watch + * @param watchcb what to do when a notify is received on this object + * @param arg opaque value to pass to the callback + * @returns 0 on success, negative error code on failure + */ +int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle, + rados_watchcb2_t watchcb, void *arg); + /** * Unregister an interest in an object * @@ -1933,6 +1974,38 @@ CEPH_RADOS_API int rados_unwatch(rados_ioctx_t io, const char *o, CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len); +/** + * Synchronously notify watchers of an object + * + * This blocks until all watchers of the object have received and + * reacted to the notify, or a timeout is reached. + * + * @param io the pool the object is in + * @param o the name of the object + * @param buf data to send to watchers + * @param buf_len length of buf in bytes + * @param timeout_ms notify timeout (in ms) + * @returns 0 on success, negative error code on failure + */ +int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len, + uint64_t timeout_ms); + +/** + * Acknowledge receipt of a notify + * + * @param io the pool the object is in + * @param o the name of the object + * @param notify_id the notify_id we got on the rados_watchcb2_t callback + * @param handle the watcher handle + * @param buf payload to return to notifier (optional) + * @param buf_len payload length + * @returns 0 on success + */ +int rados_notify_ack(rados_ioctx_t io, const char *o, + uint64_t notify_id, uint64_t handle, + const char *buf, int buf_len); + + /** @} Watch/Notify */ /** diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 402ede0068722..3ae0e0d00056e 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@
-145,12 +145,26 @@ namespace librados std::pair cur_obj; }; + /// DEPRECATED; do not use class CEPH_RADOS_API WatchCtx { public: virtual ~WatchCtx(); virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0; }; + class CEPH_RADOS_API WatchCtx2 { + public: + virtual ~WatchCtx2(); + /** + * @param notify_id unique id for this notify event + * @param cookie the watcher we are notifying + * @param bl opaque notify payload (from the notifier) + */ + virtual void handle_notify(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl) = 0; + }; + struct CEPH_RADOS_API AioCompletion { AioCompletion(AioCompletionImpl *pc_) : pc(pc_) {} int set_complete_callback(void *cb_arg, callback_t cb); @@ -893,12 +907,23 @@ namespace librados // watch/notify int watch(const std::string& o, uint64_t ver, uint64_t *handle, librados::WatchCtx *ctx); + int watch2(const std::string& o, uint64_t *handle, + librados::WatchCtx2 *ctx); int unwatch(const std::string& o, uint64_t handle); int notify(const std::string& o, uint64_t ver, bufferlist& bl); + int notify2(const std::string& o, ///< object + bufferlist& bl, ///< optional broadcast payload + uint64_t timeout_ms); ///< timeout (in ms) int list_watchers(const std::string& o, std::list *out_watchers); int list_snaps(const std::string& o, snap_set_t *out_snaps); void set_notify_timeout(uint32_t timeout); + /// acknowledge a notify we received. 
+ void notify_ack(const std::string& o, ///< watched object + uint64_t notify_id, ///< notify id + uint64_t handle, ///< our watch handle + bufferlist& bl); ///< optional reply payload + /** * Set allocation hint for an object * diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index dcbe032863bc0..570b1a68c8664 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1032,8 +1032,10 @@ void librados::IoCtxImpl::set_sync_op_version(version_t ver) last_objver = ver; } -int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, - uint64_t *cookie, librados::WatchCtx *ctx) +int librados::IoCtxImpl::watch(const object_t& oid, + uint64_t *cookie, + librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2) { ::ObjectOperation wr; Mutex mylock("IoCtxImpl::watch::mylock"); @@ -1047,9 +1049,10 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid); wc->watch_ctx = ctx; + wc->watch_ctx2 = ctx2; client->register_watch_notify_callback(wc, cookie); prepare_assert_ops(&wr); - wr.watch(*cookie, ver, 1); + wr.watch(*cookie, 0, 1); bufferlist bl; wc->linger_id = objecter->linger_mutate(oid, oloc, wr, snapc, ceph_clock_now(NULL), bl, @@ -1074,15 +1077,16 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, } -/* this is called with IoCtxImpl::lock held */ -int librados::IoCtxImpl::_notify_ack( +int librados::IoCtxImpl::notify_ack( const object_t& oid, uint64_t notify_id, - uint64_t cookie) + uint64_t cookie, + bufferlist& bl) { + Mutex::Locker l(*lock); ::ObjectOperation rd; prepare_assert_ops(&rd); - rd.notify_ack(notify_id, 0, cookie); + rd.notify_ack(notify_id, cookie, bl); objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); return 0; } @@ -1117,7 +1121,8 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie) return r; } -int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& bl) +int 
librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, + uint64_t timeout_ms) { bufferlist inbl, outbl; @@ -1139,6 +1144,8 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b client->register_watch_notify_callback(wc, &cookie); uint32_t prot_ver = 1; uint32_t timeout = notify_timeout; + if (timeout_ms) + timeout = timeout_ms / 1000; ::encode(prot_ver, inbl); ::encode(timeout, inbl); ::encode(bl, inbl); @@ -1146,7 +1153,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b // Construct RADOS op ::ObjectOperation rd; prepare_assert_ops(&rd); - rd.notify(cookie, ver, inbl); + rd.notify(cookie, 0, inbl); // Issue RADOS op C_SaferCond onack; diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 0e4a1f044a99d..5b41ddbe6b0d5 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -197,10 +197,12 @@ struct librados::IoCtxImpl { bufferlist *pbl); void set_sync_op_version(version_t ver); - int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); + int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2); int unwatch(const object_t& oid, uint64_t cookie); - int notify(const object_t& oid, uint64_t ver, bufferlist& bl); - int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie); + int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms); + int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie, + bufferlist& bl); int set_alloc_hint(const object_t& oid, uint64_t expected_object_size, @@ -226,8 +228,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject { uint64_t linger_id; // we use this to unlinger when we are done uint64_t cookie; // callback cookie - // watcher + // watcher. only one of these will be defined. 
librados::WatchCtx *watch_ctx; + librados::WatchCtx2 *watch_ctx2; // notify that we initiated Mutex *notify_lock; @@ -242,6 +245,7 @@ struct WatchNotifyInfo : public RefCountedWaitObject { linger_id(0), cookie(0), watch_ctx(NULL), + watch_ctx2(NULL), notify_lock(NULL), notify_cond(NULL), notify_done(NULL), diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 85a93466e4ec4..2e1cf7e3dc90b 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -720,13 +720,18 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m) wc->get(); // trigger the callback + assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined lock.Unlock(); - wc->watch_ctx->notify(m->opcode, m->ver, m->bl); + if (wc->watch_ctx) { + wc->watch_ctx->notify(m->opcode, m->ver, m->bl); + // send ACK back to the OSD + bufferlist empty; + wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty); + } else if (wc->watch_ctx2) { + wc->watch_ctx2->handle_notify(m->notify_id, m->cookie, m->bl); + // user needs to explicitly ack (and may have already!) 
+ } lock.Lock(); - - // send ACK back to the OSD - wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->cookie); - ldout(cct,10) << __func__ << " notify done" << dendl; wc->put(); } diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 573fae7ee7c24..6a20db5ec00ae 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -480,6 +480,11 @@ librados::WatchCtx:: { } +librados::WatchCtx2:: +~WatchCtx2() +{ +} + struct librados::ObjListCtx { bool new_request; @@ -1658,7 +1663,14 @@ int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx) { object_t obj(oid); - return io_ctx_impl->watch(obj, ver, cookie, ctx); + return io_ctx_impl->watch(obj, cookie, ctx, NULL); +} + +int librados::IoCtx::watch2(const string& oid, uint64_t *cookie, + librados::WatchCtx2 *ctx2) +{ + object_t obj(oid); + return io_ctx_impl->watch(obj, cookie, NULL, ctx2); } int librados::IoCtx::unwatch(const string& oid, uint64_t handle) @@ -1671,7 +1683,20 @@ int librados::IoCtx::unwatch(const string& oid, uint64_t handle) int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->notify(obj, ver, bl); + return io_ctx_impl->notify(obj, bl, 0); +} + +int librados::IoCtx::notify2(const string& oid, bufferlist& bl, + uint64_t timeout_ms) +{ + object_t obj(oid); + return io_ctx_impl->notify(obj, bl, timeout_ms); +} + +void librados::IoCtx::notify_ack(const std::string& o, + uint64_t notify_id, uint64_t handle) +{ + io_ctx_impl->notify_ack(o, notify_id, handle); } int librados::IoCtx::list_watchers(const std::string& oid, @@ -3729,20 +3754,45 @@ struct C_WatchCB : public librados::WatchCtx { } }; -int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, - rados_watchcb_t watchcb, void *arg) +extern "C" int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, + uint64_t *handle, + rados_watchcb_t watchcb, void *arg) { tracepoint(librados, 
rados_watch_enter, io, o, ver, watchcb, arg); uint64_t *cookie = handle; librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); C_WatchCB *wc = new C_WatchCB(watchcb, arg); - int retval = ctx->watch(oid, ver, cookie, wc); + int retval = ctx->watch(oid, cookie, wc, NULL); tracepoint(librados, rados_watch_exit, retval, *handle); return retval; } -int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) +struct C_WatchCB2 : public librados::WatchCtx2 { + rados_watchcb2_t wcb; + void *arg; + C_WatchCB2(rados_watchcb2_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {} + void handle_notify(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl) { + wcb(arg, notify_id, cookie, bl.c_str(), bl.length()); + } +}; + +extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle, + rados_watchcb2_t watchcb, void *arg) +{ + tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg); + uint64_t *cookie = handle; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + C_WatchCB2 *wc = new C_WatchCB2(watchcb, arg); + int ret = ctx->watch(oid, cookie, NULL, wc); + tracepoint(librados, rados_watch_exit, ret, *handle); + return ret; +} + +extern "C" int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) { tracepoint(librados, rados_unwatch_enter, io, o, handle); uint64_t cookie = handle; @@ -3753,7 +3803,8 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) return retval; } -int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) +extern "C" int rados_notify(rados_ioctx_t io, const char *o, + uint64_t ver, const char *buf, int buf_len) { tracepoint(librados, rados_notify_enter, io, o, ver, buf, buf_len); librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; @@ -3764,11 +3815,48 @@ int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, memcpy(p.c_str(), buf, buf_len); bl.push_back(p); } - int retval = ctx->notify(oid, 
ver, bl); + int retval = ctx->notify(oid, bl, 0); tracepoint(librados, rados_notify_exit, retval); return retval; } +extern "C" int rados_notify2(rados_ioctx_t io, const char *o, + const char *buf, int buf_len, + uint64_t timeout_ms) +{ + tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms); + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + if (buf) { + bufferptr p = buffer::create(buf_len); + memcpy(p.c_str(), buf, buf_len); + bl.push_back(p); + } + int ret = ctx->notify(oid, bl, timeout_ms); + tracepoint(librados, rados_notify2_exit, ret); + return ret; +} + +extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o, + uint64_t notify_id, uint64_t handle, + const char *buf, int buf_len) +{ + tracepoint(librados, rados_notify_ack_enter, io, o, notify_id, handle, buf, buf_len); + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + if (buf) { + bufferptr p = buffer::create(buf_len); + memcpy(p.c_str(), buf, buf_len); + bl.push_back(p); + } + ctx->notify_ack(oid, notify_id, handle, bl); + int retval = 0; + tracepoint(librados, rados_notify_ack_exit, retval); + return retval; +} + extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o, uint64_t expected_object_size, uint64_t expected_write_size) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index c436bf23686d8..347f7f9d8b550 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -862,11 +862,13 @@ struct ObjectOperation { add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl); } - void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) { + void notify_ack(uint64_t notify_id, uint64_t cookie, + bufferlist& reply_bl) { bufferlist bl; ::encode(notify_id, bl); ::encode(cookie, bl); - add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl); + ::encode(reply_bl, bl); + add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, 0, 0, bl); } void list_watchers(list *out, diff --git 
a/src/tracing/librados.tp b/src/tracing/librados.tp index 263a0d6aaa48d..339c2cedebb13 100644 --- a/src/tracing/librados.tp +++ b/src/tracing/librados.tp @@ -2190,6 +2190,32 @@ TRACEPOINT_EVENT(librados, rados_watch_exit, ) ) +TRACEPOINT_EVENT(librados, rados_watch2_enter, + TP_ARGS( + rados_ioctx_t, ioctx, + const char*, oid, + uint64_t*, phandle, + rados_watchcb2_t, callback, + void*, arg), + TP_FIELDS( + ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) + ctf_string(oid, oid) + ctf_integer_hex(uint64_t, phandle, phandle) + ctf_integer_hex(rados_watchcb2_t, callback, callback) + ctf_integer_hex(void*, arg, arg) + ) +) + +TRACEPOINT_EVENT(librados, rados_watch2_exit, + TP_ARGS( + int, retval, + uint64_t, handle), + TP_FIELDS( + ctf_integer(int, retval, retval) + ctf_integer(uint64_t, handle, handle) + ) +) + TRACEPOINT_EVENT(librados, rados_unwatch_enter, TP_ARGS( rados_ioctx_t, ioctx, @@ -2233,6 +2259,54 @@ TRACEPOINT_EVENT(librados, rados_notify_exit, ) ) +TRACEPOINT_EVENT(librados, rados_notify2_enter, + TP_ARGS( + rados_ioctx_t, ioctx, + const char*, oid, + const char*, buf, + int, buf_len, + uint64_t, timeout_ms), + TP_FIELDS( + ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) + ctf_string(oid, oid) + ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len) + ctf_integer(uint64_t, timeout_ms, timeout_ms) + ) +) + +TRACEPOINT_EVENT(librados, rados_notify2_exit, + TP_ARGS( + int, retval), + TP_FIELDS( + ctf_integer(int, retval, retval) + ) +) + +TRACEPOINT_EVENT(librados, rados_notify_ack_enter, + TP_ARGS( + rados_ioctx_t, ioctx, + const char*, oid, + uint64_t, notify_id, + uint64_t, handle, + const char*, buf, + int, buf_len), + TP_FIELDS( + ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) + ctf_string(oid, oid) + ctf_integer(uint64_t, notify_id, notify_id) + ctf_integer(uint64_t, handle, handle) + ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len) + ) +) + +TRACEPOINT_EVENT(librados, rados_notify_ack_exit, + TP_ARGS( + int, retval), + TP_FIELDS( + 
ctf_integer(int, retval, retval) + ) +) + TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter, TP_ARGS( rados_ioctx_t, ioctx, -- 2.39.5