]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: define updated watch/notify interface
authorSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 21:32:48 +0000 (14:32 -0700)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:37 +0000 (10:32 -0800)
- new notify callback with the correct values:
  - notify_id
  - watch handle
  - payload
- new notify_ack call
  - not implicit when the callback returns (for new api only)
  - optional payload
- new watch2 call
  - that provides the new callback
- new notify2 call
  - with the right arguments, and optional timeout

A couple refactors in here:
- IoCtx notify_ack is now called unlocked (Note: this will soon change
  with pending Objecter locking changes)
- Objecter notify_ack takes a buffer

TODO:
- no timeout on the individual watch, yet...

Signed-off-by: Sage Weil <sage@redhat.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/librados/librados.cc
src/osdc/Objecter.h
src/tracing/librados.tp

index 9c4a5732ff69613b975f3cc555d7eb30422f0a3b..703cc728b2f77b5aa61f087defd3cc5ebc4a34e9 100644 (file)
@@ -1871,6 +1871,23 @@ CEPH_RADOS_API int rados_aio_cancel(rados_ioctx_t io,
  */
 typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
 
+/**
+ * @typedef rados_watchcb2_t
+ *
+ * Callback activated when a notify is received on a watched
+ * object. Parameters are:
+ * - arg opaque user-defined value provided to rados_watch2()
+ * - notify_id an id for this notify event
+ * - handle the watcher handle we are notifying
+ * - data payload from the notifier
+ * - datalen length of payload buffer
+ */
+typedef void (*rados_watchcb2_t)(void *arg,
+                                uint64_t notify_id,
+                                uint64_t handle,
+                                void *data,
+                                size_t data_len);
+
 /**
  * Register an interest in an object
  *
@@ -1900,6 +1917,30 @@ CEPH_RADOS_API int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
                                uint64_t *handle, rados_watchcb_t watchcb,
                                void *arg);
 
+
+/**
+ * Register an interest in an object
+ *
+ * A watch operation registers the client as being interested in
+ * notifications on an object. OSDs keep track of watches on
+ * persistent storage, so they are preserved across cluster changes by
+ * the normal recovery process. If the client loses its connection to
+ * the primary OSD for a watched object, the watch will be removed
+ * after 30 seconds. Watches are automatically reestablished when a new
+ * connection is made, or a placement group switches OSDs.
+ *
+ * @note BUG: watch timeout should be configurable
+ *
+ * @param io the pool the object is in
+ * @param o the object to watch
+ * @param handle where to store the internal id assigned to this watch
+ * @param watchcb2 what to do when a notify is received on this object
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
+                rados_watchcb2_t watchcb, void *arg);
+
 /**
  * Unregister an interest in an object
  *
@@ -1933,6 +1974,38 @@ CEPH_RADOS_API int rados_unwatch(rados_ioctx_t io, const char *o,
 CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
                                 const char *buf, int buf_len);
 
+/**
+ * Sychronously notify watchers of an object
+ *
+ * This blocks until all watchers of the object have received and
+ * reacted to the notify, or a timeout is reached.
+ *
+ * @param io the pool the object is in
+ * @param o the name of the object
+ * @param buf data to send to watchers
+ * @param buf_len length of buf in bytes
+ * @param timeout_ms notify timeout (in ms)
+ * @returns 0 on success, negative error code on failure
+ */
+int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len,
+                 uint64_t timeout_ms);
+
+/**
+ * Acknolwedge receipt of a notify
+ *
+ * @param io the pool the object is in
+ * @param o the name of the object
+ * @param notify_id the notify_id we got on the watchcb2_t callback
+ * @param handle the watcher handle
+ * @param buf payload to return to notifier (optional)
+ * @param buf_len payload length
+ * @returns 0 on success
+ */
+int rados_notify_ack(rados_ioctx_t io, const char *o,
+                    uint64_t notify_id, uint64_t handle,
+                    const char *buf, int buf_len);
+
+
 /** @} Watch/Notify */
 
 /**
index 402ede00687220d3dcb768558e3bcdcb57f5ae47..3ae0e0d00056e1b8f13769e1fc09eeb794cd9252 100644 (file)
@@ -145,12 +145,26 @@ namespace librados
     std::pair<std::string, std::string> cur_obj;
   };
 
+  /// DEPRECATED; do not use
   class CEPH_RADOS_API WatchCtx {
   public:
     virtual ~WatchCtx();
     virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0;
   };
 
+  class CEPH_RADOS_API WatchCtx2 {
+  public:
+    virtual ~WatchCtx2();
+    /**
+     * @param notify_id unique id for this notify event
+     * @param cookie the watcher we are notifying
+     * @param bl opaque notify payload (from the notifier)
+     */
+    virtual void handle_notify(uint64_t notify_id,
+                              uint64_t cookie,
+                              bufferlist& bl) = 0;
+  };
+
   struct CEPH_RADOS_API AioCompletion {
     AioCompletion(AioCompletionImpl *pc_) : pc(pc_) {}
     int set_complete_callback(void *cb_arg, callback_t cb);
@@ -893,12 +907,23 @@ namespace librados
     // watch/notify
     int watch(const std::string& o, uint64_t ver, uint64_t *handle,
              librados::WatchCtx *ctx);
+    int watch2(const std::string& o, uint64_t *handle,
+              librados::WatchCtx2 *ctx);
     int unwatch(const std::string& o, uint64_t handle);
     int notify(const std::string& o, uint64_t ver, bufferlist& bl);
+    int notify2(const std::string& o,   ///< object
+               bufferlist& bl,         ///< optional broadcast payload
+               uint64_t timeout_ms);   ///< timeout (in ms)
     int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
     int list_snaps(const std::string& o, snap_set_t *out_snaps);
     void set_notify_timeout(uint32_t timeout);
 
+    /// acknowledge a notify we received.
+    void notify_ack(const std::string& o, ///< watched object
+                   uint64_t notify_id,   ///< notify id
+                   uint64_t handle,      ///< our watch handle
+                   bufferlist& bl);      ///< optional reply payload
+
     /**
      * Set allocation hint for an object
      *
index dcbe032863bc04d5a4e01a05eb59146dc9b94d03..570b1a68c86647322b3665c35f397a573a988cc9 100644 (file)
@@ -1032,8 +1032,10 @@ void librados::IoCtxImpl::set_sync_op_version(version_t ver)
   last_objver = ver;
 }
 
-int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
-                              uint64_t *cookie, librados::WatchCtx *ctx)
+int librados::IoCtxImpl::watch(const object_t& oid,
+                              uint64_t *cookie,
+                              librados::WatchCtx *ctx,
+                              librados::WatchCtx2 *ctx2)
 {
   ::ObjectOperation wr;
   Mutex mylock("IoCtxImpl::watch::mylock");
@@ -1047,9 +1049,10 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
 
   WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
   wc->watch_ctx = ctx;
+  wc->watch_ctx2 = ctx2;
   client->register_watch_notify_callback(wc, cookie);
   prepare_assert_ops(&wr);
-  wr.watch(*cookie, ver, 1);
+  wr.watch(*cookie, 0, 1);
   bufferlist bl;
   wc->linger_id = objecter->linger_mutate(oid, oloc, wr,
                                          snapc, ceph_clock_now(NULL), bl,
@@ -1074,15 +1077,16 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
 }
 
 
-/* this is called with IoCtxImpl::lock held */
-int librados::IoCtxImpl::_notify_ack(
+int librados::IoCtxImpl::notify_ack(
   const object_t& oid,
   uint64_t notify_id,
-  uint64_t cookie)
+  uint64_t cookie,
+  bufferlist& bl)
 {
+  Mutex::Locker l(*lock);
   ::ObjectOperation rd;
   prepare_assert_ops(&rd);
-  rd.notify_ack(notify_id, 0, cookie);
+  rd.notify_ack(notify_id, cookie, bl);
   objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
   return 0;
 }
@@ -1117,7 +1121,8 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie)
   return r;
 }
 
-int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& bl)
+int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
+                               uint64_t timeout_ms)
 {
   bufferlist inbl, outbl;
 
@@ -1139,6 +1144,8 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
   client->register_watch_notify_callback(wc, &cookie);
   uint32_t prot_ver = 1;
   uint32_t timeout = notify_timeout;
+  if (timeout_ms)
+    timeout = timeout_ms / 1000;
   ::encode(prot_ver, inbl);
   ::encode(timeout, inbl);
   ::encode(bl, inbl);
@@ -1146,7 +1153,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
   // Construct RADOS op
   ::ObjectOperation rd;
   prepare_assert_ops(&rd);
-  rd.notify(cookie, ver, inbl);
+  rd.notify(cookie, 0, inbl);
 
   // Issue RADOS op
   C_SaferCond onack;
index 0e4a1f044a99d347cfcc4548855cf213abe6ab2f..5b41ddbe6b0d5ed23878cb10fe344ba724bdaed1 100644 (file)
@@ -197,10 +197,12 @@ struct librados::IoCtxImpl {
                  bufferlist *pbl);
 
   void set_sync_op_version(version_t ver);
-  int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
+  int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
+           librados::WatchCtx2 *ctx2);
   int unwatch(const object_t& oid, uint64_t cookie);
-  int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
-  int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie);
+  int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms);
+  int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
+                bufferlist& bl);
 
   int set_alloc_hint(const object_t& oid,
                      uint64_t expected_object_size,
@@ -226,8 +228,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
   uint64_t linger_id;      // we use this to unlinger when we are done
   uint64_t cookie;         // callback cookie
 
-  // watcher
+  // watcher.  only one of these will be defined.
   librados::WatchCtx *watch_ctx;
+  librados::WatchCtx2 *watch_ctx2;
 
   // notify that we initiated
   Mutex *notify_lock;
@@ -242,6 +245,7 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
       linger_id(0),
       cookie(0),
       watch_ctx(NULL),
+      watch_ctx2(NULL),
       notify_lock(NULL),
       notify_cond(NULL),
       notify_done(NULL),
index 85a93466e4ec40795543852489e7ebf36f682e13..2e1cf7e3dc90be65fadddc7df474cbf7ae91cc3d 100644 (file)
@@ -720,13 +720,18 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
       wc->get();
 
       // trigger the callback
+      assert(!!wc->watch_ctx ^ !!wc->watch_ctx2);  // only one is defined
       lock.Unlock();
-      wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+      if (wc->watch_ctx) {
+       wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+       // send ACK back to the OSD
+       bufferlist empty;
+       wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty);
+      } else if (wc->watch_ctx2) {
+       wc->watch_ctx2->handle_notify(m->notify_id, m->cookie, m->bl);
+       // user needs to explicitly ack (and may have already!)
+      }
       lock.Lock();
-
-      // send ACK back to the OSD
-      wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->cookie);
-
       ldout(cct,10) << __func__ << " notify done" << dendl;
       wc->put();
     }
index 573fae7ee7c24efe6c37eec347d14f10dd44e5a1..6a20db5ec00ae0379b3b28b78eabb5ef1de03d5c 100644 (file)
@@ -480,6 +480,11 @@ librados::WatchCtx::
 {
 }
 
+librados::WatchCtx2::
+~WatchCtx2()
+{
+}
+
 
 struct librados::ObjListCtx {
   bool new_request;
@@ -1658,7 +1663,14 @@ int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie,
                           librados::WatchCtx *ctx)
 {
   object_t obj(oid);
-  return io_ctx_impl->watch(obj, ver, cookie, ctx);
+  return io_ctx_impl->watch(obj, cookie, ctx, NULL);
+}
+
+int librados::IoCtx::watch2(const string& oid, uint64_t *cookie,
+                           librados::WatchCtx2 *ctx2)
+{
+  object_t obj(oid);
+  return io_ctx_impl->watch(obj, cookie, NULL, ctx2);
 }
 
 int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
@@ -1671,7 +1683,20 @@ int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
 int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
 {
   object_t obj(oid);
-  return io_ctx_impl->notify(obj, ver, bl);
+  return io_ctx_impl->notify(obj, bl, 0);
+}
+
+int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
+                            uint64_t timeout_ms)
+{
+  object_t obj(oid);
+  return io_ctx_impl->notify(obj, bl, timeout_ms);
+}
+
+void librados::IoCtx::notify_ack(const std::string& o,
+                                uint64_t notify_id, uint64_t handle)
+{
+  io_ctx_impl->notify_ack(o, notify_id, handle);
 }
 
 int librados::IoCtx::list_watchers(const std::string& oid,
@@ -3729,20 +3754,45 @@ struct C_WatchCB : public librados::WatchCtx {
   }
 };
 
-int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
-                rados_watchcb_t watchcb, void *arg)
+extern "C" int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver,
+                          uint64_t *handle,
+                          rados_watchcb_t watchcb, void *arg)
 {
   tracepoint(librados, rados_watch_enter, io, o, ver, watchcb, arg);
   uint64_t *cookie = handle;
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
   object_t oid(o);
   C_WatchCB *wc = new C_WatchCB(watchcb, arg);
-  int retval = ctx->watch(oid, ver, cookie, wc);
+  int retval = ctx->watch(oid, cookie, wc, NULL);
   tracepoint(librados, rados_watch_exit, retval, *handle);
   return retval;
 }
 
-int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
+struct C_WatchCB2 : public librados::WatchCtx2 {
+  rados_watchcb2_t wcb;
+  void *arg;
+  C_WatchCB2(rados_watchcb2_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
+  void handle_notify(uint64_t notify_id,
+                    uint64_t cookie,
+                    bufferlist& bl) {
+    wcb(arg, notify_id, cookie, bl.c_str(), bl.length());
+  }
+};
+
+extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
+                           rados_watchcb2_t watchcb, void *arg)
+{
+  tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
+  uint64_t *cookie = handle;
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  object_t oid(o);
+  C_WatchCB2 *wc = new C_WatchCB2(watchcb, arg);
+  int ret = ctx->watch(oid, cookie, NULL, wc);
+  tracepoint(librados, rados_watch_exit, ret, *handle);
+  return ret;
+}
+
+extern "C" int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
 {
   tracepoint(librados, rados_unwatch_enter, io, o, handle);
   uint64_t cookie = handle;
@@ -3753,7 +3803,8 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
   return retval;
 }
 
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
+extern "C" int rados_notify(rados_ioctx_t io, const char *o,
+                           uint64_t ver, const char *buf, int buf_len)
 {
   tracepoint(librados, rados_notify_enter, io, o, ver, buf, buf_len);
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
@@ -3764,11 +3815,48 @@ int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf,
     memcpy(p.c_str(), buf, buf_len);
     bl.push_back(p);
   }
-  int retval = ctx->notify(oid, ver, bl);
+  int retval = ctx->notify(oid, bl, 0);
   tracepoint(librados, rados_notify_exit, retval);
   return retval;
 }
 
+extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
+                            const char *buf, int buf_len,
+                            uint64_t timeout_ms)
+{
+  tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms);
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  object_t oid(o);
+  bufferlist bl;
+  if (buf) {
+    bufferptr p = buffer::create(buf_len);
+    memcpy(p.c_str(), buf, buf_len);
+    bl.push_back(p);
+  }
+  int ret = ctx->notify(oid, bl, timeout_ms);
+  tracepoint(librados, rados_notify2_exit, ret);
+  return ret;
+}
+
+extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
+                               uint64_t notify_id, uint64_t handle,
+                               const char *buf, int buf_len)
+{
+  tracepoint(librados, rados_notify_ack_enter, io, o, notify_id, handle, buf, buf_len);
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  object_t oid(o);
+  bufferlist bl;
+  if (buf) {
+    bufferptr p = buffer::create(buf_len);
+    memcpy(p.c_str(), buf, buf_len);
+    bl.push_back(p);
+  }
+  ctx->notify_ack(oid, notify_id, handle, bl);
+  int retval = 0;
+  tracepoint(librados, rados_notify_ack_exit, retval);
+  return retval;
+}
+
 extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o,
                                     uint64_t expected_object_size,
                                     uint64_t expected_write_size)
index c436bf23686d8e3b90d3cf12bae3cc85d5184ec9..347f7f9d8b5503ba50224387fc8437394d458e94 100644 (file)
@@ -862,11 +862,13 @@ struct ObjectOperation {
     add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl); 
   }
 
-  void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) {
+  void notify_ack(uint64_t notify_id, uint64_t cookie,
+                 bufferlist& reply_bl) {
     bufferlist bl;
     ::encode(notify_id, bl);
     ::encode(cookie, bl);
-    add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
+    ::encode(reply_bl, bl);
+    add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, 0, 0, bl);
   }
 
   void list_watchers(list<obj_watch_t> *out,
index 263a0d6aaa48d3f307c2b2a652b803884805c646..339c2cedebb13135216b757f0240268c98db8828 100644 (file)
@@ -2190,6 +2190,32 @@ TRACEPOINT_EVENT(librados, rados_watch_exit,
     )
 )
 
+TRACEPOINT_EVENT(librados, rados_watch2_enter,
+    TP_ARGS(
+        rados_ioctx_t, ioctx,
+        const char*, oid,
+        uint64_t*, phandle,
+        rados_watchcb2_t, callback,
+        void*, arg),
+    TP_FIELDS(
+        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+        ctf_string(oid, oid)
+        ctf_integer_hex(uint64_t, phandle, phandle)
+        ctf_integer_hex(rados_watchcb2_t, callback, callback)
+        ctf_integer_hex(void*, arg, arg)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch2_exit,
+    TP_ARGS(
+        int, retval,
+        uint64_t, handle),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+        ctf_integer(uint64_t, handle, handle)
+    )
+)
+
 TRACEPOINT_EVENT(librados, rados_unwatch_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,
@@ -2233,6 +2259,54 @@ TRACEPOINT_EVENT(librados, rados_notify_exit,
     )
 )
 
+TRACEPOINT_EVENT(librados, rados_notify2_enter,
+    TP_ARGS(
+        rados_ioctx_t, ioctx,
+        const char*, oid,
+        const char*, buf,
+        int, buf_len,
+       uint64_t, timeout_ms),
+    TP_FIELDS(
+        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+        ctf_string(oid, oid)
+        ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
+        ctf_integer(uint64_t, timeout_ms, timeout_ms)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify2_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify_ack_enter,
+    TP_ARGS(
+        rados_ioctx_t, ioctx,
+        const char*, oid,
+       uint64_t, notify_id,
+       uint64_t, handle,
+        const char*, buf,
+        int, buf_len),
+    TP_FIELDS(
+        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+        ctf_string(oid, oid)
+        ctf_integer(uint64_t, notify_id, notify_id)
+        ctf_integer(uint64_t, handle, handle)
+        ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_notify_ack_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)
+
 TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,