From: Haomai Wang Date: Mon, 15 Feb 2016 09:16:59 +0000 (+0800) Subject: librados: add async watch api X-Git-Tag: v10.1.0~321^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=87e715c5db9c14b4b93682dc908464870e898aa6;p=ceph.git librados: add async watch api Signed-off-by: Haomai Wang --- diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index e9b5c91c0f50..17e19c9d9568 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -2085,6 +2085,34 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki rados_watcherrcb_t watcherrcb, void *arg); +/** + * Asynchronous register an interest in an object + * + * A watch operation registers the client as being interested in + * notifications on an object. OSDs keep track of watches on + * persistent storage, so they are preserved across cluster changes by + * the normal recovery process. If the client loses its connection to + * the primary OSD for a watched object, the watch will be removed + * after 30 seconds. Watches are automatically reestablished when a new + * connection is made, or a placement group switches OSDs. + * + * @note BUG: watch timeout should be configurable + * + * @param io the pool the object is in + * @param o the object to watch + * @param completion what to do when operation has been attempted + * @param handle where to store the internal id assigned to this watch + * @param watchcb what to do when a notify is received on this object + * @param watcherrcb what to do when the watch session encounters an error + * @param arg opaque value to pass to the callback + * @returns 0 on success, negative error code on failure + */ +CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o, + rados_completion_t completion, uint64_t *handle, + rados_watchcb2_t watchcb, + rados_watcherrcb_t watcherrcb, + void *arg); + /** * Check on the status of a watch * diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 1287c77de9c4..6d42e2a5c684 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -989,6 +989,8 @@ namespace librados // watch/notify int watch2(const std::string& o, uint64_t *handle, librados::WatchCtx2 *ctx); + int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle, + librados::WatchCtx2 *ctx); int unwatch2(uint64_t handle); /** * Send a notify event ot watchers diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index b70c00ac6e4f..51c4478ab817 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -89,19 +89,21 @@ struct C_aio_linger_cancel : public Context { } }; -struct C_aio_notify_Complete : public Context { +struct C_aio_linger_Complete : public Context { AioCompletionImpl *c; Objecter::LingerOp *linger_op; + bool cancel; - C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op) - : c(_c), linger_op(_linger_op) + C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel) + : c(_c), linger_op(_linger_op), cancel(_cancel) { c->get(); } virtual void finish(int r) { - c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter, - linger_op)); + if (cancel || r < 0) + c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter, + linger_op)); c->lock.Lock(); c->rval = r; @@ -1258,6 +1260,33 @@ int librados::IoCtxImpl::watch(const object_t& oid, return r; } +int librados::IoCtxImpl::aio_watch(const object_t& oid, + AioCompletionImpl *c, + uint64_t *handle, + librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2) +{ + Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); + c->io = this; + Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false); + + ::ObjectOperation wr; + version_t objver; + + *handle = linger_op->get_cookie(); + linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2); + + prepare_assert_ops(&wr); + wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH); + *handle = 0; + bufferlist bl; + objecter->linger_watch(linger_op, wr, + snapc, ceph::real_clock::now(), bl, + oncomplete, &objver); + + return 0; +} + int librados::IoCtxImpl::notify_ack( const object_t& oid, @@ -1358,7 +1387,7 @@ int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, c->io = this; - Context *oncomplete = new C_aio_notify_Complete(c, linger_op); + Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true); C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete, objecter, linger_op, preply_bl, preply_buf, diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index ff3c235f1486..4e530b3f9c69 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -209,6 +209,8 @@ struct librados::IoCtxImpl { void set_sync_op_version(version_t ver); int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2); + int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie, + librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2); int watch_check(uint64_t cookie); int unwatch(uint64_t cookie); int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms, diff --git a/src/librados/librados.cc b/src/librados/librados.cc index e66d17b638ac..1d12f407a518 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1799,6 +1799,15 @@ int librados::IoCtx::watch2(const string& oid, uint64_t *cookie, return io_ctx_impl->watch(obj, cookie, NULL, ctx2); } +int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c, + uint64_t *cookie, + librados::WatchCtx2 *ctx2) +{ + object_t obj(oid); + return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2); +} + + int librados::IoCtx::unwatch(const string& oid, uint64_t handle) { return io_ctx_impl->unwatch(handle);