]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: add async watch api
authorHaomai Wang <haomai@xsky.com>
Mon, 15 Feb 2016 09:16:59 +0000 (17:16 +0800)
committerHaomai Wang <haomai@xsky.com>
Sat, 20 Feb 2016 06:22:03 +0000 (14:22 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc

index e9b5c91c0f502d9f76a926047e56cac34c9a25fe..17e19c9d9568538ac7a891b07b0b185ceaf86fc7 100644 (file)
@@ -2085,6 +2085,34 @@ CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cooki
                                rados_watcherrcb_t watcherrcb,
                                void *arg);
 
+/**
+ * Asynchronous register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after 30 seconds. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @note BUG: watch timeout should be configurable
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param completion what to do when operation has been attempted
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_aio_watch(rados_ioctx_t io, const char *o,
+                                  rados_completion_t completion, uint64_t *handle,
+                                  rados_watchcb2_t watchcb,
+                                  rados_watcherrcb_t watcherrcb,
+                                  void *arg);
+
 /**
  * Check on the status of a watch
  *
index 1287c77de9c42302f36fa1f43035d860a524459c..6d42e2a5c684fe62eba1d70d729c80a313d8eb35 100644 (file)
@@ -989,6 +989,8 @@ namespace librados
     // watch/notify
     int watch2(const std::string& o, uint64_t *handle,
               librados::WatchCtx2 *ctx);
+    int aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
+              librados::WatchCtx2 *ctx);
     int unwatch2(uint64_t handle);
     /**
      * Send a notify event ot watchers
index b70c00ac6e4fe9246a5e06ef69e141b0e14855e2..51c4478ab817383be32bc4aa06dca5d133499670 100644 (file)
@@ -89,19 +89,21 @@ struct C_aio_linger_cancel : public Context {
   }
 };
 
-struct C_aio_notify_Complete : public Context {
+struct C_aio_linger_Complete : public Context {
   AioCompletionImpl *c;
   Objecter::LingerOp *linger_op;
+  bool cancel;
 
-  C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
-    : c(_c), linger_op(_linger_op)
+  C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
+    : c(_c), linger_op(_linger_op), cancel(_cancel)
   {
     c->get();
   }
 
   virtual void finish(int r) {
-    c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
-                                                          linger_op));
+    if (cancel || r < 0)
+      c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
+                                                            linger_op));
 
     c->lock.Lock();
     c->rval = r;
@@ -1258,6 +1260,33 @@ int librados::IoCtxImpl::watch(const object_t& oid,
   return r;
 }
 
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   uint64_t *handle,
+                                   librados::WatchCtx *ctx,
+                                   librados::WatchCtx2 *ctx2)
+{
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+  c->io = this;
+  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
+
+  ::ObjectOperation wr;
+  version_t objver;
+
+  *handle = linger_op->get_cookie();
+  linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2);
+
+  prepare_assert_ops(&wr);
+  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
+  *handle = 0;
+  bufferlist bl;
+  objecter->linger_watch(linger_op, wr,
+                         snapc, ceph::real_clock::now(), bl,
+                         oncomplete, &objver);
+
+  return 0;
+}
+
 
 int librados::IoCtxImpl::notify_ack(
   const object_t& oid,
@@ -1358,7 +1387,7 @@ int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
 
   c->io = this;
 
-  Context *oncomplete = new C_aio_notify_Complete(c, linger_op);
+  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
   C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
                                                   objecter, linger_op,
                                                   preply_bl, preply_buf,
index ff3c235f14868da7ddd64d757f69ad2b5b556ee8..4e530b3f9c6993d51b44375a744ef4d06a4075eb 100644 (file)
@@ -209,6 +209,8 @@ struct librados::IoCtxImpl {
   void set_sync_op_version(version_t ver);
   int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
            librados::WatchCtx2 *ctx2);
+  int aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *cookie,
+                librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
   int watch_check(uint64_t cookie);
   int unwatch(uint64_t cookie);
   int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
index e66d17b638aca3e91aeeeb236127be552810b9a6..1d12f407a518808cbbf48948652564fd7a0951c5 100644 (file)
@@ -1799,6 +1799,15 @@ int librados::IoCtx::watch2(const string& oid, uint64_t *cookie,
   return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
 }
 
+int librados::IoCtx::aio_watch(const string& oid, AioCompletion *c,
+                               uint64_t *cookie,
+                               librados::WatchCtx2 *ctx2)
+{
+  object_t obj(oid);
+  return io_ctx_impl->aio_watch(obj, c->pc, cookie, NULL, ctx2);
+}
+
+
 int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
 {
   return io_ctx_impl->unwatch(handle);