* This blocks until all watchers of the object have received and
* reacted to the notify, or a timeout is reached.
*
+ * The reply buffer is optional. If specified, the client will get
+ * back an encoded buffer that includes the ids of the clients that
+ * acknowledged the notify as well as their notify ack payloads (if
+ * any). Clients that timed out are not included. Even clients that
+ * do not include a notify ack payload are included in the list but
+ * have a 0-length payload associated with them. The format:
+ *
+ * le32 num_acks
+ * {
+ * le64 gid global id for the client (for client.1234 that's 1234)
+ * le32 buflen length of reply message buffer
+ * u8 * buflen payload
+ * } * num_acks
+ *
+ * Note that this buffer must be released with rados_buffer_free()
+ * when the user is done with it.
+ *
* @param io the pool the object is in
* @param o the name of the object
* @param buf data to send to watchers
* @param buf_len length of buf in bytes
* @param timeout_ms notify timeout (in ms)
+ * @param reply_buffer pointer to reply buffer pointer (free with rados_buffer_free)
+ * @param reply_buffer_len pointer to size of reply buffer
* @returns 0 on success, negative error code on failure
*/
int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len,
- uint64_t timeout_ms);
+ uint64_t timeout_ms,
+ char **reply_buffer, size_t *reply_buffer_len);
/**
* Acknolwedge receipt of a notify
int notify(const std::string& o, uint64_t ver, bufferlist& bl);
int notify2(const std::string& o, ///< object
bufferlist& bl, ///< optional broadcast payload
- uint64_t timeout_ms); ///< timeout (in ms)
+ uint64_t timeout_ms, ///< timeout (in ms)
+ bufferlist *pbl); ///< reply buffer
int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
int list_snaps(const std::string& o, snap_set_t *out_snaps);
void set_notify_timeout(uint32_t timeout);
}
int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
- uint64_t timeout_ms)
+ uint64_t timeout_ms,
+ bufferlist *preply_bl,
+ char **preply_buf, size_t *preply_buf_len)
{
bufferlist inbl, outbl;
wc->notify_lock = &mylock_all;
wc->notify_cond = &cond_all;
wc->notify_rval = &r_notify;
+ wc->notify_reply_bl = preply_bl;
+ wc->notify_reply_buf = preply_buf;
+ wc->notify_reply_buf_len = preply_buf_len;
lock->Lock();
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2);
int unwatch(const object_t& oid, uint64_t cookie);
- int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms);
+ int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
bufferlist& bl);
Mutex *notify_lock;
Cond *notify_cond;
bool *notify_done;
+ bufferlist *notify_reply_bl;
+ char **notify_reply_buf;
+ size_t *notify_reply_buf_len;
int *notify_rval;
WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
notify_lock(NULL),
notify_cond(NULL),
notify_done(NULL),
+ notify_reply_bl(NULL),
+ notify_reply_buf(NULL),
+ notify_reply_buf_len(NULL),
notify_rval(NULL) {
io_ctx_impl->get();
}
wc->notify_lock->Lock();
*wc->notify_done = true;
*wc->notify_rval = m->return_code;
+ if (wc->notify_reply_bl) {
+ wc->notify_reply_bl->claim(m->get_data());
+ }
+ if (wc->notify_reply_buf) {
+ *wc->notify_reply_buf = (char*)malloc(m->get_data().length());
+ memcpy(*wc->notify_reply_buf, m->get_data().c_str(),
+ m->get_data().length());
+ }
+ if (wc->notify_reply_buf_len) {
+ *wc->notify_reply_buf_len = m->get_data().length();
+ }
wc->notify_cond->Signal();
wc->notify_lock->Unlock();
} else {
int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->notify(obj, bl, 0);
+ return io_ctx_impl->notify(obj, bl, 0, NULL, NULL, NULL);
}
int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
- uint64_t timeout_ms)
+ uint64_t timeout_ms, bufferlist *preplybl)
{
object_t obj(oid);
- return io_ctx_impl->notify(obj, bl, timeout_ms);
+ return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL);
}
void librados::IoCtx::notify_ack(const std::string& o,
- uint64_t notify_id, uint64_t handle)
+ uint64_t notify_id, uint64_t handle,
+ bufferlist& bl)
{
- io_ctx_impl->notify_ack(o, notify_id, handle);
+ io_ctx_impl->notify_ack(o, notify_id, handle, bl);
}
int librados::IoCtx::list_watchers(const std::string& oid,
memcpy(p.c_str(), buf, buf_len);
bl.push_back(p);
}
- int retval = ctx->notify(oid, bl, 0);
+ int retval = ctx->notify(oid, bl, 0, NULL, NULL, NULL);
tracepoint(librados, rados_notify_exit, retval);
return retval;
}
extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
const char *buf, int buf_len,
- uint64_t timeout_ms)
+ uint64_t timeout_ms,
+ char **reply_buffer,
+ size_t *reply_buffer_len)
{
tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms);
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
memcpy(p.c_str(), buf, buf_len);
bl.push_back(p);
}
- int ret = ctx->notify(oid, bl, timeout_ms);
+ int ret = ctx->notify(oid, bl, timeout_ms, NULL, reply_buffer, reply_buffer_len);
tracepoint(librados, rados_notify2_exit, ret);
return ret;
}