]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: new AIO version of notify API
authorJason Dillaman <dillaman@redhat.com>
Wed, 10 Jun 2015 19:33:17 +0000 (15:33 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:40 +0000 (20:42 -0500)
Allow watch/notify notifications to be sent asynchronously so
that they can be safely sent from a librados op callback.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc
src/osdc/Objecter.h
src/tracing/librados.tp

index 5d4ac753a9607b8679609d35982b60b97c13f0f0..1ce385dde4fd394800b5a97ea8b5e18d313749e8 100644 (file)
@@ -2092,6 +2092,7 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
  * -ETIMEDOUT).
  *
  * @param io the pool the object is in
+ * @param completion what to do when operation has been attempted
  * @param o the name of the object
  * @param buf data to send to watchers
  * @param buf_len length of buf in bytes
@@ -2104,6 +2105,11 @@ CEPH_RADOS_API int rados_notify2(rados_ioctx_t io, const char *o,
                                 const char *buf, int buf_len,
                                 uint64_t timeout_ms,
                                 char **reply_buffer, size_t *reply_buffer_len);
+CEPH_RADOS_API int rados_aio_notify(rados_ioctx_t io, const char *o,
+                                    rados_completion_t completion,
+                                    const char *buf, int buf_len,
+                                    uint64_t timeout_ms, char **reply_buffer,
+                                    size_t *reply_buffer_len);
 
 /**
  * Acknolwedge receipt of a notify
index 52ce642934c88ca851c21f7305a9f5890838721a..0af06544dde8f2e5b5dd73bbf2907f00cc046eb7 100644 (file)
@@ -988,6 +988,12 @@ namespace librados
                bufferlist& bl,         ///< optional broadcast payload
                uint64_t timeout_ms,    ///< timeout (in ms)
                bufferlist *pbl);       ///< reply buffer
+    int aio_notify(const std::string& o,   ///< object
+                   AioCompletion *c,       ///< completion when notify completes
+                   bufferlist& bl,         ///< optional broadcast payload
+                   uint64_t timeout_ms,    ///< timeout (in ms)
+                   bufferlist *pbl);       ///< reply buffer
+
     int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
     int list_snaps(const std::string& o, snap_set_t *out_snaps);
     void set_notify_timeout(uint32_t timeout);
index 517eeb8200b422f55e24b435cabb36557fa922d5..f12c9214a3027dc1ae8d1637f0ab29e82f6cea21 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "librados: "
 
+namespace librados {
+namespace {
+
+struct C_notify_Finish : public Context {
+  CephContext *cct;
+  Context *ctx;
+  Objecter *objecter;
+  Objecter::LingerOp *linger_op;
+  bufferlist reply_bl;
+  bufferlist *preply_bl;
+  char **preply_buf;
+  size_t *preply_buf_len;
+
+  C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
+                  Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
+                  char **_preply_buf, size_t *_preply_buf_len)
+    : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
+      preply_bl(_preply_bl), preply_buf(_preply_buf),
+      preply_buf_len(_preply_buf_len)
+  {
+    linger_op->on_notify_finish = this;
+    linger_op->notify_result_bl = &reply_bl;
+  }
+
+  virtual void finish(int r)
+  {
+    ldout(cct, 10) << __func__ << " completed notify (linger op "
+                   << linger_op << "), r = " << r << dendl;
+
+    // pass result back to user
+    // NOTE: we do this regardless of what error code we return
+    if (preply_buf) {
+      if (reply_bl.length()) {
+        *preply_buf = (char*)malloc(reply_bl.length());
+        memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
+      } else {
+        *preply_buf = NULL;
+      }
+    }
+    if (preply_buf_len)
+      *preply_buf_len = reply_bl.length();
+    if (preply_bl)
+      preply_bl->claim(reply_bl);
+
+    ctx->complete(r);
+  }
+};
+
+struct C_aio_linger_cancel : public Context {
+  Objecter *objecter;
+  Objecter::LingerOp *linger_op;
+
+  C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
+    : objecter(_objecter), linger_op(_linger_op)
+  {
+  }
+
+  virtual void finish(int r)
+  {
+    objecter->linger_cancel(linger_op);
+  }
+};
+
+struct C_aio_notify_Complete : public Context {
+  AioCompletionImpl *c;
+  Objecter::LingerOp *linger_op;
+
+  C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
+    : c(_c), linger_op(_linger_op)
+  {
+    c->get();
+  }
+
+  virtual void finish(int r) {
+    c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
+                                                          linger_op));
+
+    c->lock.Lock();
+    c->rval = r;
+    c->ack = true;
+    c->safe = true;
+    c->cond.Signal();
+
+    if (c->callback_complete) {
+      c->io->client->finisher.queue(new C_AioComplete(c));
+    }
+    if (c->callback_safe) {
+      c->io->client->finisher.queue(new C_AioSafe(c));
+    }
+    c->put_unlock();
+  }
+};
+
+struct C_aio_notify_Ack : public Context {
+  CephContext *cct;
+  C_notify_Finish *f;
+
+  C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_f)
+    : cct(_cct), f(_f)
+  {
+  }
+
+  virtual void finish(int r)
+  {
+    ldout(cct, 10) << __func__ << " linger op " << f->linger_op << " acked ("
+                   << r << ")" << dendl;
+    if (r < 0) {
+      f->complete(r);
+    }
+  }
+};
+
+} // anonymous namespace
+} // namespace librados
+
 librados::IoCtxImpl::IoCtxImpl() :
   ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
   notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
@@ -1169,52 +1284,22 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                                bufferlist *preply_bl,
                                char **preply_buf, size_t *preply_buf_len)
 {
-  bufferlist inbl;
-
-  struct C_NotifyFinish : public Context {
-    Cond cond;
-    Mutex lock;
-    bool done;
-    int result;
-    bufferlist reply_bl;
-
-    C_NotifyFinish()
-      : lock("IoCtxImpl::notify::C_NotifyFinish::lock"),
-       done(false),
-       result(0) { }
-
-    void finish(int r) {}
-    void complete(int r) {
-      lock.Lock();
-      done = true;
-      result = r;
-      cond.Signal();
-      lock.Unlock();
-    }
-    void wait() {
-      lock.Lock();
-      while (!done)
-       cond.Wait(lock);
-      lock.Unlock();
-    }
-  } notify_private;
-
   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
-  linger_op->on_notify_finish = &notify_private;
-  linger_op->notify_result_bl = &notify_private.reply_bl;
 
-  uint32_t prot_ver = 1;
+  C_SaferCond notify_finish_cond;
+  Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
+                                               objecter, linger_op, preply_bl,
+                                               preply_buf, preply_buf_len);
+
   uint32_t timeout = notify_timeout;
   if (timeout_ms)
     timeout = timeout_ms / 1000;
-  ::encode(prot_ver, inbl);
-  ::encode(timeout, inbl);
-  ::encode(bl, inbl);
 
   // Construct RADOS op
   ::ObjectOperation rd;
   prepare_assert_ops(&rd);
-  rd.notify(linger_op->get_cookie(), inbl);
+  bufferlist inbl;
+  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
 
   // Issue RADOS op
   C_SaferCond onack;
@@ -1224,44 +1309,58 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                          &onack, &objver);
 
   ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
-  int r_issue = onack.wait();
+  int r = onack.wait();
   ldout(client->cct, 10) << __func__ << " linger op " << linger_op
-                        << " acked (" << r_issue << ")" << dendl;
+                        << " acked (" << r << ")" << dendl;
 
-  if (r_issue == 0) {
+  if (r == 0) {
     ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
                           << linger_op << dendl;
-    notify_private.wait();
+    r = notify_finish_cond.wait();
 
-    ldout(client->cct, 10) << __func__ << " completed notify (linger op "
-                          << linger_op << "), r = " << notify_private.result
-                          << dendl;
   } else {
     ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
-                          << r_issue << dendl;
+                          << r << dendl;
+    notify_finish->complete(r);
   }
 
-  // pass result back to user
-  // NOTE: we do this regardless of what error code we return
-  if (preply_buf) {
-    if (notify_private.reply_bl.length()) {
-      *preply_buf = (char*)malloc(notify_private.reply_bl.length());
-      memcpy(*preply_buf, notify_private.reply_bl.c_str(),
-            notify_private.reply_bl.length());
-    } else {
-      *preply_buf = NULL;
-    }
-  }
-  if (preply_buf_len)
-    *preply_buf_len = notify_private.reply_bl.length();
-  if (preply_bl)
-    preply_bl->claim(notify_private.reply_bl);
-
   objecter->linger_cancel(linger_op);
 
   set_sync_op_version(objver);
+  return r;
+}
+
+int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
+                                    bufferlist& bl, uint64_t timeout_ms,
+                                    bufferlist *preply_bl, char **preply_buf,
+                                    size_t *preply_buf_len)
+{
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
 
-  return r_issue ? r_issue : notify_private.result;
+  c->io = this;
+
+  Context *oncomplete = new C_aio_notify_Complete(c, linger_op);
+  C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
+                                                  objecter, linger_op,
+                                                  preply_bl, preply_buf,
+                                                  preply_buf_len);
+  Context *onack = new C_aio_notify_Ack(client->cct, onnotify);
+
+  uint32_t timeout = notify_timeout;
+  if (timeout_ms)
+    timeout = timeout_ms / 1000;
+
+  // Construct RADOS op
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  bufferlist inbl;
+  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
+
+  // Issue RADOS op
+  objecter->linger_notify(linger_op,
+                         rd, snap_seq, inbl, NULL,
+                         onack, NULL);
+  return 0;
 }
 
 int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
@@ -1395,4 +1494,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)
 
   c->put_unlock();
 }
-
index 45bfdfdbee7b04377b64ae9018c41b75296cf676..b0a1b19c2346d477991e555660658884315327f2 100644 (file)
@@ -206,6 +206,9 @@ struct librados::IoCtxImpl {
             bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
   int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
                 bufferlist& bl);
+  int aio_notify(const object_t& oid, AioCompletionImpl *c, bufferlist& bl,
+                 uint64_t timeout_ms, bufferlist *preplybl, char **preply_buf,
+                 size_t *preply_buf_len);
 
   int set_alloc_hint(const object_t& oid,
                      uint64_t expected_object_size,
index 86badc2bc84b044dc10a011cf0d50f621c55df3a..c55fa996384049f6b3a690d2ff9dfed95ff6b06a 100644 (file)
@@ -1811,6 +1811,15 @@ int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
   return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL);
 }
 
+int librados::IoCtx::aio_notify(const string& oid, AioCompletion *c,
+                                bufferlist& bl, uint64_t timeout_ms,
+                                bufferlist *preplybl)
+{
+  object_t obj(oid);
+  return io_ctx_impl->aio_notify(obj, c->pc, bl, timeout_ms, preplybl, NULL,
+                                 NULL);
+}
+
 void librados::IoCtx::notify_ack(const std::string& o,
                                 uint64_t notify_id, uint64_t handle,
                                 bufferlist& bl)
@@ -4072,6 +4081,28 @@ extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
   return ret;
 }
 
+extern "C" int rados_aio_notify(rados_ioctx_t io, const char *o,
+                                rados_completion_t completion,
+                                const char *buf, int buf_len,
+                                uint64_t timeout_ms, char **reply_buffer,
+                                size_t *reply_buffer_len)
+{
+  tracepoint(librados, rados_aio_notify_enter, io, o, completion, buf, buf_len,
+             timeout_ms);
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  object_t oid(o);
+  bufferlist bl;
+  if (buf) {
+    bl.push_back(buffer::copy(buf, buf_len));
+  }
+  librados::AioCompletionImpl *c =
+    reinterpret_cast<librados::AioCompletionImpl*>(completion);
+  int ret = ctx->aio_notify(oid, c, bl, timeout_ms, NULL, reply_buffer,
+                            reply_buffer_len);
+  tracepoint(librados, rados_aio_notify_exit, ret);
+  return ret;
+}
+
 extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
                                uint64_t notify_id, uint64_t handle,
                                const char *buf, int buf_len)
index 97506a9899a182983c47862fd6a3ccc568ef1d75..ac09e70b7dd870d1f2b26519b6e35f9234360249 100644 (file)
@@ -904,10 +904,14 @@ struct ObjectOperation {
     osd_op.op.watch.op = op;
   }
 
-  void notify(uint64_t cookie, bufferlist& inbl) {
+  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
+              bufferlist &bl, bufferlist *inbl) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
     osd_op.op.notify.cookie = cookie;
-    osd_op.indata.append(inbl);
+    ::encode(prot_ver, *inbl);
+    ::encode(timeout, *inbl);
+    ::encode(bl, *inbl);
+    osd_op.indata.append(*inbl);
   }
 
   void notify_ack(uint64_t notify_id, uint64_t cookie,
index 0ba22ea0410fbeec2c8317abe04725d871f3cc4d..dbb5d8dabbf90ca439805c3324df67fdbefca39e 100644 (file)
@@ -2339,6 +2339,31 @@ TRACEPOINT_EVENT(librados, rados_notify2_exit,
     )
 )
 
+TRACEPOINT_EVENT(librados, rados_aio_notify_enter,
+    TP_ARGS(
+        rados_ioctx_t, ioctx,
+        const char*, oid,
+        rados_completion_t, completion,
+        const char*, buf,
+        int, buf_len,
+       uint64_t, timeout_ms),
+    TP_FIELDS(
+        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+        ctf_string(oid, oid)
+        ctf_integer_hex(rados_completion_t, completion, completion)
+        ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
+        ctf_integer(uint64_t, timeout_ms, timeout_ms)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_aio_notify_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)
+
 TRACEPOINT_EVENT(librados, rados_notify_ack_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,