]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd: Implementation of a generic object watcher class
authorRicardo Dias <rdias@suse.com>
Fri, 4 Nov 2016 02:22:59 +0000 (02:22 +0000)
committerJason Dillaman <dillaman@redhat.com>
Tue, 3 Jan 2017 14:19:38 +0000 (09:19 -0500)
Signed-off-by: Ricardo Dias <rdias@suse.com>
14 files changed:
src/librbd/CMakeLists.txt
src/librbd/Operations.cc
src/librbd/Watcher.cc [new file with mode: 0644]
src/librbd/Watcher.h [new file with mode: 0644]
src/librbd/object_watcher/Notifier.cc [deleted file]
src/librbd/object_watcher/Notifier.h [deleted file]
src/librbd/watcher/Notifier.cc [new file with mode: 0644]
src/librbd/watcher/Notifier.h [new file with mode: 0644]
src/librbd/watcher/RewatchRequest.cc [new file with mode: 0644]
src/librbd/watcher/RewatchRequest.h [new file with mode: 0644]
src/librbd/watcher/Types.cc [new file with mode: 0644]
src/librbd/watcher/Types.h [new file with mode: 0644]
src/test/librbd/CMakeLists.txt
src/test/librbd/watcher/test_mock_RewatchRequest.cc [new file with mode: 0644]

index 10f57cd73c69cb9bcd6842e781b2bf7f52091ac2..554544e36349a84eb765877733dee3025742a65a 100644 (file)
@@ -29,6 +29,9 @@ set(librbd_internal_srcs
   Utils.cc
   cache/ImageWriteback.cc
   cache/PassthroughImageCache.cc
+  Watcher.cc
+  watcher/Types.cc
+  watcher/RewatchRequest.cc
   exclusive_lock/AcquireRequest.cc
   exclusive_lock/AutomaticPolicy.cc
   exclusive_lock/ReacquireRequest.cc
@@ -64,7 +67,7 @@ set(librbd_internal_srcs
   object_map/SnapshotRollbackRequest.cc
   object_map/UnlockRequest.cc
   object_map/UpdateRequest.cc
-  object_watcher/Notifier.cc
+  watcher/Notifier.cc
   operation/DisableFeaturesRequest.cc
   operation/EnableFeaturesRequest.cc
   operation/FlattenRequest.cc
index 3d4dd0d479f5378016e0f0d6e3fe9cacfc54f965..b83e1c6f4e58cf51d99f060bfdc97acf72301c48 100644 (file)
@@ -578,6 +578,9 @@ void Operations<I>::execute_rename(const std::string &dest_name,
     // unregister watch before and register back after rename
     on_finish = new C_NotifyUpdate<I>(m_image_ctx, on_finish);
     on_finish = new FunctionContext([this, on_finish](int r) {
+        if (m_image_ctx.old_format) {
+          m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid);
+        }
        m_image_ctx.image_watcher->register_watch(on_finish);
       });
     on_finish = new FunctionContext([this, dest_name, on_finish](int r) {
diff --git a/src/librbd/Watcher.cc b/src/librbd/Watcher.cc
new file mode 100644 (file)
index 0000000..f145317
--- /dev/null
@@ -0,0 +1,228 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/Watcher.h"
+#include "librbd/watcher/RewatchRequest.h"
+#include "librbd/Utils.h"
+#include "librbd/TaskFinisher.h"
+#include "include/encoding.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include <boost/bind.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher: "
+
+namespace librbd {
+
+using namespace watcher;
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+using std::string;
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+  librados::Rados rados;
+  Context *on_finish;
+  bool flushing = false;
+  int ret_val = 0;
+
+  C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
+    : rados(io_ctx), on_finish(on_finish) {
+  }
+
+  virtual void complete(int r) override {
+    if (ret_val == 0 && r < 0) {
+      ret_val = r;
+    }
+
+    if (!flushing) {
+      flushing = true;
+
+      librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
+      r = rados.aio_watch_flush(aio_comp);
+      assert(r == 0);
+      aio_comp->release();
+      return;
+    }
+
+    // ensure our reference to the RadosClient is released prior
+    // to completing the callback to avoid racing an explicit
+    // librados shutdown
+    Context *ctx = on_finish;
+    r = ret_val;
+    delete this;
+
+    ctx->complete(r);
+  }
+
+  virtual void finish(int r) override {
+  }
+};
+
+} // anonymous namespace
+
+Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
+                          const string& oid)
+  : m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
+    m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
+    m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
+    m_watch_state(WATCH_STATE_UNREGISTERED), m_ioctx(ioctx),
+    m_work_queue(work_queue), m_oid(oid), m_watch_ctx(*this)
+{
+}
+
+Watcher::~Watcher()
+{
+  RWLock::RLocker l(m_watch_lock);
+  assert(m_watch_state != WATCH_STATE_REGISTERED);
+}
+
+void Watcher::register_watch(Context *on_finish) {
+  ldout(m_cct, 10) << this << " registering watcher" << dendl;
+
+  RWLock::RLocker watch_locker(m_watch_lock);
+  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+  librados::AioCompletion *aio_comp = create_rados_safe_callback(
+                                         new C_RegisterWatch(this, on_finish));
+  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+void Watcher::handle_register_watch(int r) {
+  ldout(m_cct, 10) << this << " handle register r=" << r << dendl;
+  RWLock::WLocker watch_locker(m_watch_lock);
+  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+  if (r < 0) {
+    lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r) << dendl;
+    m_watch_handle = 0;
+  } else if (r >= 0) {
+    m_watch_state = WATCH_STATE_REGISTERED;
+  }
+}
+
+void Watcher::unregister_watch(Context *on_finish) {
+  ldout(m_cct, 10) << this << " unregistering watcher" << dendl;
+
+  RWLock::WLocker watch_locker(m_watch_lock);
+  if (m_watch_state == WATCH_STATE_REWATCHING) {
+    ldout(m_cct, 10) << this << " delaying unregister until rewatch completed"
+                     << dendl;
+
+    assert(m_unregister_watch_ctx == nullptr);
+    m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
+        unregister_watch(on_finish);
+      });
+    return;
+  }
+
+  if (m_watch_state == WATCH_STATE_REGISTERED ||
+      m_watch_state == WATCH_STATE_ERROR) {
+    m_watch_state = WATCH_STATE_UNREGISTERED;
+
+    librados::AioCompletion *aio_comp = create_rados_safe_callback(
+                      new C_UnwatchAndFlush(m_ioctx, on_finish));
+    int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+    assert(r == 0);
+    aio_comp->release();
+  }
+}
+
+void Watcher::flush(Context *on_finish) {
+  m_notifier.flush(on_finish);
+}
+
+void Watcher::set_oid(const string& oid) {
+  RWLock::WLocker l(m_watch_lock);
+  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+
+  m_oid = oid;
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+  lderr(m_cct) << this << " watch failed: " << handle << ", "
+               << cpp_strerror(err) << dendl;
+
+  RWLock::WLocker l(m_watch_lock);
+  if (m_watch_state == WATCH_STATE_REGISTERED) {
+    m_watch_state = WATCH_STATE_ERROR;
+
+    FunctionContext *ctx = new FunctionContext(
+        boost::bind(&Watcher::rewatch, this));
+    m_work_queue->queue(ctx);
+  }
+}
+
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+                                bufferlist &out) {
+  m_ioctx.notify_ack(m_oid, notify_id, handle, out);
+}
+
+void Watcher::rewatch() {
+  ldout(m_cct, 10) << this << " re-registering watch" << dendl;
+
+  RWLock::WLocker l(m_watch_lock);
+  if (m_watch_state != WATCH_STATE_ERROR) {
+    return;
+  }
+  m_watch_state = WATCH_STATE_REWATCHING;
+
+  Context *ctx = create_context_callback<Watcher,
+                                         &Watcher::handle_rewatch>(this);
+  RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
+                                               &m_watch_ctx,
+                                               &m_watch_handle, ctx);
+  req->send();
+}
+
+void Watcher::handle_rewatch(int r) {
+  ldout(m_cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+  WatchState next_watch_state = WATCH_STATE_REGISTERED;
+  if (r < 0) {
+    // only EBLACKLISTED or ENOENT can be returned
+    assert(r == -EBLACKLISTED || r == -ENOENT);
+    next_watch_state = WATCH_STATE_UNREGISTERED;
+  }
+
+  Context *unregister_watch_ctx = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REWATCHING);
+    m_watch_state = next_watch_state;
+
+    std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+
+    handle_rewatch_complete(r);
+    m_work_queue->queue(
+        create_context_callback<Watcher,
+                                &Watcher::handle_rewatch_complete>(this));
+  }
+
+  // wake up pending unregister request
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+  }
+}
+
+void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl,
+                          Context *on_finish) {
+  m_notifier.notify(payload, out_bl, on_finish);
+}
+
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
+                                               uint64_t handle,
+                                               uint64_t notifier_id,
+                                               bufferlist& bl) {
+  watcher.handle_notify(notify_id, handle, bl);
+}
+
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+  watcher.handle_error(handle, err);
+}
+
+} // namespace librbd
diff --git a/src/librbd/Watcher.h b/src/librbd/Watcher.h
new file mode 100644 (file)
index 0000000..21a59c8
--- /dev/null
@@ -0,0 +1,143 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_H
+#define CEPH_LIBRBD_WATCHER_H
+
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "include/rados/librados.hpp"
+#include "librbd/watcher/Notifier.h"
+#include "librbd/watcher/Types.h"
+#include <string>
+#include <utility>
+
+class ContextWQ;
+
+namespace librbd {
+
+class Watcher {
+  friend struct watcher::C_NotifyAck;
+
+public:
+  Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
+          const std::string& oid);
+  virtual ~Watcher();
+
+  void register_watch(Context *on_finish);
+  void unregister_watch(Context *on_finish);
+  void flush(Context *on_finish);
+
+  void set_oid(const string& oid);
+
+  uint64_t get_watch_handle() const {
+    RWLock::RLocker watch_locker(m_watch_lock);
+    return m_watch_handle;
+  }
+
+  bool is_registered() const {
+    RWLock::RLocker locker(m_watch_lock);
+    return m_watch_state == WATCH_STATE_REGISTERED;
+  }
+
+protected:
+  enum WatchState {
+    WATCH_STATE_UNREGISTERED,
+    WATCH_STATE_REGISTERED,
+    WATCH_STATE_ERROR,
+    WATCH_STATE_REWATCHING
+  };
+
+  CephContext *m_cct;
+  mutable RWLock m_watch_lock;
+  uint64_t m_watch_handle;
+  watcher::Notifier m_notifier;
+  WatchState m_watch_state;
+
+  void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr,
+                   Context *on_finish = nullptr);
+
+  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                             bufferlist &bl) = 0;
+
+  virtual void handle_error(uint64_t cookie, int err);
+
+  void acknowledge_notify(uint64_t notify_id, uint64_t handle,
+                          bufferlist &out);
+
+  virtual void handle_rewatch_complete(int r) { }
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * UNREGISTERED
+   *    |
+   *    | (register_watch)
+   *    |
+   *    v      (watch error)
+   * REGISTERED * * * * * * * > ERROR
+   *    |   ^                     |
+   *    |   |                     | (rewatch)
+   *    |   |                     v
+   *    |   |                   REWATCHING
+   *    |   |                     |
+   *    |   |                     |
+   *    |   \---------------------/
+   *    |
+   *    | (unregister_watch)
+   *    |
+   *    v
+   * UNREGISTERED
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+
+  struct WatchCtx : public librados::WatchCtx2 {
+    Watcher &watcher;
+
+    WatchCtx(Watcher &parent) : watcher(parent) {}
+
+    virtual void handle_notify(uint64_t notify_id,
+                               uint64_t handle,
+                              uint64_t notifier_id,
+                               bufferlist& bl);
+    virtual void handle_error(uint64_t handle, int err);
+  };
+
+  struct C_RegisterWatch : public Context {
+    Watcher *watcher;
+    Context *on_finish;
+
+    C_RegisterWatch(Watcher *watcher, Context *on_finish)
+       : watcher(watcher), on_finish(on_finish) {
+    }
+    virtual void finish(int r) override {
+      watcher->handle_register_watch(r);
+      on_finish->complete(r);
+    }
+  };
+
+  librados::IoCtx& m_ioctx;
+  ContextWQ *m_work_queue;
+  std::string m_oid;
+
+  WatchCtx m_watch_ctx;
+  Context *m_unregister_watch_ctx = nullptr;
+
+  void handle_register_watch(int r);
+
+  void rewatch();
+  void handle_rewatch(int r);
+
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_H
diff --git a/src/librbd/object_watcher/Notifier.cc b/src/librbd/object_watcher/Notifier.cc
deleted file mode 100644 (file)
index 019c786..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/object_watcher/Notifier.h"
-#include "common/WorkQueue.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Utils.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::object_watcher::Notifier: "
-
-namespace librbd {
-namespace object_watcher {
-
-const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
-
-Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
-  : m_work_queue(work_queue), m_oid(oid),
-    m_aio_notify_lock(util::unique_lock_name(
-      "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) {
-  m_ioctx.dup(ioctx);
-  m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
-}
-
-Notifier::~Notifier() {
-  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
-  assert(m_pending_aio_notifies == 0);
-}
-
-void Notifier::flush(Context *on_finish) {
-  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
-  if (m_pending_aio_notifies == 0) {
-    m_work_queue->queue(on_finish, 0);
-    return;
-  }
-
-  m_aio_notify_flush_ctxs.push_back(on_finish);
-}
-
-void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
-  {
-    Mutex::Locker aio_notify_locker(m_aio_notify_lock);
-    ++m_pending_aio_notifies;
-
-    ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
-                     << dendl;
-  }
-
-  C_AioNotify *ctx = new C_AioNotify(this, on_finish);
-  librados::AioCompletion *comp = util::create_rados_ack_callback(ctx);
-  int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
-  assert(r == 0);
-  comp->release();
-}
-
-void Notifier::handle_notify(int r, Context *on_finish) {
-  if (on_finish != nullptr) {
-    m_work_queue->queue(on_finish, r);
-  }
-
-  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
-  assert(m_pending_aio_notifies > 0);
-  --m_pending_aio_notifies;
-
-  ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
-                   << dendl;
-  if (m_pending_aio_notifies == 0) {
-    for (auto ctx : m_aio_notify_flush_ctxs) {
-      m_work_queue->queue(ctx, 0);
-    }
-    m_aio_notify_flush_ctxs.clear();
-  }
-}
-
-} // namespace object_watcher
-} // namespace librbd
diff --git a/src/librbd/object_watcher/Notifier.h b/src/librbd/object_watcher/Notifier.h
deleted file mode 100644 (file)
index bfe996e..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
-#define CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
-
-#include "include/int_types.h"
-#include "include/buffer_fwd.h"
-#include "include/Context.h"
-#include "include/rados/librados.hpp"
-#include "common/Mutex.h"
-#include "common/WorkQueue.h"
-#include <list>
-
-namespace librbd {
-
-namespace object_watcher {
-
-class Notifier {
-public:
-  static const uint64_t NOTIFY_TIMEOUT;
-
-  Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx,
-           const std::string &oid);
-  ~Notifier();
-
-  void flush(Context *on_finish);
-  void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
-
-private:
-  typedef std::list<Context*> Contexts;
-
-  struct C_AioNotify : public Context {
-    Notifier *notifier;
-    Context *on_finish;
-
-    C_AioNotify(Notifier *notifier, Context *on_finish)
-      : notifier(notifier), on_finish(on_finish) {
-    }
-    virtual void finish(int r) override {
-      notifier->handle_notify(r, on_finish);
-    }
-  };
-
-  ContextWQ *m_work_queue;
-  librados::IoCtx m_ioctx;
-  CephContext *m_cct;
-  std::string m_oid;
-
-  Mutex m_aio_notify_lock;
-  size_t m_pending_aio_notifies = 0;
-  Contexts m_aio_notify_flush_ctxs;
-
-  void handle_notify(int r, Context *on_finish);
-
-};
-
-} // namespace object_watcher
-} // namespace librbd
-
-#endif // CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H
diff --git a/src/librbd/watcher/Notifier.cc b/src/librbd/watcher/Notifier.cc
new file mode 100644 (file)
index 0000000..318f077
--- /dev/null
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Notifier.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::object_watcher::Notifier: "
+
+namespace librbd {
+namespace watcher {
+
+const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
+
+Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
+  : m_work_queue(work_queue), m_oid(oid),
+    m_aio_notify_lock(util::unique_lock_name(
+      "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) {
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+Notifier::~Notifier() {
+  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+  assert(m_pending_aio_notifies == 0);
+}
+
+void Notifier::flush(Context *on_finish) {
+  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+  if (m_pending_aio_notifies == 0) {
+    m_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  m_aio_notify_flush_ctxs.push_back(on_finish);
+}
+
+void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
+  {
+    Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+    ++m_pending_aio_notifies;
+
+    ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+                     << dendl;
+  }
+
+  C_AioNotify *ctx = new C_AioNotify(this, on_finish);
+  librados::AioCompletion *comp = util::create_rados_ack_callback(ctx);
+  int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
+  assert(r == 0);
+  comp->release();
+}
+
+void Notifier::handle_notify(int r, Context *on_finish) {
+  if (on_finish != nullptr) {
+    m_work_queue->queue(on_finish, r);
+  }
+
+  Mutex::Locker aio_notify_locker(m_aio_notify_lock);
+  assert(m_pending_aio_notifies > 0);
+  --m_pending_aio_notifies;
+
+  ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies
+                   << dendl;
+  if (m_pending_aio_notifies == 0) {
+    for (auto ctx : m_aio_notify_flush_ctxs) {
+      m_work_queue->queue(ctx, 0);
+    }
+    m_aio_notify_flush_ctxs.clear();
+  }
+}
+
+} // namespace watcher
+} // namespace librbd
diff --git a/src/librbd/watcher/Notifier.h b/src/librbd/watcher/Notifier.h
new file mode 100644 (file)
index 0000000..c45e9ac
--- /dev/null
@@ -0,0 +1,61 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_NOTIFIER_H
+#define CEPH_LIBRBD_WATCHER_NOTIFIER_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include <list>
+
+namespace librbd {
+
+namespace watcher {
+
+class Notifier {
+public:
+  static const uint64_t NOTIFY_TIMEOUT;
+
+  Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx,
+           const std::string &oid);
+  ~Notifier();
+
+  void flush(Context *on_finish);
+  void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
+
+private:
+  typedef std::list<Context*> Contexts;
+
+  struct C_AioNotify : public Context {
+    Notifier *notifier;
+    Context *on_finish;
+
+    C_AioNotify(Notifier *notifier, Context *on_finish)
+      : notifier(notifier), on_finish(on_finish) {
+    }
+    virtual void finish(int r) override {
+      notifier->handle_notify(r, on_finish);
+    }
+  };
+
+  ContextWQ *m_work_queue;
+  librados::IoCtx m_ioctx;
+  CephContext *m_cct;
+  std::string m_oid;
+
+  Mutex m_aio_notify_lock;
+  size_t m_pending_aio_notifies = 0;
+  Contexts m_aio_notify_flush_ctxs;
+
+  void handle_notify(int r, Context *on_finish);
+
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_NOTIFIER_H
diff --git a/src/librbd/watcher/RewatchRequest.cc b/src/librbd/watcher/RewatchRequest.cc
new file mode 100644 (file)
index 0000000..6881069
--- /dev/null
@@ -0,0 +1,114 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/RewatchRequest.h"
+#include "common/RWLock.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::watcher::RewatchRequest: " \
+                           << this << ": " << __func__
+
+namespace librbd {
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+
+namespace watcher {
+
+using std::string;
+
+RewatchRequest::RewatchRequest(librados::IoCtx& ioctx, const string& oid,
+                               RWLock &watch_lock,
+                               librados::WatchCtx2 *watch_ctx,
+                               uint64_t *watch_handle, Context *on_finish)
+  : m_ioctx(ioctx), m_oid(oid), m_watch_lock(watch_lock),
+    m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+    m_on_finish(on_finish) {
+}
+
+void RewatchRequest::send() {
+  unwatch();
+}
+
+void RewatchRequest::unwatch() {
+  assert(m_watch_lock.is_wlocked());
+  assert(*m_watch_handle != 0);
+
+  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+  ldout(cct, 10) << dendl;
+
+  librados::AioCompletion *aio_comp = create_rados_safe_callback<
+                        RewatchRequest, &RewatchRequest::handle_unwatch>(this);
+  int r = m_ioctx.aio_unwatch(*m_watch_handle, aio_comp);
+  assert(r == 0);
+  aio_comp->release();
+
+  *m_watch_handle = 0;
+}
+
+void RewatchRequest::handle_unwatch(int r) {
+  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r == -EBLACKLISTED) {
+    lderr(cct) << "client blacklisted" << dendl;
+    finish(r);
+    return;
+  } else if (r < 0) {
+    lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl;
+  }
+  rewatch();
+}
+
+void RewatchRequest::rewatch() {
+  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+  ldout(cct, 10) << dendl;
+
+  librados::AioCompletion *aio_comp = create_rados_safe_callback<
+                        RewatchRequest, &RewatchRequest::handle_rewatch>(this);
+  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+void RewatchRequest::handle_rewatch(int r) {
+  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r == -EBLACKLISTED) {
+    lderr(cct) << "client blacklisted" << dendl;
+    finish(r);
+    return;
+  } else if (r == -ENOENT) {
+    ldout(cct, 5) << "object deleted" << dendl;
+    finish(r);
+    return;
+  } else if (r < 0) {
+    lderr(cct) << "failed to watch object: " << cpp_strerror(r)
+               << dendl;
+    rewatch();
+    return;
+  }
+
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    *m_watch_handle = m_rewatch_handle;
+  }
+
+  finish(0);
+}
+
+void RewatchRequest::finish(int r) {
+  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  m_on_finish->complete(r);
+  delete this;
+}
+
+} // namespace watcher
+} // namespace librbd
+
diff --git a/src/librbd/watcher/RewatchRequest.h b/src/librbd/watcher/RewatchRequest.h
new file mode 100644 (file)
index 0000000..d4fc250
--- /dev/null
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+#define CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+struct RWLock;
+
+namespace librbd {
+
+namespace watcher {
+
+class RewatchRequest {
+public:
+
+  static RewatchRequest *create(librados::IoCtx& ioctx, const std::string& oid,
+                                RWLock &watch_lock,
+                                librados::WatchCtx2 *watch_ctx,
+                                uint64_t *watch_handle, Context *on_finish) {
+    return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+                              on_finish);
+  }
+
+  RewatchRequest(librados::IoCtx& ioctx, const std::string& oid,
+                 RWLock &watch_lock, librados::WatchCtx2 *watch_ctx,
+                 uint64_t *watch_handle, Context *on_finish);
+
+  void send();
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * UNWATCH
+   *    |
+   *    |  . . . .
+   *    |  .     . (recoverable error)
+   *    v  v     .
+   * REWATCH . . .
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+
+  librados::IoCtx& m_ioctx;
+  std::string m_oid;
+  RWLock &m_watch_lock;
+  librados::WatchCtx2 *m_watch_ctx;
+  uint64_t *m_watch_handle;
+  Context *m_on_finish;
+
+  uint64_t m_rewatch_handle = 0;
+
+  void unwatch();
+  void handle_unwatch(int r);
+
+  void rewatch();
+  void handle_rewatch(int r);
+
+  void finish(int r);
+};
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H
diff --git a/src/librbd/watcher/Types.cc b/src/librbd/watcher/Types.cc
new file mode 100644 (file)
index 0000000..f107c72
--- /dev/null
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/watcher/Types.h"
+#include "librbd/Watcher.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Watcher: "
+
+namespace librbd {
+namespace watcher {
+
+C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
+                         uint64_t handle)
+  : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
+    handle(handle) {
+  ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
+                 << "handle=" << handle << dendl;
+}
+
+void C_NotifyAck::finish(int r) {
+  assert(r == 0);
+  ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
+                 << "handle=" << handle << dendl;
+  watcher->acknowledge_notify(notify_id, handle, out);
+}
+
+} // namespace watcher
+} // namespace librbd
diff --git a/src/librbd/watcher/Types.h b/src/librbd/watcher/Types.h
new file mode 100644 (file)
index 0000000..d9823ca
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_WATCHER_TYPES_H
+#define CEPH_LIBRBD_WATCHER_TYPES_H
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/Context.h"
+
+namespace ceph {
+class Formatter;
+}
+
+namespace librbd {
+
+class Watcher;
+
+namespace watcher {
+
+struct C_NotifyAck : public Context {
+  Watcher *watcher;
+  CephContext *cct;
+  uint64_t notify_id;
+  uint64_t handle;
+  bufferlist out;
+
+  C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle);
+  void finish(int r);
+};
+
+template <typename Watcher>
+struct HandlePayloadVisitor : public boost::static_visitor<void> {
+  Watcher *watcher;
+  uint64_t notify_id;
+  uint64_t handle;
+
+  HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_,
+      uint64_t handle_)
+    : watcher(watcher_), notify_id(notify_id_), handle(handle_)
+  {
+  }
+
+  template <typename P>
+  inline void operator()(const P &payload) const {
+    C_NotifyAck *ctx = new C_NotifyAck(watcher, notify_id, handle);
+    if (watcher->handle_payload(payload, ctx)) {
+      ctx->complete(0);
+    }
+  }
+};
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+  template <typename P>
+  inline void operator()(const P &payload) const {
+    ::encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl);
+    payload.encode(m_bl);
+  }
+
+private:
+  bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+  DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+    : m_version(version), m_iter(iter) {}
+
+  template <typename P>
+  inline void operator()(P &payload) const {
+    payload.decode(m_version, m_iter);
+  }
+
+private:
+  __u8 m_version;
+  bufferlist::iterator &m_iter;
+};
+
+
+} // namespace watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_WATCHER_TYPES_H
index 9a1f3727bfe5d65c31cdf4d7e31b5eb613dc7d4e..6eb47ef1373cbcc14b898d0462530b72b696e501 100644 (file)
@@ -59,6 +59,7 @@ set(unittest_librbd_srcs
   operation/test_mock_SnapshotRemoveRequest.cc
   operation/test_mock_SnapshotRollbackRequest.cc
   operation/test_mock_SnapshotUnprotectRequest.cc
+  watcher/test_mock_RewatchRequest.cc
   )
 add_executable(unittest_librbd
   ${unittest_librbd_srcs}
diff --git a/src/test/librbd/watcher/test_mock_RewatchRequest.cc b/src/test/librbd/watcher/test_mock_RewatchRequest.cc
new file mode 100644 (file)
index 0000000..36356b7
--- /dev/null
@@ -0,0 +1,196 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "include/rados/librados.hpp"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "librados/AioCompletionImpl.h"
+#include "librbd/watcher/RewatchRequest.h"
+
+namespace librbd {
+namespace watcher {
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::WithArg;
+
+struct TestMockWatcherRewatchRequest : public TestMockFixture {
+  typedef RewatchRequest MockRewatchRequest;
+
+  TestMockWatcherRewatchRequest()
+    : m_watch_lock("watch_lock") {
+  }
+
+  void expect_aio_watch(MockImageCtx &mock_image_ctx, int r) {
+    librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+      mock_image_ctx.md_ctx));
+
+    EXPECT_CALL(mock_io_ctx, aio_watch(mock_image_ctx.header_oid, _, _, _))
+      .WillOnce(DoAll(WithArg<1>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c) {
+                                   c->get();
+                                   mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+                                       mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+                                     }), r);
+                                   })),
+                      Return(0)));
+  }
+
+  void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r) {
+    librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+      mock_image_ctx.md_ctx));
+
+    EXPECT_CALL(mock_io_ctx, aio_unwatch(m_watch_handle, _))
+      .WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle,
+                                                                librados::AioCompletionImpl *c) {
+                        c->get();
+                        mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+                            mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+                          }), r);
+                        }),
+                      Return(0)));
+  }
+
+  struct WatchCtx : public librados::WatchCtx2 {
+    virtual void handle_notify(uint64_t, uint64_t, uint64_t,
+                               ceph::bufferlist&) {
+      assert(false);
+    }
+    virtual void handle_error(uint64_t, int) {
+      assert(false);
+    }
+  };
+
+  RWLock m_watch_lock;
+  WatchCtx m_watch_ctx;
+  uint64_t m_watch_handle = 123;
+};
+
+TEST_F(TestMockWatcherRewatchRequest, Success) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+
+  InSequence seq;
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, 0);
+
+  C_SaferCond ctx;
+  MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+                                                       mock_image_ctx.header_oid,
+                                                       m_watch_lock,
+                                                       &m_watch_ctx,
+                                                       &m_watch_handle,
+                                                       &ctx);
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    req->send();
+  }
+  ASSERT_EQ(0, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, UnwatchError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+
+  InSequence seq;
+  expect_aio_unwatch(mock_image_ctx, -EINVAL);
+  expect_aio_watch(mock_image_ctx, 0);
+
+  C_SaferCond ctx;
+  MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+                                                       mock_image_ctx.header_oid,
+                                                       m_watch_lock,
+                                                       &m_watch_ctx,
+                                                       &m_watch_handle,
+                                                       &ctx);
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    req->send();
+  }
+  ASSERT_EQ(0, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchBlacklist) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+
+  InSequence seq;
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, -EBLACKLISTED);
+
+  C_SaferCond ctx;
+  MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+                                                       mock_image_ctx.header_oid,
+                                                       m_watch_lock,
+                                                       &m_watch_ctx,
+                                                       &m_watch_handle,
+                                                       &ctx);
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    req->send();
+  }
+  ASSERT_EQ(-EBLACKLISTED, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchDNE) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+
+  InSequence seq;
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, -ENOENT);
+
+  C_SaferCond ctx;
+  MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+                                                       mock_image_ctx.header_oid,
+                                                       m_watch_lock,
+                                                       &m_watch_ctx,
+                                                       &m_watch_handle,
+                                                       &ctx);
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    req->send();
+  }
+  ASSERT_EQ(-ENOENT, ctx.wait());
+}
+
+TEST_F(TestMockWatcherRewatchRequest, WatchError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+
+  InSequence seq;
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, -EINVAL);
+  expect_aio_watch(mock_image_ctx, 0);
+
+  C_SaferCond ctx;
+  MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx,
+                                                       mock_image_ctx.header_oid,
+                                                       m_watch_lock,
+                                                       &m_watch_ctx,
+                                                       &m_watch_handle,
+                                                       &ctx);
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    req->send();
+  }
+  ASSERT_EQ(0, ctx.wait());
+}
+
+} // namespace watcher
+} // namespace librbd