]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd: Removed ObjectWatcher class and substituted with Watcher class
authorRicardo Dias <rdias@suse.com>
Tue, 15 Nov 2016 11:11:28 +0000 (11:11 +0000)
committerJason Dillaman <dillaman@redhat.com>
Tue, 3 Jan 2017 14:20:08 +0000 (09:20 -0500)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/librbd/CMakeLists.txt
src/librbd/MirroringWatcher.cc
src/librbd/MirroringWatcher.h
src/librbd/ObjectWatcher.cc [deleted file]
src/librbd/ObjectWatcher.h [deleted file]
src/librbd/mirroring_watcher/Types.cc
src/librbd/watcher/Types.cc
src/test/librbd/CMakeLists.txt
src/test/librbd/test_MirroringWatcher.cc
src/tools/rbd_mirror/Replayer.cc

index 554544e36349a84eb765877733dee3025742a65a..431a2af41f37937c8a38597c88121fc8068b9b28 100644 (file)
@@ -24,7 +24,6 @@ set(librbd_internal_srcs
   LibrbdWriteback.cc
   MirroringWatcher.cc
   ObjectMap.cc
-  ObjectWatcher.cc
   Operations.cc
   Utils.cc
   cache/ImageWriteback.cc
index ffed247cfc8e4e4a1c6e833252d5f32bc0a36c34..d8b829947bda086f87c7cb3c08a262f7df60beaf 100644 (file)
@@ -14,6 +14,7 @@
 namespace librbd {
 
 using namespace mirroring_watcher;
+using namespace watcher;
 
 namespace {
 
@@ -23,41 +24,43 @@ static const uint64_t NOTIFY_TIMEOUT_MS = 5000;
 
 template <typename I>
 MirroringWatcher<I>::MirroringWatcher(librados::IoCtx &io_ctx,
-                                      ContextWQT *work_queue)
-  : ObjectWatcher<I>(io_ctx, work_queue) {
+                                      ContextWQ *work_queue)
+  : Watcher(io_ctx, work_queue, RBD_MIRRORING) {
 }
 
 template <typename I>
-std::string MirroringWatcher<I>::get_oid() const {
-  return RBD_MIRRORING;
+int MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
+                                              cls::rbd::MirrorMode mirror_mode) {
+  C_SaferCond ctx;
+  notify_mode_updated(io_ctx, mirror_mode, &ctx);
+  return ctx.wait();
 }
 
 template <typename I>
-int MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
-                                             cls::rbd::MirrorMode mirror_mode) {
+void MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
+                                              cls::rbd::MirrorMode mirror_mode,
+                                              Context *on_finish) {
   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
   ldout(cct, 20) << dendl;
 
   bufferlist bl;
   ::encode(NotifyMessage{ModeUpdatedPayload{mirror_mode}}, bl);
 
-  int r = io_ctx.notify2(RBD_MIRRORING, bl, NOTIFY_TIMEOUT_MS, nullptr);
-  if (r < 0) {
-    lderr(cct) << ": error encountered sending mode updated notification: "
-               << cpp_strerror(r) << dendl;
-    return r;
-  }
-  return 0;
+  librados::AioCompletion *comp = util::create_rados_ack_callback(on_finish);
+  int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
+                            nullptr);
+  assert(r == 0);
+  comp->release();
 }
 
 template <typename I>
 int MirroringWatcher<I>::notify_image_updated(
     librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
     const std::string &image_id, const std::string &global_image_id) {
-  C_SaferCond cond;
+  C_SaferCond ctx;
   notify_image_updated(io_ctx, mirror_image_state, image_id, global_image_id,
-                       &cond);
-  return cond.wait();
+                       &ctx);
+  return ctx.wait();
 }
 
 template <typename I>
@@ -65,18 +68,20 @@ void MirroringWatcher<I>::notify_image_updated(
     librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
     const std::string &image_id, const std::string &global_image_id,
     Context *on_finish) {
+
   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
   ldout(cct, 20) << dendl;
 
   bufferlist bl;
-  ::encode(NotifyMessage{ImageUpdatedPayload{mirror_image_state, image_id,
-                                             global_image_id}},
-           bl);
+  ::encode(NotifyMessage{ImageUpdatedPayload{
+      mirror_image_state, image_id, global_image_id}}, bl);
+
   librados::AioCompletion *comp = util::create_rados_ack_callback(on_finish);
   int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
                             nullptr);
   assert(r == 0);
   comp->release();
+
 }
 
 template <typename I>
@@ -86,8 +91,6 @@ void MirroringWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
   ldout(cct, 15) << ": notify_id=" << notify_id << ", "
                  << "handle=" << handle << dendl;
 
-  Context *ctx = new typename ObjectWatcher<I>::C_NotifyAck(this, notify_id,
-                                                            handle);
 
   NotifyMessage notify_message;
   try {
@@ -96,34 +99,39 @@ void MirroringWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
   } catch (const buffer::error &err) {
     lderr(cct) << ": error decoding image notification: " << err.what()
                << dendl;
+    Context *ctx = new C_NotifyAck(this, notify_id, handle);
     ctx->complete(0);
     return;
   }
 
-  apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
+  apply_visitor(HandlePayloadVisitor<MirroringWatcher<I>>(this, notify_id,
+                                                          handle),
+                notify_message.payload);
 }
 
 template <typename I>
-void MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
                                          Context *on_notify_ack) {
   CephContext *cct = this->m_cct;
   ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
   handle_mode_updated(payload.mirror_mode, on_notify_ack);
+  return true;
 }
 
 template <typename I>
-void MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
                                          Context *on_notify_ack) {
   CephContext *cct = this->m_cct;
   ldout(cct, 20) << ": image state updated" << dendl;
   handle_image_updated(payload.mirror_image_state, payload.image_id,
                        payload.global_image_id, on_notify_ack);
+  return true;
 }
 
 template <typename I>
-void MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
                                          Context *on_notify_ack) {
-  on_notify_ack->complete(0);
+  return true;
 }
 
 } // namespace librbd
index 09155c42aae55b7c76c377f49df9394948570385..1f385c5d10f7a0a5d080e23017b6879821a142a2 100644 (file)
@@ -7,7 +7,7 @@
 #include "include/int_types.h"
 #include "cls/rbd/cls_rbd_types.h"
 #include "librbd/ImageCtx.h"
-#include "librbd/ObjectWatcher.h"
+#include "librbd/Watcher.h"
 #include "librbd/mirroring_watcher/Types.h"
 
 namespace librados {
@@ -16,15 +16,23 @@ namespace librados {
 
 namespace librbd {
 
+namespace watcher {
+template <typename> struct HandlePayloadVisitor;
+}
+
 template <typename ImageCtxT = librbd::ImageCtx>
-class MirroringWatcher : public ObjectWatcher<ImageCtxT> {
-public:
-  typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+class MirroringWatcher : public Watcher {
+  friend struct watcher::HandlePayloadVisitor<MirroringWatcher<ImageCtxT>>;
 
-  MirroringWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+public:
+  MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue);
 
   static int notify_mode_updated(librados::IoCtx &io_ctx,
                                  cls::rbd::MirrorMode mirror_mode);
+  static void notify_mode_updated(librados::IoCtx &io_ctx,
+                                  cls::rbd::MirrorMode mirror_mode,
+                                  Context *on_finish);
+
   static int notify_image_updated(librados::IoCtx &io_ctx,
                                   cls::rbd::MirrorImageState mirror_image_state,
                                   const std::string &image_id,
@@ -42,35 +50,16 @@ public:
                                     const std::string &global_image_id,
                                     Context *on_ack) = 0;
 
-protected:
-  virtual std::string get_oid() const;
-
-  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
-                             bufferlist &bl);
-
 private:
-  struct HandlePayloadVisitor : public boost::static_visitor<void> {
-    MirroringWatcher *mirroring_watcher;
-    Context *on_notify_ack;
-
-    HandlePayloadVisitor(MirroringWatcher *mirroring_watcher,
-                         Context *on_notify_ack)
-      : mirroring_watcher(mirroring_watcher), on_notify_ack(on_notify_ack) {
-    }
-
-    template <typename Payload>
-    inline void operator()(const Payload &payload) const {
-      mirroring_watcher->handle_payload(payload, on_notify_ack);
-    }
-  };
-
-  void handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
+  bool handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
                       Context *on_notify_ack);
-  void handle_payload(const mirroring_watcher::ImageUpdatedPayload &payload,
+  bool handle_payload(const mirroring_watcher::ImageUpdatedPayload &payload,
                       Context *on_notify_ack);
-  void handle_payload(const mirroring_watcher::UnknownPayload &payload,
+  bool handle_payload(const mirroring_watcher::UnknownPayload &payload,
                       Context *on_notify_ack);
 
+  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                             bufferlist &bl);
 };
 
 } // namespace librbd
diff --git a/src/librbd/ObjectWatcher.cc b/src/librbd/ObjectWatcher.cc
deleted file mode 100644 (file)
index 8bc99f6..0000000
+++ /dev/null
@@ -1,348 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/ObjectWatcher.h"
-#include "include/Context.h"
-#include "common/errno.h"
-#include "common/WorkQueue.h"
-#include "librbd/Utils.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::ObjectWatcher: " << get_oid() << ": " \
-                           << __func__
-
-namespace librbd {
-
-using util::create_context_callback;
-using util::create_rados_safe_callback;
-
-namespace {
-
-struct C_UnwatchAndFlush : public Context {
-  librados::Rados rados;
-  Context *on_finish;
-  bool flushing = false;
-  int ret_val = 0;
-
-  C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
-    : rados(io_ctx), on_finish(on_finish) {
-  }
-
-  virtual void complete(int r) override {
-    if (ret_val == 0 && r < 0) {
-      ret_val = r;
-    }
-
-    if (!flushing) {
-      flushing = true;
-
-      librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
-      r = rados.aio_watch_flush(aio_comp);
-      assert(r == 0);
-      aio_comp->release();
-    } else {
-      Context::complete(ret_val);
-    }
-  }
-
-  virtual void finish(int r) override {
-    on_finish->complete(r);
-  }
-};
-
-} // anonymous namespace
-
-template <typename I>
-ObjectWatcher<I>::ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue)
-  : m_io_ctx(io_ctx), m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
-    m_work_queue(work_queue),
-    m_watch_lock(util::unique_lock_name("librbd::ObjectWatcher::m_watch_lock", this)),
-    m_watch_ctx(this) {
-}
-
-template <typename I>
-ObjectWatcher<I>::~ObjectWatcher() {
-  RWLock::RLocker watch_locker(m_watch_lock);
-  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
-}
-
-template <typename I>
-void ObjectWatcher<I>::register_watch(Context *on_finish) {
-  ldout(m_cct, 5) << dendl;
-
-  {
-    RWLock::WLocker watch_locker(m_watch_lock);
-    assert(on_finish != nullptr);
-    assert(m_on_register_watch == nullptr);
-    assert(m_watch_state == WATCH_STATE_UNREGISTERED);
-
-    m_watch_state = WATCH_STATE_REGISTERING;
-    m_on_register_watch = on_finish;
-  }
-
-  librados::AioCompletion *aio_comp = create_rados_safe_callback<
-    ObjectWatcher<I>, &ObjectWatcher<I>::handle_register_watch>(this);
-  int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
-                             &m_watch_ctx);
-  assert(r == 0);
-  aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_register_watch(int r) {
-  ldout(m_cct, 20) << ": r=" << r << dendl;
-
-  Context *on_register_watch = nullptr;
-  {
-    RWLock::WLocker watch_locker(m_watch_lock);
-    assert(m_watch_state == WATCH_STATE_REGISTERING);
-
-    std::swap(on_register_watch, m_on_register_watch);
-    if (r < 0) {
-      lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r)
-                   << dendl;
-
-      m_watch_state = WATCH_STATE_UNREGISTERED;
-      m_watch_handle = 0;
-    } else {
-      m_watch_state = WATCH_STATE_REGISTERED;
-    }
-  }
-  on_register_watch->complete(r);
-}
-
-template <typename I>
-void ObjectWatcher<I>::unregister_watch(Context *on_finish) {
-  ldout(m_cct, 5) << dendl;
-
-  RWLock::WLocker watch_locker(m_watch_lock);
-  assert(on_finish != nullptr);
-  assert(m_on_unregister_watch == nullptr);
-  assert(m_watch_state != WATCH_STATE_UNREGISTERED &&
-         m_watch_state != WATCH_STATE_REGISTERING);
-
-  m_on_unregister_watch = on_finish;
-  if (m_watch_state == WATCH_STATE_REGISTERED) {
-    unregister_watch_();
-  }
-}
-
-template <typename I>
-void ObjectWatcher<I>::unregister_watch_() {
-  assert(m_watch_lock.is_wlocked());
-  assert(m_on_unregister_watch != nullptr);
-  assert(m_watch_state == WATCH_STATE_REGISTERED);
-  m_watch_state = WATCH_STATE_UNREGISTERING;
-
-  Context *ctx = create_context_callback<
-    ObjectWatcher<I>, &ObjectWatcher<I>::handle_unregister_watch>(this);
-  librados::AioCompletion *aio_comp = create_rados_safe_callback(
-      new C_UnwatchAndFlush(m_io_ctx, ctx));
-  int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
-  assert(r == 0);
-  aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_unregister_watch(int r) {
-  ldout(m_cct, 20) << ": r=" << r << dendl;
-
-  Context *on_unregister_watch = nullptr;
-  {
-    RWLock::WLocker watch_locker(m_watch_lock);
-    assert(m_watch_state == WATCH_STATE_UNREGISTERING);
-
-    if (r < 0) {
-      lderr(m_cct) << ": error encountered unregister watch: "
-                   << cpp_strerror(r) << dendl;
-    }
-
-    m_watch_state = WATCH_STATE_UNREGISTERED;
-    m_watch_handle = 0;
-    std::swap(on_unregister_watch, m_on_unregister_watch);
-  }
-
-  on_unregister_watch->complete(r);
-}
-
-template <typename I>
-void ObjectWatcher<I>::pre_unwatch(Context *on_finish) {
-  ldout(m_cct, 20) << dendl;
-
-  on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::post_rewatch(Context *on_finish) {
-  ldout(m_cct, 20) << dendl;
-
-  on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
-                                          bufferlist &out) {
-  ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
-                   << "handle=" << handle << dendl;
-  m_io_ctx.notify_ack(get_oid(), notify_id, handle, out);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_error(uint64_t handle, int err) {
-  lderr(m_cct) << ": handle=" << handle << ", " << "err=" << err << dendl;
-
-  RWLock::WLocker watch_locker(m_watch_lock);
-  if (m_watch_state != WATCH_STATE_REGISTERED) {
-    return;
-  }
-
-  m_watch_state = WATCH_STATE_REREGISTERING;
-  Context *pre_unwatch_ctx = new FunctionContext([this](int r) {
-      assert(r == 0);
-      Context *ctx = create_context_callback<
-        ObjectWatcher<I>, &ObjectWatcher<I>::handle_pre_unwatch>(this);
-      pre_unwatch(ctx);
-    });
-  m_work_queue->queue(pre_unwatch_ctx, 0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_pre_unwatch(int r) {
-  ldout(m_cct, 20) << dendl;
-
-  assert(r == 0);
-  unwatch();
-}
-
-template <typename I>
-void ObjectWatcher<I>::unwatch() {
-  ldout(m_cct, 20) << dendl;
-
-  {
-    RWLock::RLocker watch_locker(m_watch_lock);
-    assert(m_watch_state == WATCH_STATE_REREGISTERING);
-  }
-
-  Context *ctx = create_context_callback<
-    ObjectWatcher<I>, &ObjectWatcher<I>::handle_unwatch>(this);
-  librados::AioCompletion *aio_comp = create_rados_safe_callback(
-    new C_UnwatchAndFlush(m_io_ctx, ctx));
-  int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
-  assert(r == 0);
-  aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_unwatch(int r) {
-  ldout(m_cct, 20) << ": r=" << r << dendl;
-
-  if (r < 0) {
-    lderr(m_cct) << ": error encountered during unwatch: " << cpp_strerror(r)
-                 << dendl;
-  }
-
-  // handling pending unregister (if any)
-  if (pending_unregister_watch(r)) {
-    return;
-  }
-
-  rewatch();
-}
-
-template <typename I>
-void ObjectWatcher<I>::rewatch() {
-  ldout(m_cct, 20) << dendl;
-
-  {
-    RWLock::RLocker watch_locker(m_watch_lock);
-    assert(m_watch_state == WATCH_STATE_REREGISTERING);
-  }
-
-  librados::AioCompletion *aio_comp = create_rados_safe_callback<
-    ObjectWatcher<I>, &ObjectWatcher<I>::handle_rewatch>(this);
-  int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
-                                       &m_watch_ctx);
-  assert(r == 0);
-  aio_comp->release();
-
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_rewatch(int r) {
-  ldout(m_cct, 20) << ": r=" << r << dendl;
-
-  if (r < 0) {
-    lderr(m_cct) << ": error encountered during re-watch: " << cpp_strerror(r)
-                 << dendl;
-    m_watch_handle = 0;
-
-    if (!pending_unregister_watch(0)) {
-      rewatch();
-    }
-    return;
-  }
-
-  Context *ctx = create_context_callback<
-    ObjectWatcher<I>, &ObjectWatcher<I>::handle_post_watch>(this);
-  post_rewatch(ctx);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_post_watch(int r) {
-  ldout(m_cct, 20) << dendl;
-
-  assert(r == 0);
-
-  RWLock::WLocker watch_locker(m_watch_lock);
-  m_watch_state = WATCH_STATE_REGISTERED;
-
-  // handling pending unregister (if any)
-  if (m_on_unregister_watch != nullptr) {
-    unregister_watch_();
-    return;
-  }
-}
-
-template <typename I>
-bool ObjectWatcher<I>::pending_unregister_watch(int r) {
-  Context *on_unregister_watch = nullptr;
-  {
-    RWLock::WLocker watch_locker(m_watch_lock);
-    assert(m_watch_state == WATCH_STATE_REREGISTERING);
-
-    if (m_on_unregister_watch != nullptr) {
-      m_watch_state = WATCH_STATE_UNREGISTERED;
-      std::swap(on_unregister_watch, m_on_unregister_watch);
-    }
-  }
-
-  if (on_unregister_watch != nullptr) {
-    on_unregister_watch->complete(r);
-    return true;
-  }
-
-  return false;
-}
-
-template <typename I>
-ObjectWatcher<I>::C_NotifyAck::C_NotifyAck(ObjectWatcher *object_watcher,
-                                           uint64_t notify_id, uint64_t handle)
-  : object_watcher(object_watcher), notify_id(notify_id), handle(handle) {
-  CephContext *cct = object_watcher->m_cct;
-  ldout(cct, 10) << ": C_NotifyAck start: id=" << notify_id << ", "
-                 << "handle=" << handle << dendl;
-}
-
-template <typename I>
-void ObjectWatcher<I>::C_NotifyAck::finish(int r) {
-  assert(r == 0);
-  CephContext *cct = object_watcher->m_cct;
-  ldout(cct, 10) << ": C_NotifyAck finish: id=" << notify_id << ", "
-                 << "handle=" << handle << dendl;
-  object_watcher->acknowledge_notify(notify_id, handle, out);
-}
-
-} // namespace librbd
-
-template class librbd::ObjectWatcher<librbd::ImageCtx>;
diff --git a/src/librbd/ObjectWatcher.h b/src/librbd/ObjectWatcher.h
deleted file mode 100644 (file)
index 5ba5c80..0000000
+++ /dev/null
@@ -1,155 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_OBJECT_WATCHER_H
-#define CEPH_LIBRBD_OBJECT_WATCHER_H
-
-#include "include/rados/librados.hpp"
-#include "common/RWLock.h"
-#include "librbd/ImageCtx.h"
-#include <string>
-#include <type_traits>
-
-class Context;
-
-namespace librbd {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ObjectWatcher {
-public:
-  typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
-
-  ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
-  virtual ~ObjectWatcher();
-
-  ObjectWatcher(const ObjectWatcher&) = delete;
-  ObjectWatcher& operator= (const ObjectWatcher&) = delete;
-
-  void register_watch(Context *on_finish);
-  virtual void unregister_watch(Context *on_finish);
-
-protected:
-  struct C_NotifyAck : public Context {
-    ObjectWatcher *object_watcher;
-    uint64_t notify_id;
-    uint64_t handle;
-    bufferlist out;
-
-    C_NotifyAck(ObjectWatcher *object_watcher, uint64_t notify_id,
-                uint64_t handle);
-    virtual void finish(int r);
-
-    std::string get_oid() const {
-      return object_watcher->get_oid();
-    }
-  };
-
-  librados::IoCtx &m_io_ctx;
-  CephContext *m_cct;
-
-  virtual std::string get_oid() const = 0;
-
-  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
-                             bufferlist &bl) = 0;
-  void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
-
-  virtual void pre_unwatch(Context *on_finish);
-  virtual void post_rewatch(Context *on_finish);
-
-private:
-  /**
-   * @verbatim
-   *
-   * <start>
-   *    |
-   *    v
-   * REGISTER_WATCH
-   *    |
-   *    |   /-------------------------------------\
-   *    |   |                                     |
-   *    v   v   (watch error)                     |
-   * REGISTERED * * * * * * * > PRE_UNWATCH       |
-   *    |                         |               |
-   *    |                         v               |
-   *    |                       UNWATCH           |
-   *    |                         |               |
-   *    |                         v               |
-   *    |                       REWATCH           |
-   *    |                         |               |
-   *    |                         v               |
-   *    |                       POST_REWATCH      |
-   *    |                         |               |
-   *    v                         \---------------/
-   * UNREGISTER_WATCH
-   *    |
-   *    v
-   * UNREGISTERED
-   *    |
-   *    v
-   * <finish>
-   *
-   * @endverbatim
-   */
-
-  struct WatchCtx : public librados::WatchCtx2 {
-    ObjectWatcher *object_watcher;
-
-    WatchCtx(ObjectWatcher *object_watcher) : object_watcher(object_watcher) {
-    }
-
-    virtual void handle_notify(uint64_t notify_id,
-                               uint64_t handle,
-                               uint64_t notifier_id,
-                               bufferlist& bl) {
-      object_watcher->handle_notify(notify_id, handle, bl);
-    }
-
-    virtual void handle_error(uint64_t handle, int err) {
-      object_watcher->handle_error(handle, err);
-    }
-  };
-
-  enum WatchState {
-    WATCH_STATE_UNREGISTERED,
-    WATCH_STATE_REGISTERING,
-    WATCH_STATE_REGISTERED,
-    WATCH_STATE_UNREGISTERING,
-    WATCH_STATE_REREGISTERING
-  };
-
-  ContextWQT* m_work_queue;
-
-  mutable RWLock m_watch_lock;
-  WatchCtx m_watch_ctx;
-  uint64_t m_watch_handle = 0;
-  WatchState m_watch_state = WATCH_STATE_UNREGISTERED;
-
-  Context *m_on_register_watch = nullptr;
-  Context *m_on_unregister_watch = nullptr;
-
-  void handle_register_watch(int r);
-
-  void unregister_watch_();
-  void handle_unregister_watch(int r);
-
-  void handle_error(uint64_t handle, int err);
-
-  void handle_pre_unwatch(int r);
-
-  void unwatch();
-  void handle_unwatch(int r);
-
-  void rewatch();
-  void handle_rewatch(int r);
-
-  void handle_post_watch(int r);
-
-  bool pending_unregister_watch(int r);
-
-};
-
-} // namespace librbd
-
-extern template class librbd::ObjectWatcher<librbd::ImageCtx>;
-
-#endif // CEPH_LIBRBD_OBJECT_WATCHER_H
index f81b4ba693ac462015dfe19a96377bba8130dd07..ffbc866fa21b9d124e3990ccc2356b0f80d68dae 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/mirroring_watcher/Types.h"
+#include "librbd/watcher/Types.h"
 #include "include/assert.h"
 #include "include/stringify.h"
 #include "common/Formatter.h"
@@ -11,35 +12,6 @@ namespace mirroring_watcher {
 
 namespace {
 
-class EncodePayloadVisitor : public boost::static_visitor<void> {
-public:
-  explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
-
-  template <typename Payload>
-  inline void operator()(const Payload &payload) const {
-    ::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
-    payload.encode(m_bl);
-  }
-
-private:
-  bufferlist &m_bl;
-};
-
-class DecodePayloadVisitor : public boost::static_visitor<void> {
-public:
-  DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
-    : m_version(version), m_iter(iter) {}
-
-  template <typename Payload>
-  inline void operator()(Payload &payload) const {
-    payload.decode(m_version, m_iter);
-  }
-
-private:
-  __u8 m_version;
-  bufferlist::iterator &m_iter;
-};
-
 class DumpPayloadVisitor : public boost::static_visitor<void> {
 public:
   explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
@@ -104,7 +76,7 @@ void UnknownPayload::dump(Formatter *f) const {
 
 void NotifyMessage::encode(bufferlist& bl) const {
   ENCODE_START(1, 1, bl);
-  boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+  boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
   ENCODE_FINISH(bl);
 }
 
@@ -127,7 +99,7 @@ void NotifyMessage::decode(bufferlist::iterator& iter) {
     break;
   }
 
-  apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+  apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
   DECODE_FINISH(iter);
 }
 
index f107c722fe18d67c6f1e5689d3a089f7f854191c..4f6855f16367db256e94242d0aa6fe0ea66d2a86 100644 (file)
@@ -5,6 +5,9 @@
 #include "librbd/Watcher.h"
 #include "common/dout.h"
 
+#include "librbd/ImageCtx.h"
+#include "librbd/MirroringWatcher.h"
+
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::Watcher: "
@@ -29,3 +32,6 @@ void C_NotifyAck::finish(int r) {
 
 } // namespace watcher
 } // namespace librbd
+
+template struct librbd::watcher::HandlePayloadVisitor<
+    librbd::MirroringWatcher<librbd::ImageCtx>>;
index 6eb47ef1373cbcc14b898d0462530b72b696e501..39a0ea031ab4e3be1b51688aea43c3096bd1099a 100644 (file)
@@ -31,7 +31,6 @@ set(unittest_librbd_srcs
   test_mock_ExclusiveLock.cc
   test_mock_Journal.cc
   test_mock_ObjectMap.cc
-  test_mock_ObjectWatcher.cc
   exclusive_lock/test_mock_AcquireRequest.cc
   exclusive_lock/test_mock_ReacquireRequest.cc
   exclusive_lock/test_mock_ReleaseRequest.cc
index 15084993a0b6a6a16d9eef932560d54d13f9c89a..f965019de664bdf8a418aa82a47cec0583aff7c4 100644 (file)
@@ -73,28 +73,24 @@ public:
 };
 
 TEST_F(TestMirroringWatcher, ModeUpdated) {
-  EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _))
-    .WillRepeatedly(WithArg<1>(Invoke([](Context *on_finish) {
-        on_finish->complete(0);
-      })));
-
-  ASSERT_EQ(0, MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED));
+  EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _));
 
+  C_SaferCond ctx;
+  MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED, &ctx);
+  ASSERT_EQ(0, ctx.wait());
 }
 
 TEST_F(TestMirroringWatcher, ImageStatusUpdated) {
   EXPECT_CALL(*m_image_watcher,
               handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
                                    StrEq("image id"), StrEq("global image id"),
-                                   _))
-    .WillRepeatedly(WithArg<3>(Invoke([](Context *on_finish) {
-        on_finish->complete(0);
-      })));
-
-  ASSERT_EQ(0, MockMirroringWatcher::notify_image_updated(m_ioctx,
-                                                          cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
-                                                          "image id",
-                                                          "global image id"));
+                                   _));
+
+  C_SaferCond ctx;
+  MockMirroringWatcher::notify_image_updated(m_ioctx,
+                                             cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
+                                             "image id", "global image id", &ctx);
+  ASSERT_EQ(0, ctx.wait());
 }
 
 } // namespace librbd
index 930bd13749ced0e200bc5c616e244ddb261c677b..346a9c0e277c9a1a5239e65bad91df8a008c189e 100644 (file)
@@ -13,7 +13,7 @@
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "global/global_context.h"
-#include "librbd/ObjectWatcher.h"
+#include "librbd/Watcher.h"
 #include "librbd/internal.h"
 #include "Replayer.h"
 #include "Threads.h"
@@ -209,10 +209,10 @@ public:
   }
 
 private:
-  class Watcher : public librbd::ObjectWatcher<> {
+  class Watcher : public librbd::Watcher {
   public:
     Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
-      ObjectWatcher<>(ioctx, work_queue) {
+      librbd::Watcher(ioctx, work_queue, RBD_MIRRORING) {
     }
 
     virtual std::string get_oid() const {
@@ -220,7 +220,7 @@ private:
     }
 
     virtual void handle_notify(uint64_t notify_id, uint64_t handle,
-                              bufferlist &bl) {
+                               bufferlist &bl) {
       bufferlist out;
       acknowledge_notify(notify_id, handle, out);
     }