rados_watcherrcb_t watcherrcb,
void *arg);
+/**
+ * Asynchronous register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after 30 seconds. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @note BUG: watch timeout should be configurable
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param completion what to do when operation has been attempted
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o,
+ rados_completion_t completion, uint64_t *handle,
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ void *arg);
+
/**
* Check on the status of a watch
*
// watch/notify
int watch2(const std::string& o, uint64_t *handle,
librados::WatchCtx2 *ctx);
+ int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
+ librados::WatchCtx2 *ctx);
int unwatch2(uint64_t handle);
/**
* Send a notify event ot watchers
}
};
-struct C_aio_notify_Complete : public Context {
+struct C_aio_linger_Complete : public Context {
AioCompletionImpl *c;
Objecter::LingerOp *linger_op;
+ bool cancel;
- C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
- : c(_c), linger_op(_linger_op)
+ C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
+ : c(_c), linger_op(_linger_op), cancel(_cancel)
{
c->get();
}
virtual void finish(int r) {
- c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
- linger_op));
+ if (cancel || r < 0)
+ c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
+ linger_op));
c->lock.Lock();
c->rval = r;
return r;
}
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+ AioCompletionImpl *c,
+ uint64_t *handle,
+ librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2)
+{
+ Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+ c->io = this;
+ Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
+
+ ::ObjectOperation wr;
+ version_t objver;
+
+ *handle = linger_op->get_cookie();
+ linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2);
+
+ prepare_assert_ops(&wr);
+ wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+ *handle = 0;
+ bufferlist bl;
+ objecter->linger_watch(linger_op, wr,
+ snapc, ceph::real_clock::now(), bl,
+ oncomplete, &objver);
+
+ return 0;
+}
+
int librados::IoCtxImpl::notify_ack(
const object_t& oid,
c->io = this;
- Context *oncomplete = new C_aio_notify_Complete(c, linger_op);
+ Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
objecter, linger_op,
preply_bl, preply_buf,
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2);
+ int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
+ librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
int watch_check(uint64_t cookie);
int unwatch(uint64_t cookie);
int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
}
+int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
+ uint64_t *cookie,
+ librados::WatchCtx2 *ctx2)
+{
+ object_t obj(oid);
+ return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2);
+}
+
+
int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
{
return io_ctx_impl->unwatch(handle);