]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: update notify2 API to accept reply payloads
authorSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 21:52:34 +0000 (14:52 -0700)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:37 +0000 (10:32 -0800)
Allow the notify2 callers to provide bufferlists or char ** pointers
so that we can pass the reply buffer back to them.

Signed-off-by: Sage Weil <sage@redhat.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/RadosClient.cc
src/librados/librados.cc

index 703cc728b2f77b5aa61f087defd3cc5ebc4a34e9..3e4a2673e45eef326a65c581a4dd381b96280e80 100644 (file)
@@ -1980,15 +1980,35 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
  * This blocks until all watchers of the object have received and
  * reacted to the notify, or a timeout is reached.
  *
+ * The reply buffer is optional.  If specified, the client will get
+ * back an encoded buffer that includes the ids of the clients that
+ * acknowledged the notify as well as their notify ack payloads (if
+ * any).  Clients that timed out are not included.  Even clients that
+ * do not include a notify ack payload are included in the list but
+ * have a 0-length payload associated with them.  The format:
+ *
+ *    le32 num_acks
+ *    {
+ *      le64 gid     global id for the client (for client.1234 that's 1234)
+ *      le32 buflen  length of reply message buffer
+ *      u8 * buflen  payload
+ *    } * num_acks
+ *
+ * Note that this buffer must be released with rados_buffer_free()
+ * when the user is done with it.
+ *
  * @param io the pool the object is in
  * @param o the name of the object
  * @param buf data to send to watchers
  * @param buf_len length of buf in bytes
  * @param timeout_ms notify timeout (in ms)
+ * @param reply_buffer pointer to reply buffer pointer (free with rados_buffer_free)
+ * @param reply_buffer_len pointer to size of reply buffer
  * @returns 0 on success, negative error code on failure
  */
 int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len,
-                 uint64_t timeout_ms);
+                 uint64_t timeout_ms,
+                 char **reply_buffer, size_t *reply_buffer_len);
 
 /**
  * Acknolwedge receipt of a notify
index 3ae0e0d00056e1b8f13769e1fc09eeb794cd9252..20eec5bc5e8343d9d870632e0bb86502cbe126ae 100644 (file)
@@ -913,7 +913,8 @@ namespace librados
     int notify(const std::string& o, uint64_t ver, bufferlist& bl);
     int notify2(const std::string& o,   ///< object
                bufferlist& bl,         ///< optional broadcast payload
-               uint64_t timeout_ms);   ///< timeout (in ms)
+               uint64_t timeout_ms,    ///< timeout (in ms)
+               bufferlist *pbl);       ///< reply buffer
     int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
     int list_snaps(const std::string& o, snap_set_t *out_snaps);
     void set_notify_timeout(uint32_t timeout);
index 570b1a68c86647322b3665c35f397a573a988cc9..9cfddd5bf309dffd3586663441bdc55c280c9db8 100644 (file)
@@ -1122,7 +1122,9 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie)
 }
 
 int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
-                               uint64_t timeout_ms)
+                               uint64_t timeout_ms,
+                               bufferlist *preply_bl,
+                               char **preply_buf, size_t *preply_buf_len)
 {
   bufferlist inbl, outbl;
 
@@ -1136,6 +1138,9 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
   wc->notify_lock = &mylock_all;
   wc->notify_cond = &cond_all;
   wc->notify_rval = &r_notify;
+  wc->notify_reply_bl = preply_bl;
+  wc->notify_reply_buf = preply_buf;
+  wc->notify_reply_buf_len = preply_buf_len;
 
   lock->Lock();
 
index 5b41ddbe6b0d5ed23878cb10fe344ba724bdaed1..55d4a4944b3d3b4a0b6da1f68f33c5b62338d9ac 100644 (file)
@@ -200,7 +200,8 @@ struct librados::IoCtxImpl {
   int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
            librados::WatchCtx2 *ctx2);
   int unwatch(const object_t& oid, uint64_t cookie);
-  int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms);
+  int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
+            bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
   int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
                 bufferlist& bl);
 
@@ -236,6 +237,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
   Mutex *notify_lock;
   Cond *notify_cond;
   bool *notify_done;
+  bufferlist *notify_reply_bl;
+  char **notify_reply_buf;
+  size_t *notify_reply_buf_len;
   int *notify_rval;
 
   WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
@@ -249,6 +253,9 @@ struct WatchNotifyInfo : public RefCountedWaitObject {
       notify_lock(NULL),
       notify_cond(NULL),
       notify_done(NULL),
+      notify_reply_bl(NULL),
+      notify_reply_buf(NULL),
+      notify_reply_buf_len(NULL),
       notify_rval(NULL) {
     io_ctx_impl->get();
   }
index 2e1cf7e3dc90be65fadddc7df474cbf7ae91cc3d..b19c1e5b02057720e2ad41772958a00cd581eb7b 100644 (file)
@@ -712,6 +712,17 @@ void librados::RadosClient::do_watch_notify(MWatchNotify *m)
       wc->notify_lock->Lock();
       *wc->notify_done = true;
       *wc->notify_rval = m->return_code;
+      if (wc->notify_reply_bl) {
+       wc->notify_reply_bl->claim(m->get_data());
+      }
+      if (wc->notify_reply_buf) {
+       *wc->notify_reply_buf = (char*)malloc(m->get_data().length());
+       memcpy(*wc->notify_reply_buf, m->get_data().c_str(),
+              m->get_data().length());
+      }
+      if (wc->notify_reply_buf_len) {
+       *wc->notify_reply_buf_len = m->get_data().length();
+      }
       wc->notify_cond->Signal();
       wc->notify_lock->Unlock();
     } else {
index 6a20db5ec00ae0379b3b28b78eabb5ef1de03d5c..aad5eaac781e898e1aed2874a265dd4a47056967 100644 (file)
@@ -1683,20 +1683,21 @@ int librados::IoCtx::unwatch(const string& oid, uint64_t handle)
 int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
 {
   object_t obj(oid);
-  return io_ctx_impl->notify(obj, bl, 0);
+  return io_ctx_impl->notify(obj, bl, 0, NULL, NULL, NULL);
 }
 
 int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
-                            uint64_t timeout_ms)
+                            uint64_t timeout_ms, bufferlist *preplybl)
 {
   object_t obj(oid);
-  return io_ctx_impl->notify(obj, bl, timeout_ms);
+  return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL);
 }
 
 void librados::IoCtx::notify_ack(const std::string& o,
-                                uint64_t notify_id, uint64_t handle)
+                                uint64_t notify_id, uint64_t handle,
+                                bufferlist& bl)
 {
-  io_ctx_impl->notify_ack(o, notify_id, handle);
+  io_ctx_impl->notify_ack(o, notify_id, handle, bl);
 }
 
 int librados::IoCtx::list_watchers(const std::string& oid,
@@ -3815,14 +3816,16 @@ extern "C" int rados_notify(rados_ioctx_t io, const char *o,
     memcpy(p.c_str(), buf, buf_len);
     bl.push_back(p);
   }
-  int retval = ctx->notify(oid, bl, 0);
+  int retval = ctx->notify(oid, bl, 0, NULL, NULL, NULL);
   tracepoint(librados, rados_notify_exit, retval);
   return retval;
 }
 
 extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
                             const char *buf, int buf_len,
-                            uint64_t timeout_ms)
+                            uint64_t timeout_ms,
+                            char **reply_buffer,
+                            size_t *reply_buffer_len)
 {
   tracepoint(librados, rados_notify2_enter, io, o, buf, buf_len, timeout_ms);
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
@@ -3833,7 +3836,7 @@ extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
     memcpy(p.c_str(), buf, buf_len);
     bl.push_back(p);
   }
-  int ret = ctx->notify(oid, bl, timeout_ms);
+  int ret = ctx->notify(oid, bl, timeout_ms, NULL, reply_buffer, reply_buffer_len);
   tracepoint(librados, rados_notify2_exit, ret);
   return ret;
 }