*/
typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
+/**
+ * @typedef rados_watchcb2_t
+ *
+ * Callback activated when a notify is received on a watched
+ * object. Parameters are:
+ * - arg opaque user-defined value provided to rados_watch2()
+ * - notify_id an id for this notify event
+ * - handle the watcher handle we are notifying
+ * - data payload from the notifier
+ * - datalen length of payload buffer
+ */
+typedef void (*rados_watchcb2_t)(void *arg,
+ uint64_t notify_id,
+ uint64_t handle,
+ void *data,
+ size_t data_len);
+
/**
* Register an interest in an object
*
uint64_t *handle, rados_watchcb_t watchcb,
void *arg);
+
+/**
+ * Register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after 30 seconds. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @note BUG: watch timeout should be configurable
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb2 what to do when a notify is received on this object
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
+ rados_watchcb2_t watchcb, void *arg);
+
/**
* Unregister an interest in an object
*
CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
const char *buf, int buf_len);
+/**
+ * Sychronously notify watchers of an object
+ *
+ * This blocks until all watchers of the object have received and
+ * reacted to the notify, or a timeout is reached.
+ *
+ * @param io the pool the object is in
+ * @param o the name of the object
+ * @param buf data to send to watchers
+ * @param buf_len length of buf in bytes
+ * @param timeout_ms notify timeout (in ms)
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len,
+ uint64_t timeout_ms);
+
+/**
+ * Acknolwedge receipt of a notify
+ *
+ * @param io the pool the object is in
+ * @param o the name of the object
+ * @param notify_id the notify_id we got on the watchcb2_t callback
+ * @param handle the watcher handle
+ * @param buf payload to return to notifier (optional)
+ * @param buf_len payload length
+ * @returns 0 on success
+ */
+int rados_notify_ack(rados_ioctx_t io, const char *o,
+ uint64_t notify_id, uint64_t handle,
+ const char *buf, int buf_len);
+
+
/** @} Watch/Notify */
/**
std::pair<std::string, std::string> cur_obj;
};
+ /// DEPRECATED; do not use
class CEPH_RADOS_API WatchCtx {
public:
virtual ~WatchCtx();
virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0;
};
+ class CEPH_RADOS_API WatchCtx2 {
+ public:
+ virtual ~WatchCtx2();
+ /**
+ * @param notify_id unique id for this notify event
+ * @param cookie the watcher we are notifying
+ * @param bl opaque notify payload (from the notifier)
+ */
+ virtual void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist& bl) = 0;
+ };
+
struct CEPH_RADOS_API AioCompletion {
AioCompletion(AioCompletionImpl *pc_) : pc(pc_) {}
int set_complete_callback(void *cb_arg, callback_t cb);
// watch/notify
int watch(const std::string& o, uint64_t ver, uint64_t *handle,
librados::WatchCtx *ctx);
+ int watch2(const std::string& o, uint64_t *handle,
+ librados::WatchCtx2 *ctx);
int unwatch(const std::string& o, uint64_t handle);
int notify(const std::string& o, uint64_t ver, bufferlist& bl);
+ int notify2(const std::string& o, ///< object
+ bufferlist& bl, ///< optional broadcast payload
+ uint64_t timeout_ms); ///< timeout (in ms)
int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
int list_snaps(const std::string& o, snap_set_t *out_snaps);
void set_notify_timeout(uint32_t timeout);
+ /// acknowledge a notify we received.
+ void notify_ack(const std::string& o, ///< watched object
+ uint64_t notify_id, ///< notify id
+ uint64_t handle, ///< our watch handle
+ bufferlist& bl); ///< optional reply payload
+
/**
* Set allocation hint for an object
*
last_objver = ver;
}
-int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
- uint64_t *cookie, librados::WatchCtx *ctx)
+int librados::IoCtxImpl::watch(const object_t& oid,
+ uint64_t *cookie,
+ librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2)
{
::ObjectOperation wr;
Mutex mylock("IoCtxImpl::watch::mylock");
WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
wc->watch_ctx = ctx;
+ wc->watch_ctx2 = ctx2;
client->register_watch_notify_callback(wc, cookie);
prepare_assert_ops(&wr);
- wr.watch(*cookie, ver, 1);
+ wr.watch(*cookie, 0, 1);
bufferlist bl;
wc->linger_id = objecter->linger_mutate(oid, oloc, wr,
snapc, ceph_clock_now(NULL), bl,
}
-/* this is called with IoCtxImpl::lock held */
-int librados::IoCtxImpl::_notify_ack(
+int librados::IoCtxImpl::notify_ack(
const object_t& oid,
uint64_t notify_id,
- uint64_t cookie)
+ uint64_t cookie,
+ bufferlist& bl)
{
+ Mutex::Locker l(*lock);
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify_ack(notify_id, 0, cookie);
+ rd.notify_ack(notify_id, cookie, bl);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
return 0;
}
return r;
}
-int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& bl)
+int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
+ uint64_t timeout_ms)
{
bufferlist inbl, outbl;
client->register_watch_notify_callback(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
+ if (timeout_ms)
+ timeout = timeout_ms / 1000;
::encode(prot_ver, inbl);
::encode(timeout, inbl);
::encode(bl, inbl);
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify(cookie, ver, inbl);
+ rd.notify(cookie, 0, inbl);
// Issue RADOS op
C_SaferCond onack;
bufferlist *pbl);
void set_sync_op_version(version_t ver);
- int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
+ int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2);
int unwatch(const object_t& oid, uint64_t cookie);
- int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
- int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie);
+ int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms);
+ int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
+ bufferlist& bl);
int set_alloc_hint(const object_t& oid,
uint64_t expected_object_size,
uint64_t linger_id; // we use this to unlinger when we are done
uint64_t cookie; // callback cookie
- // watcher
+ // watcher. only one of these will be defined.
librados::WatchCtx *watch_ctx;
+ librados::WatchCtx2 *watch_ctx2;
// notify that we initiated
Mutex *notify_lock;
linger_id(0),
cookie(0),
watch_ctx(NULL),
+ watch_ctx2(NULL),
notify_lock(NULL),
notify_cond(NULL),
notify_done(NULL),
wc->get();
// trigger the callback
+ assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined
lock.Unlock();
- wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+ if (wc->watch_ctx) {
+ wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+ // send ACK back to the OSD
+ bufferlist empty;
+ wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty);
+ } else if (wc->watch_ctx2) {
+ wc->watch_ctx2->handle_notify(m->notify_id, m->cookie, m->bl);
+ // user needs to explicitly ack (and may have already!)
+ }
lock.Lock();
-
- // send ACK back to the OSD
- wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->cookie);
-
ldout(cct,10) << __func__ << " notify done" << dendl;
wc->put();
}
{
}
+librados::WatchCtx2::
+~WatchCtx2()
+{
+}
+
struct librados::ObjListCtx {
bool new_request;
librados::WatchCtx *ctx)
{
object_t obj(oid);
- return io_ctx_impl->watch(obj, ver, cookie, ctx);
+ return io_ctx_impl->watch(obj, cookie, ctx, NULL);
+}
+
+int librados::IoCtx::watch2(const string& oid, uint64_t *cookie,
+ librados::WatchCtx2 *ctx2)
+{
+ object_t obj(oid);
+ return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
}
int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->notify(obj, ver, bl);
+ return io_ctx_impl->notify(obj, bl, 0);
+}
+
+int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
+ uint64_t timeout_ms)
+{
+ object_t obj(oid);
+ return io_ctx_impl->notify(obj, bl, timeout_ms);
+}
+
+void librados::IoCtx::notify_ack(const std::string& o,
+ uint64_t notify_id, uint64_t handle)
+{
+ io_ctx_impl->notify_ack(o, notify_id, handle);
}
int librados::IoCtx::list_watchers(const std::string& oid,
}
};
-int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
- rados_watchcb_t watchcb, void *arg)
+extern "C" int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
+ uint64_t *handle,
+ rados_watchcb_t watchcb, void *arg)
{
tracepoint(librados, rados_watch_enter, io, o, ver, watchcb, arg);
uint64_t *cookie = handle;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
C_WatchCB *wc = new C_WatchCB(watchcb, arg);
- int retval = ctx->watch(oid, ver, cookie, wc);
+ int retval = ctx->watch(oid, cookie, wc, NULL);
tracepoint(librados, rados_watch_exit, retval, *handle);
return retval;
}
-int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
+struct C_WatchCB2 : public librados::WatchCtx2 {
+ rados_watchcb2_t wcb;
+ void *arg;
+ C_WatchCB2(rados_watchcb2_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
+ void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist& bl) {
+ wcb(arg, notify_id, cookie, bl.c_str(), bl.length());
+ }
+};
+
+extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
+ rados_watchcb2_t watchcb, void *arg)
+{
+ tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
+ uint64_t *cookie = handle;
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ object_t oid(o);
+ C_WatchCB2 *wc = new C_WatchCB2(watchcb, arg);
+ int ret = ctx->watch(oid, cookie, NULL, wc);
+ tracepoint(librados, rados_watch_exit, ret, *handle);
+ return ret;
+}
+
+extern "C" int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
{
tracepoint(librados, rados_unwatch_enter, io, o, handle);
uint64_t cookie = handle;
return retval;
}
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
+extern "C" int rados_notify(rados_ioctx_t io, const char *o,
+ uint64_t ver, const char *buf, int buf_len)
{
tracepoint(librados, rados_notify_enter, io, o, ver, buf, buf_len);
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
memcpy(p.c_str(), buf, buf_len);
bl.push_back(p);
}
- int retval = ctx->notify(oid, ver, bl);
+ int retval = ctx->notify(oid, bl, 0);
tracepoint(librados, rados_notify_exit, retval);
return retval;
}
+extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
+ const char *buf, int buf_len,
+ uint64_t timeout_ms)
+{
+ tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms);
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ object_t oid(o);
+ bufferlist bl;
+ if (buf) {
+ bufferptr p = buffer::create(buf_len);
+ memcpy(p.c_str(), buf, buf_len);
+ bl.push_back(p);
+ }
+ int ret = ctx->notify(oid, bl, timeout_ms);
+ tracepoint(librados, rados_notify2_exit, ret);
+ return ret;
+}
+
+extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
+ uint64_t notify_id, uint64_t handle,
+ const char *buf, int buf_len)
+{
+ tracepoint(librados, rados_notify_ack_enter, io, o, notify_id, handle, buf, buf_len);
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ object_t oid(o);
+ bufferlist bl;
+ if (buf) {
+ bufferptr p = buffer::create(buf_len);
+ memcpy(p.c_str(), buf, buf_len);
+ bl.push_back(p);
+ }
+ ctx->notify_ack(oid, notify_id, handle, bl);
+ int retval = 0;
+ tracepoint(librados, rados_notify_ack_exit, retval);
+ return retval;
+}
+
extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o,
uint64_t expected_object_size,
uint64_t expected_write_size)
add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl);
}
- void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) {
+ void notify_ack(uint64_t notify_id, uint64_t cookie,
+ bufferlist& reply_bl) {
bufferlist bl;
::encode(notify_id, bl);
::encode(cookie, bl);
- add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
+ ::encode(reply_bl, bl);
+ add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, 0, 0, bl);
}
void list_watchers(list<obj_watch_t> *out,
)
)
+TRACEPOINT_EVENT(librados, rados_watch2_enter,
+ TP_ARGS(
+ rados_ioctx_t, ioctx,
+ const char*, oid,
+ uint64_t*, phandle,
+ rados_watchcb2_t, callback,
+ void*, arg),
+ TP_FIELDS(
+ ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+ ctf_string(oid, oid)
+ ctf_integer_hex(uint64_t, phandle, phandle)
+ ctf_integer_hex(rados_watchcb2_t, callback, callback)
+ ctf_integer_hex(void*, arg, arg)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch2_exit,
+ TP_ARGS(
+ int, retval,
+ uint64_t, handle),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ ctf_integer(uint64_t, handle, handle)
+ )
+)
+
TRACEPOINT_EVENT(librados, rados_unwatch_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
)
)
+TRACEPOINT_EVENT(librados, rados_notify2_enter,
+ TP_ARGS(
+ rados_ioctx_t, ioctx,
+ const char*, oid,
+ const char*, buf,
+ int, buf_len,
+ uint64_t, timeout_ms),
+ TP_FIELDS(
+ ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+ ctf_string(oid, oid)
+ ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
+ ctf_integer(uint64_t, timeout_ms, timeout_ms)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify2_exit,
+ TP_ARGS(
+ int, retval),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify_ack_enter,
+ TP_ARGS(
+ rados_ioctx_t, ioctx,
+ const char*, oid,
+ uint64_t, notify_id,
+ uint64_t, handle,
+ const char*, buf,
+ int, buf_len),
+ TP_FIELDS(
+ ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+ ctf_string(oid, oid)
+ ctf_integer(uint64_t, notify_id, notify_id)
+ ctf_integer(uint64_t, handle, handle)
+ ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify_ack_exit,
+ TP_ARGS(
+ int, retval),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ )
+)
+
TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter,
TP_ARGS(
rados_ioctx_t, ioctx,