]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: generic object watcher and mirroring watcher
authorJason Dillaman <dillaman@redhat.com>
Tue, 29 Mar 2016 03:13:17 +0000 (23:13 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 30 Mar 2016 21:00:31 +0000 (17:00 -0400)
The ObjectWatcher can handle generic watch and recovery of
RADOS objects.  The MirroringWatcher will handle updates
to the RBD mirroring object.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Makefile.am
src/librbd/MirroringWatcher.cc [new file with mode: 0644]
src/librbd/MirroringWatcher.h [new file with mode: 0644]
src/librbd/ObjectWatcher.cc [new file with mode: 0644]
src/librbd/ObjectWatcher.h [new file with mode: 0644]
src/librbd/mirroring_watcher/Types.cc [new file with mode: 0644]
src/librbd/mirroring_watcher/Types.h [new file with mode: 0644]
src/test/Makefile-client.am
src/test/librados_test_stub/MockTestMemIoCtxImpl.h
src/test/librbd/test_mock_ObjectWatcher.cc [new file with mode: 0644]

index 5cdd9fbd159aed9f1d8389ee365792da59918a68..08c9738806ee21246fd2241625b4c9a675116c5d 100644 (file)
@@ -1,5 +1,6 @@
 librbd_types_la_SOURCES = \
        librbd/journal/Types.cc \
+       librbd/mirroring_watcher/Types.cc \
        librbd/WatchNotifyTypes.cc
 noinst_LTLIBRARIES += librbd_types.la
 
@@ -25,7 +26,9 @@ librbd_internal_la_SOURCES = \
        librbd/Journal.cc \
        librbd/LibrbdAdminSocketHook.cc \
        librbd/LibrbdWriteback.cc \
+       librbd/MirroringWatcher.cc \
        librbd/ObjectMap.cc \
+       librbd/ObjectWatcher.cc \
        librbd/Operations.cc \
        librbd/Utils.cc \
        librbd/exclusive_lock/AcquireRequest.cc \
@@ -104,7 +107,9 @@ noinst_HEADERS += \
        librbd/Journal.h \
        librbd/LibrbdAdminSocketHook.h \
        librbd/LibrbdWriteback.h \
+       librbd/MirroringWatcher.h \
        librbd/ObjectMap.h \
+       librbd/ObjectWatcher.h \
        librbd/Operations.h \
        librbd/parent_types.h \
        librbd/SnapInfo.h \
@@ -127,6 +132,7 @@ noinst_HEADERS += \
        librbd/journal/StandardPolicy.h \
        librbd/journal/Types.h \
        librbd/journal/TypeTraits.h \
+       librbd/mirroring_watcher/Types.h \
        librbd/object_map/InvalidateRequest.h \
        librbd/object_map/LockRequest.h \
        librbd/object_map/Request.h \
diff --git a/src/librbd/MirroringWatcher.cc b/src/librbd/MirroringWatcher.cc
new file mode 100644 (file)
index 0000000..3d8237e
--- /dev/null
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/MirroringWatcher.h"
+#include "include/rbd_types.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::MirroringWatcher: "
+
+namespace librbd {
+
+template <typename I>
+MirroringWatcher<I>::MirroringWatcher(librados::IoCtx &io_ctx,
+                                      ContextWQT *work_queue)
+  : ObjectWatcher<I>(io_ctx, work_queue) {
+}
+
+template <typename I>
+std::string MirroringWatcher<I>::get_oid() const {
+  return RBD_MIRRORING;
+}
+
+} // namespace librbd
+
+template class librbd::MirroringWatcher<librbd::ImageCtx>;
diff --git a/src/librbd/MirroringWatcher.h b/src/librbd/MirroringWatcher.h
new file mode 100644 (file)
index 0000000..f979953
--- /dev/null
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRRORING_WATCHER_H
+#define CEPH_LIBRBD_MIRRORING_WATCHER_H
+
+#include "include/int_types.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ObjectWatcher.h"
+#include "librbd/mirroring_watcher/Types.h"
+
+namespace librbd {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class MirroringWatcher : public ObjectWatcher<ImageCtxT> {
+public:
+  typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+
+  MirroringWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+
+protected:
+  virtual std::string get_oid() const;
+
+private:
+
+};
+
+} // namespace librbd
+
+extern template class librbd::MirroringWatcher<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIRRORING_WATCHER_H
diff --git a/src/librbd/ObjectWatcher.cc b/src/librbd/ObjectWatcher.cc
new file mode 100644 (file)
index 0000000..678cc3c
--- /dev/null
@@ -0,0 +1,337 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/ObjectWatcher.h"
+#include "include/Context.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::ObjectWatcher: " << get_oid() << ": " \
+                           << __func__
+
+namespace librbd {
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+  librados::Rados rados;
+  Context *on_finish;
+  bool flushing = false;
+  int ret_val = 0;
+
+  C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
+    : rados(io_ctx), on_finish(on_finish) {
+  }
+
+  virtual void complete(int r) override {
+    if (ret_val == 0 && r < 0) {
+      ret_val = r;
+    }
+
+    if (!flushing) {
+      flushing = true;
+
+      librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
+      r = rados.aio_watch_flush(aio_comp);
+      assert(r == 0);
+      aio_comp->release();
+    } else {
+      Context::complete(ret_val);
+    }
+  }
+
+  virtual void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+} // anonymous namespace
+
+template <typename I>
+ObjectWatcher<I>::ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue)
+  : m_io_ctx(io_ctx), m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
+    m_work_queue(work_queue),
+    m_watch_lock(util::unique_lock_name("librbd::ObjectWatcher::m_watch_lock", this)),
+    m_watch_ctx(this) {
+}
+
+template <typename I>
+ObjectWatcher<I>::~ObjectWatcher() {
+  RWLock::RLocker watch_locker(m_watch_lock);
+  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+}
+
+template <typename I>
+void ObjectWatcher<I>::register_watch(Context *on_finish) {
+  ldout(m_cct, 5) << dendl;
+
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(on_finish != nullptr);
+    assert(m_on_register_watch == nullptr);
+    assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+
+    m_watch_state = WATCH_STATE_REGISTERING;
+    m_on_register_watch = on_finish;
+  }
+
+  librados::AioCompletion *aio_comp = create_rados_safe_callback<
+    ObjectWatcher<I>, &ObjectWatcher<I>::handle_register_watch>(this);
+  int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
+                             &m_watch_ctx);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_register_watch(int r) {
+  ldout(m_cct, 20) << ": r=" << r << dendl;
+
+  Context *on_register_watch = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REGISTERING);
+
+    std::swap(on_register_watch, m_on_register_watch);
+    if (r < 0) {
+      lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r)
+                   << dendl;
+
+      m_watch_state = WATCH_STATE_UNREGISTERED;
+      m_watch_handle = 0;
+    } else {
+      m_watch_state = WATCH_STATE_REGISTERED;
+    }
+  }
+  on_register_watch->complete(r);
+}
+
+template <typename I>
+void ObjectWatcher<I>::unregister_watch(Context *on_finish) {
+  ldout(m_cct, 5) << dendl;
+
+  RWLock::WLocker watch_locker(m_watch_lock);
+  assert(on_finish != nullptr);
+  assert(m_on_unregister_watch == nullptr);
+  assert(m_watch_state != WATCH_STATE_UNREGISTERED &&
+         m_watch_state != WATCH_STATE_REGISTERING);
+
+  m_on_unregister_watch = on_finish;
+  if (m_watch_state == WATCH_STATE_REGISTERED) {
+    unregister_watch_();
+  }
+}
+
+template <typename I>
+void ObjectWatcher<I>::unregister_watch_() {
+  assert(m_watch_lock.is_wlocked());
+  assert(m_on_unregister_watch != nullptr);
+  assert(m_watch_state == WATCH_STATE_REGISTERED);
+  m_watch_state = WATCH_STATE_UNREGISTERING;
+
+  Context *ctx = create_context_callback<
+    ObjectWatcher<I>, &ObjectWatcher<I>::handle_unregister_watch>(this);
+  librados::AioCompletion *aio_comp = create_rados_safe_callback(
+      new C_UnwatchAndFlush(m_io_ctx, ctx));
+  int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_unregister_watch(int r) {
+  ldout(m_cct, 20) << ": r=" << r << dendl;
+
+  Context *on_unregister_watch = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_UNREGISTERING);
+
+    if (r < 0) {
+      lderr(m_cct) << ": error encountered unregister watch: "
+                   << cpp_strerror(r) << dendl;
+    }
+
+    m_watch_state = WATCH_STATE_UNREGISTERED;
+    m_watch_handle = 0;
+    std::swap(on_unregister_watch, m_on_unregister_watch);
+  }
+
+  on_unregister_watch->complete(r);
+}
+
+template <typename I>
+void ObjectWatcher<I>::pre_unwatch(Context *on_finish) {
+  ldout(m_cct, 20) << dendl;
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::post_rewatch(Context *on_finish) {
+  ldout(m_cct, 20) << dendl;
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                     bufferlist &bl) {
+  ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
+                   << "handle=" << handle << dendl;
+}
+
+template <typename I>
+void ObjectWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+                                          bufferlist &out) {
+  ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
+                   << "handle=" << handle << dendl;
+  m_io_ctx.notify_ack(get_oid(), notify_id, handle, out);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_error(uint64_t handle, int err) {
+  lderr(m_cct) << ": handle=" << handle << ", " << "err=" << err << dendl;
+
+  RWLock::WLocker watch_locker(m_watch_lock);
+  if (m_watch_state != WATCH_STATE_REGISTERED) {
+    return;
+  }
+
+  m_watch_state = WATCH_STATE_REREGISTERING;
+  Context *pre_unwatch_ctx = new FunctionContext([this](int r) {
+      assert(r == 0);
+      Context *ctx = create_context_callback<
+        ObjectWatcher<I>, &ObjectWatcher<I>::handle_pre_unwatch>(this);
+      pre_unwatch(ctx);
+    });
+  m_work_queue->queue(pre_unwatch_ctx, 0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_pre_unwatch(int r) {
+  ldout(m_cct, 20) << dendl;
+
+  assert(r == 0);
+  unwatch();
+}
+
+template <typename I>
+void ObjectWatcher<I>::unwatch() {
+  ldout(m_cct, 20) << dendl;
+
+  {
+    RWLock::RLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REREGISTERING);
+  }
+
+  Context *ctx = create_context_callback<
+    ObjectWatcher<I>, &ObjectWatcher<I>::handle_unwatch>(this);
+  librados::AioCompletion *aio_comp = create_rados_safe_callback(
+    new C_UnwatchAndFlush(m_io_ctx, ctx));
+  int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_unwatch(int r) {
+  ldout(m_cct, 20) << ": r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << ": error encountered during unwatch: " << cpp_strerror(r)
+                 << dendl;
+  }
+
+  // handling pending unregister (if any)
+  if (pending_unregister_watch(r)) {
+    return;
+  }
+
+  rewatch();
+}
+
+template <typename I>
+void ObjectWatcher<I>::rewatch() {
+  ldout(m_cct, 20) << dendl;
+
+  {
+    RWLock::RLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REREGISTERING);
+  }
+
+  librados::AioCompletion *aio_comp = create_rados_safe_callback<
+    ObjectWatcher<I>, &ObjectWatcher<I>::handle_rewatch>(this);
+  int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
+                                       &m_watch_ctx);
+  assert(r == 0);
+  aio_comp->release();
+
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_rewatch(int r) {
+  ldout(m_cct, 20) << ": r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << ": error encountered during re-watch: " << cpp_strerror(r)
+                 << dendl;
+    m_watch_handle = 0;
+
+    if (!pending_unregister_watch(0)) {
+      rewatch();
+    }
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    ObjectWatcher<I>, &ObjectWatcher<I>::handle_post_watch>(this);
+  post_rewatch(ctx);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_post_watch(int r) {
+  ldout(m_cct, 20) << dendl;
+
+  assert(r == 0);
+
+  RWLock::WLocker watch_locker(m_watch_lock);
+  m_watch_state = WATCH_STATE_REGISTERED;
+
+  // handling pending unregister (if any)
+  if (m_on_unregister_watch != nullptr) {
+    unregister_watch_();
+    return;
+  }
+}
+
+template <typename I>
+bool ObjectWatcher<I>::pending_unregister_watch(int r) {
+  Context *on_unregister_watch = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REREGISTERING);
+
+    if (m_on_unregister_watch != nullptr) {
+      m_watch_state = WATCH_STATE_UNREGISTERED;
+      std::swap(on_unregister_watch, m_on_unregister_watch);
+    }
+  }
+
+  if (on_unregister_watch != nullptr) {
+    on_unregister_watch->complete(r);
+    return true;
+  }
+
+  return false;
+}
+
+} // namespace librbd
+
+template class librbd::ObjectWatcher<librbd::ImageCtx>;
diff --git a/src/librbd/ObjectWatcher.h b/src/librbd/ObjectWatcher.h
new file mode 100644 (file)
index 0000000..d84cd88
--- /dev/null
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_OBJECT_WATCHER_H
+#define CEPH_LIBRBD_OBJECT_WATCHER_H
+
+#include "include/rados/librados.hpp"
+#include "common/RWLock.h"
+#include "librbd/ImageCtx.h"
+#include <string>
+#include <type_traits>
+
+class Context;
+
+namespace librbd {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ObjectWatcher {
+public:
+  typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+
+  ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+  virtual ~ObjectWatcher();
+
+  ObjectWatcher(const ObjectWatcher&) = delete;
+  ObjectWatcher& operator= (const ObjectWatcher&) = delete;
+
+  void register_watch(Context *on_finish);
+  virtual void unregister_watch(Context *on_finish);
+
+protected:
+  librados::IoCtx &m_io_ctx;
+  CephContext *m_cct;
+
+  virtual std::string get_oid() const = 0;
+
+  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                             bufferlist &bl);
+  void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
+
+  virtual void pre_unwatch(Context *on_finish);
+  virtual void post_rewatch(Context *on_finish);
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * REGISTER_WATCH
+   *    |
+   *    |   /-------------------------------------\
+   *    |   |                                     |
+   *    v   v   (watch error)                     |
+   * REGISTERED * * * * * * * > PRE_UNWATCH       |
+   *    |                         |               |
+   *    |                         v               |
+   *    |                       UNWATCH           |
+   *    |                         |               |
+   *    |                         v               |
+   *    |                       REWATCH           |
+   *    |                         |               |
+   *    |                         v               |
+   *    |                       POST_REWATCH      |
+   *    |                         |               |
+   *    v                         \---------------/
+   * UNREGISTER_WATCH
+   *    |
+   *    v
+   * UNREGISTERED
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+
+  struct WatchCtx : public librados::WatchCtx2 {
+    ObjectWatcher *object_watcher;
+
+    WatchCtx(ObjectWatcher *object_watcher) : object_watcher(object_watcher) {
+    }
+
+    virtual void handle_notify(uint64_t notify_id,
+                               uint64_t handle,
+                               uint64_t notifier_id,
+                               bufferlist& bl) {
+      object_watcher->handle_notify(notify_id, handle, bl);
+    }
+
+    virtual void handle_error(uint64_t handle, int err) {
+      object_watcher->handle_error(handle, err);
+    }
+  };
+
+  enum WatchState {
+    WATCH_STATE_UNREGISTERED,
+    WATCH_STATE_REGISTERING,
+    WATCH_STATE_REGISTERED,
+    WATCH_STATE_UNREGISTERING,
+    WATCH_STATE_REREGISTERING
+  };
+
+  ContextWQT* m_work_queue;
+
+  mutable RWLock m_watch_lock;
+  WatchCtx m_watch_ctx;
+  uint64_t m_watch_handle = 0;
+  WatchState m_watch_state = WATCH_STATE_UNREGISTERED;
+
+  Context *m_on_register_watch = nullptr;
+  Context *m_on_unregister_watch = nullptr;
+
+  void handle_register_watch(int r);
+
+  void unregister_watch_();
+  void handle_unregister_watch(int r);
+
+  void handle_error(uint64_t handle, int err);
+
+  void handle_pre_unwatch(int r);
+
+  void unwatch();
+  void handle_unwatch(int r);
+
+  void rewatch();
+  void handle_rewatch(int r);
+
+  void handle_post_watch(int r);
+
+  bool pending_unregister_watch(int r);
+
+};
+
+} // namespace librbd
+
+extern template class librbd::ObjectWatcher<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_OBJECT_WATCHER_H
diff --git a/src/librbd/mirroring_watcher/Types.cc b/src/librbd/mirroring_watcher/Types.cc
new file mode 100644 (file)
index 0000000..55e29c9
--- /dev/null
@@ -0,0 +1,10 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/mirroring_watcher/Types.h"
+
+namespace librbd {
+namespace mirroring_watcher {
+
+} // namespace mirroring_watcher
+} // namespace librbd
diff --git a/src/librbd/mirroring_watcher/Types.h b/src/librbd/mirroring_watcher/Types.h
new file mode 100644 (file)
index 0000000..4e4dcf4
--- /dev/null
@@ -0,0 +1,15 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
+#define CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
+
+#include "include/int_types.h"
+
+namespace librbd {
+namespace mirroring_watcher {
+
+} // namespace mirroring_watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
index c35c57f8d7490446b91577a0f89e128b5a20cc48..240648b5fabc10263fdba4a44a0126a758620371 100644 (file)
@@ -378,6 +378,7 @@ unittest_librbd_SOURCES = \
        test/librbd/test_mock_fixture.cc \
        test/librbd/test_mock_ExclusiveLock.cc \
        test/librbd/test_mock_Journal.cc \
+       test/librbd/test_mock_ObjectWatcher.cc \
        test/librbd/exclusive_lock/test_mock_AcquireRequest.cc \
        test/librbd/exclusive_lock/test_mock_ReleaseRequest.cc \
        test/librbd/image/test_mock_RefreshRequest.cc \
index 40ccc8f23a21c9ba08518921d4d763432cc5f7d9..b15b241db625cdd9fc6e3189a9001a5fc377d5cb 100644 (file)
@@ -34,6 +34,18 @@ public:
     return io_ctx_impl;
   }
 
+  MOCK_METHOD4(aio_watch, int(const std::string& o, AioCompletionImpl *c,
+                              uint64_t *handle, librados::WatchCtx2 *ctx));
+  int do_aio_watch(const std::string& o, AioCompletionImpl *c,
+                   uint64_t *handle, librados::WatchCtx2 *ctx) {
+    return TestMemIoCtxImpl::aio_watch(o, c, handle, ctx);
+  }
+
+  MOCK_METHOD2(aio_unwatch, int(uint64_t handle, AioCompletionImpl *c));
+  int do_aio_unwatch(uint64_t handle, AioCompletionImpl *c) {
+    return TestMemIoCtxImpl::aio_unwatch(handle, c);
+  }
+
   MOCK_METHOD7(exec, int(const std::string& oid,
                          TestClassHandler *handler,
                          const char *cls,
@@ -60,6 +72,13 @@ public:
     return TestMemIoCtxImpl::list_watchers(o, out_watchers);
   }
 
+  MOCK_METHOD4(notify, int(const std::string& o, bufferlist& bl,
+                           uint64_t timeout_ms, bufferlist *pbl));
+  int do_notify(const std::string& o, bufferlist& bl,
+                uint64_t timeout_ms, bufferlist *pbl) {
+    return TestMemIoCtxImpl::notify(o, bl, timeout_ms, pbl);
+  }
+
   MOCK_METHOD4(read, int(const std::string& oid,
                          size_t len,
                          uint64_t off,
@@ -116,9 +135,12 @@ public:
   void default_to_parent() {
     using namespace ::testing;
 
+    ON_CALL(*this, aio_watch(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_watch));
+    ON_CALL(*this, aio_unwatch(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_unwatch));
     ON_CALL(*this, exec(_, _, _, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_exec));
     ON_CALL(*this, list_snaps(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_list_snaps));
     ON_CALL(*this, list_watchers(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_list_watchers));
+    ON_CALL(*this, notify(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_notify));
     ON_CALL(*this, read(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_read));
     ON_CALL(*this, remove(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_remove));
     ON_CALL(*this, selfmanaged_snap_create(_)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_selfmanaged_snap_create));
diff --git a/src/test/librbd/test_mock_ObjectWatcher.cc b/src/test/librbd/test_mock_ObjectWatcher.cc
new file mode 100644 (file)
index 0000000..9520c95
--- /dev/null
@@ -0,0 +1,401 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "librados/AioCompletionImpl.h"
+#include "librbd/ObjectWatcher.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include <list>
+
+namespace librbd {
+
+namespace {
+
+struct MockObjectWatcher : public ObjectWatcher<MockImageCtx> {
+  std::string oid;
+
+  MockObjectWatcher(MockImageCtx &mock_image_ctx, const std::string &oid)
+    : ObjectWatcher<MockImageCtx>(mock_image_ctx.md_ctx,
+                                  mock_image_ctx.op_work_queue),
+      oid(oid) {
+  }
+
+  virtual std::string get_oid() const override {
+    return oid;
+  }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+// template definitions
+#include "librbd/ObjectWatcher.cc"
+template class librbd::ObjectWatcher<librbd::MockImageCtx>;
+
+namespace librbd {
+
+using ::testing::_;
+using ::testing::DoDefault;
+using ::testing::Invoke;
+using ::testing::InSequence;
+using ::testing::Return;
+using ::testing::SaveArg;
+using ::testing::WithArg;
+
+class TestMockObjectWatcher : public TestMockFixture {
+public:
+  TestMockObjectWatcher() : m_lock("TestMockObjectWatcher::m_lock") {
+  }
+
+  virtual void SetUp() {
+    TestMockFixture::SetUp();
+
+    m_oid = get_temp_image_name();
+
+    bufferlist bl;
+    ASSERT_EQ(0, m_ioctx.write_full(m_oid, bl));
+  }
+
+  void expect_aio_watch(MockImageCtx &mock_image_ctx, int r,
+                        const std::function<void()> &action = std::function<void()>()) {
+    librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+      mock_image_ctx.md_ctx));
+    librados::MockTestMemRadosClient *mock_rados_client(
+      mock_io_ctx.get_mock_rados_client());
+
+    auto &expect = EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
+    if (r < 0) {
+      expect.WillOnce(DoAll(WithArg<1>(Invoke([this, mock_rados_client, r, action](librados::AioCompletionImpl *c) {
+                                if (action) {
+                                  action();
+                                }
+
+                                c->get();
+                                mock_rados_client->finish_aio_completion(c, r);
+                                notify_watch();
+                              })),
+                            Return(0)));
+    } else {
+      expect.WillOnce(DoAll(SaveArg<3>(&m_watch_ctx),
+                            Invoke([this, &mock_io_ctx, action](const std::string& o,
+                                                                librados::AioCompletionImpl *c,
+                                                                uint64_t *handle,
+                                                                librados::WatchCtx2 *ctx) {
+                                if (action) {
+                                  action();
+                                }
+
+                                mock_io_ctx.do_aio_watch(o, c, handle, ctx);
+                                notify_watch();
+                              }),
+                            Return(0)));
+    }
+  }
+
+  void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r,
+                          const std::function<void()> &action = std::function<void()>()) {
+    librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+      mock_image_ctx.md_ctx));
+
+    auto &expect = EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
+    if (r < 0) {
+      expect.WillOnce(DoAll(Invoke([this, &mock_io_ctx, r, action](uint64_t handle,
+                                                                   librados::AioCompletionImpl *c) {
+                                if (action) {
+                                  action();
+                                }
+
+                                librados::AioCompletionImpl *dummy_c = new librados::AioCompletionImpl();
+                                mock_io_ctx.do_aio_unwatch(handle, dummy_c);
+                                ASSERT_EQ(0, dummy_c->wait_for_complete());
+                                dummy_c->release();
+
+                                c->get();
+                                mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+                                notify_watch();
+                              }),
+                            Return(0)));
+    } else {
+      expect.WillOnce(DoAll(Invoke([this, &mock_io_ctx, action](uint64_t handle,
+                                                                librados::AioCompletionImpl *c) {
+                                if (action) {
+                                  action();
+                                }
+
+                                mock_io_ctx.do_aio_unwatch(handle, c);
+                                notify_watch();
+                              }),
+                            Return(0)));
+    }
+  }
+
+  std::string m_oid;
+  librados::WatchCtx2 *m_watch_ctx = nullptr;
+
+  void notify_watch() {
+    Mutex::Locker locker(m_lock);
+    ++m_watch_count;
+    m_cond.Signal();
+  }
+
+  bool wait_for_watch(MockImageCtx &mock_image_ctx, size_t count) {
+    Mutex::Locker locker(m_lock);
+    while (m_watch_count < count) {
+      if (m_cond.WaitInterval(mock_image_ctx.cct, m_lock,
+                              utime_t(10, 0)) != 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  Mutex m_lock;
+  Cond m_cond;
+  size_t m_watch_count = 0;
+};
+
+TEST_F(TestMockObjectWatcher, Success) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  C_SaferCond unregister_ctx;
+  mock_image_watcher.unregister_watch(&unregister_ctx);
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, RegisterError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, -EINVAL);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(-EINVAL, register_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, UnregisterError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, -EINVAL);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  C_SaferCond unregister_ctx;
+  mock_image_watcher.unregister_watch(&unregister_ctx);
+  ASSERT_EQ(-EINVAL, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, Reregister) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  // wait for recovery unwatch/watch
+  ASSERT_TRUE(wait_for_watch(mock_image_ctx, 3));
+
+  C_SaferCond unregister_ctx;
+  mock_image_watcher.unregister_watch(&unregister_ctx);
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterUnwatchError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, -EINVAL);
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  // wait for recovery unwatch/watch
+  ASSERT_TRUE(wait_for_watch(mock_image_ctx, 3));
+
+  C_SaferCond unregister_ctx;
+  mock_image_watcher.unregister_watch(&unregister_ctx);
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterWatchError) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+  expect_aio_watch(mock_image_ctx, -ESHUTDOWN);
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  // wait for recovery unwatch/watch
+  ASSERT_TRUE(wait_for_watch(mock_image_ctx, 4));
+
+  C_SaferCond unregister_ctx;
+  mock_image_watcher.unregister_watch(&unregister_ctx);
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterUnwatchPendingUnregister) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+
+  // inject an unregister
+  C_SaferCond unregister_ctx;
+  expect_aio_unwatch(mock_image_ctx, 0, [&mock_image_watcher, &unregister_ctx]() {
+      mock_image_watcher.unregister_watch(&unregister_ctx);
+    });
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterWatchPendingUnregister) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  // inject an unregister
+  C_SaferCond unregister_ctx;
+  expect_aio_watch(mock_image_ctx, -ESHUTDOWN,
+                   [&mock_image_watcher, &unregister_ctx]() {
+      mock_image_watcher.unregister_watch(&unregister_ctx);
+    });
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterPendingUnregister) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+  expect_aio_watch(mock_image_ctx, 0);
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  // inject an unregister
+  C_SaferCond unregister_ctx;
+  expect_aio_watch(mock_image_ctx, 0,
+                   [&mock_image_watcher, &unregister_ctx]() {
+      mock_image_watcher.unregister_watch(&unregister_ctx);
+    });
+
+  expect_aio_unwatch(mock_image_ctx, 0);
+
+  C_SaferCond register_ctx;
+  mock_image_watcher.register_watch(&register_ctx);
+  ASSERT_EQ(0, register_ctx.wait());
+
+  assert(m_watch_ctx != nullptr);
+  m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+  ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+} // namespace librbd