From: Sage Weil Date: Thu, 21 Aug 2014 21:52:34 +0000 (-0700) Subject: librados: update notify2 API to accept reply payloads X-Git-Tag: v0.91~153 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7589bfcc9071062cc28337263252962efbc59bb0;p=ceph.git librados: update notify2 API to accept reply payloads Allow the notify2 callers to provide bufferlists or char ** pointers so that we can pass the reply buffer back to them. Signed-off-by: Sage Weil --- diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 703cc728b2f..3e4a2673e45 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -1980,15 +1980,35 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, * This blocks until all watchers of the object have received and * reacted to the notify, or a timeout is reached. * + * The reply buffer is optional. If specified, the client will get + * back an encoded buffer that includes the ids of the clients that + * acknowledged the notify as well as their notify ack payloads (if + * any). Clients that timed out are not included. Even clients that + * do not include a notify ack payload are included in the list but + * have a 0-length payload associated with them. The format: + * + * le32 num_acks + * { + * le64 gid global id for the client (for client.1234 that's 1234) + * le32 buflen length of reply message buffer + * u8 * buflen payload + * } * num_acks + * + * Note that this buffer must be released with rados_buffer_free() + * when the user is done with it. + * * @param io the pool the object is in * @param o the name of the object * @param buf data to send to watchers * @param buf_len length of buf in bytes * @param timeout_ms notify timeout (in ms) + * @param reply_buffer pointer to reply buffer pointer (free with rados_buffer_free) + * @param reply_buffer_len pointer to size of reply buffer * @returns 0 on success, negative error code on failure */ int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len, - uint64_t timeout_ms); + uint64_t timeout_ms, + char **reply_buffer, size_t *reply_buffer_len); /** * Acknolwedge receipt of a notify diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 3ae0e0d0005..20eec5bc5e8 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -913,7 +913,8 @@ namespace librados int notify(const std::string& o, uint64_t ver, bufferlist& bl); int notify2(const std::string& o, ///< object bufferlist& bl, ///< optional broadcast payload - uint64_t timeout_ms); ///< timeout (in ms) + uint64_t timeout_ms, ///< timeout (in ms) + bufferlist *pbl); ///< reply buffer int list_watchers(const std::string& o, std::list *out_watchers); int list_snaps(const std::string& o, snap_set_t *out_snaps); void set_notify_timeout(uint32_t timeout); diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 570b1a68c86..9cfddd5bf30 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1122,7 +1122,9 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie) } int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, - uint64_t timeout_ms) + uint64_t timeout_ms, + bufferlist *preply_bl, + char **preply_buf, size_t *preply_buf_len) { bufferlist inbl, outbl; @@ -1136,6 +1138,9 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, wc->notify_lock = &mylock_all; wc->notify_cond = &cond_all; wc->notify_rval = &r_notify; + wc->notify_reply_bl = preply_bl; + wc->notify_reply_buf = preply_buf; + wc->notify_reply_buf_len = preply_buf_len; lock->Lock(); diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 5b41ddbe6b0..55d4a4944b3 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -200,7 +200,8 @@ struct librados::IoCtxImpl { int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2); int unwatch(const object_t& oid, uint64_t cookie); - int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms); + int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms, + bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len); int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie, bufferlist& bl); @@ -236,6 +237,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject { Mutex *notify_lock; Cond *notify_cond; bool *notify_done; + bufferlist *notify_reply_bl; + char **notify_reply_buf; + size_t *notify_reply_buf_len; int *notify_rval; WatchNotifyInfo(IoCtxImpl *io_ctx_impl_, @@ -249,6 +253,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject { notify_lock(NULL), notify_cond(NULL), notify_done(NULL), + notify_reply_bl(NULL), + notify_reply_buf(NULL), + notify_reply_buf_len(NULL), notify_rval(NULL) { io_ctx_impl->get(); } diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 2e1cf7e3dc9..b19c1e5b020 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -712,6 +712,17 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m) wc->notify_lock->Lock(); *wc->notify_done = true; *wc->notify_rval = m->return_code; + if (wc->notify_reply_bl) { + wc->notify_reply_bl->claim(m->get_data()); + } + if (wc->notify_reply_buf) { + *wc->notify_reply_buf = (char*)malloc(m->get_data().length()); + memcpy(*wc->notify_reply_buf, m->get_data().c_str(), + m->get_data().length()); + } + if (wc->notify_reply_buf_len) { + *wc->notify_reply_buf_len = m->get_data().length(); + } wc->notify_cond->Signal(); wc->notify_lock->Unlock(); } else { diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 6a20db5ec00..aad5eaac781 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1683,20 +1683,21 @@ int librados::IoCtx::unwatch(const string& oid, uint64_t handle) int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->notify(obj, bl, 0); + return io_ctx_impl->notify(obj, bl, 0, NULL, NULL, NULL); } int librados::IoCtx::notify2(const string& oid, bufferlist& bl, - uint64_t timeout_ms) + uint64_t timeout_ms, bufferlist *preplybl) { object_t obj(oid); - return io_ctx_impl->notify(obj, bl, timeout_ms); + return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL); } void librados::IoCtx::notify_ack(const std::string& o, - uint64_t notify_id, uint64_t handle) + uint64_t notify_id, uint64_t handle, + bufferlist& bl) { - io_ctx_impl->notify_ack(o, notify_id, handle); + io_ctx_impl->notify_ack(o, notify_id, handle, bl); } int librados::IoCtx::list_watchers(const std::string& oid, @@ -3815,14 +3816,16 @@ extern "C" int rados_notify(rados_ioctx_t io, const char *o, memcpy(p.c_str(), buf, buf_len); bl.push_back(p); } - int retval = ctx->notify(oid, bl, 0); + int retval = ctx->notify(oid, bl, 0, NULL, NULL, NULL); tracepoint(librados, rados_notify_exit, retval); return retval; } extern "C" int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len, - uint64_t timeout_ms) + uint64_t timeout_ms, + char **reply_buffer, + size_t *reply_buffer_len) { tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms); librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; @@ -3833,7 +3836,7 @@ extern "C" int rados_notify2(rados_ioctx_t io, const char *o, memcpy(p.c_str(), buf, buf_len); bl.push_back(p); } - int ret = ctx->notify(oid, bl, timeout_ms); + int ret = ctx->notify(oid, bl, timeout_ms, NULL, reply_buffer, reply_buffer_len); tracepoint(librados, rados_notify2_exit, ret); return ret; }