]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror A/A: InstanceWatcher watch/notify stub for leader/follower RPC 13312/head
authorMykola Golub <mgolub@mirantis.com>
Wed, 8 Feb 2017 13:40:24 +0000 (14:40 +0100)
committerMykola Golub <mgolub@mirantis.com>
Thu, 23 Feb 2017 12:12:00 +0000 (13:12 +0100)
Fixes: http://tracker.ceph.com/issues/18783
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/include/rbd_types.h
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/test_InstanceWatcher.cc [new file with mode: 0644]
src/test/rbd_mirror/test_main.cc
src/test/rbd_mirror/test_mock_InstanceWatcher.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/InstanceWatcher.cc [new file with mode: 0644]
src/tools/rbd_mirror/InstanceWatcher.h [new file with mode: 0644]
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index a81da8bb6bc657dac41f1d72faca78d60e274f47..845a94e1ebd03d37fa6484b466b2789454ac9811 100644 (file)
  */
 #define RBD_MIRRORING       "rbd_mirroring"
 
-
 /**
- * rbd_mirror_leader object is used for pool-level coordination
- * between rbd-mirror daemons.
+ * rbd_mirror_leader and rbd_mirror_instance.<instance id> objects are used
+ * for pool-level coordination between rbd-mirror daemons.
  */
-#define RBD_MIRROR_LEADER      "rbd_mirror_leader"
+#define RBD_MIRROR_LEADER               "rbd_mirror_leader"
+#define RBD_MIRROR_INSTANCE_PREFIX      "rbd_mirror_instance."
 
 #define RBD_MAX_OBJ_NAME_SIZE  96
 #define RBD_MAX_BLOCK_NAME_SIZE 24
index b815780650a329d2d0bfd7b26544b61ec234016b..369508eaf32b44e5cac19c5d40f9d23bd446e669 100644 (file)
@@ -4,6 +4,7 @@ set(rbd_mirror_test_srcs
   test_ImageReplayer.cc
   test_ImageDeleter.cc
   test_ImageSync.cc
+  test_InstanceWatcher.cc
   test_LeaderWatcher.cc
   test_fixture.cc
   )
@@ -17,6 +18,7 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageReplayer.cc
   test_mock_ImageSync.cc
   test_mock_ImageSyncThrottler.cc
+  test_mock_InstanceWatcher.cc
   test_mock_LeaderWatcher.cc
   image_replayer/test_mock_BootstrapRequest.cc
   image_replayer/test_mock_CreateImageRequest.cc
diff --git a/src/test/rbd_mirror/test_InstanceWatcher.cc b/src/test/rbd_mirror/test_InstanceWatcher.cc
new file mode 100644 (file)
index 0000000..cbf8cf0
--- /dev/null
@@ -0,0 +1,133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/Utils.h"
+#include "librbd/internal.h"
+#include "test/rbd_mirror/test_fixture.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#include "test/librados/test.h"
+#include "gtest/gtest.h"
+
+using rbd::mirror::InstanceWatcher;
+
+void register_test_instance_watcher() {
+}
+
+class TestInstanceWatcher : public ::rbd::mirror::TestFixture {
+public:
+  std::string m_instance_id;
+  std::string m_oid;
+
+  void SetUp() override {
+    TestFixture::SetUp();
+    m_local_io_ctx.remove(RBD_MIRROR_LEADER);
+    EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
+
+    m_instance_id = stringify(m_local_io_ctx.get_instance_id());
+    m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+  }
+
+  void get_instances(std::vector<std::string> *instance_ids) {
+    instance_ids->clear();
+    C_SaferCond on_get;
+    InstanceWatcher<>::get_instances(m_local_io_ctx, instance_ids, &on_get);
+    EXPECT_EQ(0, on_get.wait());
+  }
+};
+
+TEST_F(TestInstanceWatcher, InitShutdown)
+{
+  InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue);
+  std::vector<std::string> instance_ids;
+  get_instances(&instance_ids);
+  ASSERT_EQ(0U, instance_ids.size());
+
+  uint64_t size;
+  ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+
+  // Init
+  ASSERT_EQ(0, instance_watcher.init());
+
+  get_instances(&instance_ids);
+  ASSERT_EQ(1U, instance_ids.size());
+  ASSERT_EQ(m_instance_id, instance_ids[0]);
+
+  ASSERT_EQ(0, m_local_io_ctx.stat(m_oid, &size, nullptr));
+  std::list<obj_watch_t> watchers;
+  ASSERT_EQ(0, m_local_io_ctx.list_watchers(m_oid, &watchers));
+  ASSERT_EQ(1U, watchers.size());
+  ASSERT_EQ(m_instance_id, stringify(watchers.begin()->watcher_id));
+
+  get_instances(&instance_ids);
+  ASSERT_EQ(1U, instance_ids.size());
+
+  // Shutdown
+  instance_watcher.shut_down();
+
+  ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+  get_instances(&instance_ids);
+  ASSERT_EQ(0U, instance_ids.size());
+}
+
+TEST_F(TestInstanceWatcher, Remove)
+{
+  std::string instance_id = "instance_id";
+  std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
+
+  std::vector<std::string> instance_ids;
+  get_instances(&instance_ids);
+  ASSERT_EQ(0U, instance_ids.size());
+
+  uint64_t size;
+  ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr));
+
+  librados::Rados cluster;
+  librados::IoCtx io_ctx;
+  ASSERT_EQ("", connect_cluster_pp(cluster));
+  ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx));
+  InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue,
+                                     instance_id);
+  // Init
+  ASSERT_EQ(0, instance_watcher.init());
+
+  get_instances(&instance_ids);
+  ASSERT_EQ(1U, instance_ids.size());
+  ASSERT_EQ(instance_id, instance_ids[0]);
+
+  ASSERT_EQ(0, m_local_io_ctx.stat(oid, &size, nullptr));
+  std::list<obj_watch_t> watchers;
+  ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers));
+  ASSERT_EQ(1U, watchers.size());
+
+  get_instances(&instance_ids);
+  ASSERT_EQ(1U, instance_ids.size());
+
+  // Remove
+  C_SaferCond on_remove;
+  InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
+                                     "instance_id", &on_remove);
+  ASSERT_EQ(0, on_remove.wait());
+
+  ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr));
+  get_instances(&instance_ids);
+  ASSERT_EQ(0U, instance_ids.size());
+
+  // Shutdown
+  instance_watcher.shut_down();
+
+  ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+  get_instances(&instance_ids);
+  ASSERT_EQ(0U, instance_ids.size());
+
+  // Remove NOENT
+  C_SaferCond on_remove_noent;
+  InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
+                                     instance_id, &on_remove_noent);
+  ASSERT_EQ(0, on_remove_noent.wait());
+}
index d0d577e056e45ec226fb0633dcc80e8a5fd4cbe8..aae4b143787a15272246873f1fae3ab2d2bad391 100644 (file)
@@ -9,20 +9,22 @@
 #include <string>
 
 extern void register_test_cluster_watcher();
+extern void register_test_image_sync();
+extern void register_test_instance_watcher();
+extern void register_test_leader_watcher();
 extern void register_test_pool_watcher();
 extern void register_test_rbd_mirror();
 extern void register_test_rbd_mirror_image_deleter();
-extern void register_test_image_sync();
-extern void register_test_leader_watcher();
 
 int main(int argc, char **argv)
 {
   register_test_cluster_watcher();
+  register_test_image_sync();
+  register_test_instance_watcher();
+  register_test_leader_watcher();
   register_test_pool_watcher();
   register_test_rbd_mirror();
   register_test_rbd_mirror_image_deleter();
-  register_test_image_sync();
-  register_test_leader_watcher();
 
   ::testing::InitGoogleTest(&argc, argv);
 
diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
new file mode 100644 (file)
index 0000000..f2f451a
--- /dev/null
@@ -0,0 +1,247 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/ManagedLock.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+template <>
+struct ManagedLock<MockTestImageCtx> {
+  static ManagedLock* s_instance;
+
+  static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue,
+                             const std::string& oid, librbd::Watcher *watcher,
+                             managed_lock::Mode  mode,
+                             bool blacklist_on_break_lock,
+                             uint32_t blacklist_expire_seconds) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  ManagedLock() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~ManagedLock() {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD1(shut_down, void(Context *));
+  MOCK_METHOD1(acquire_lock, void(Context *));
+  MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
+  MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
+};
+
+ManagedLock<MockTestImageCtx> *ManagedLock<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/InstanceWatcher.cc"
+template class rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx>;
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::StrEq;
+using ::testing::WithArg;
+
+class TestMockInstanceWatcher : public TestMockFixture {
+public:
+  typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
+  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
+
+  std::string m_instance_id;
+  std::string m_oid;
+
+  void SetUp() override {
+    TestFixture::SetUp();
+    m_local_io_ctx.remove(RBD_MIRROR_LEADER);
+    EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
+
+    m_instance_id = stringify(m_local_io_ctx.get_instance_id());
+    m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+  }
+
+  void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
+    EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
+  }
+
+  void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
+    EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
+  }
+
+  void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+                                int r) {
+    EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
+                                  StrEq("mirror_instances_add"), _, _, _))
+      .WillOnce(Return(r));
+  }
+
+  void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+                                  int r) {
+    EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
+                                  StrEq("mirror_instances_remove"), _, _, _))
+      .WillOnce(Return(r));
+  }
+
+  void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
+    EXPECT_CALL(mock_managed_lock, acquire_lock(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
+    EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r));
+  }
+
+  void expect_destroy_lock(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, destroy());
+  }
+
+  void expect_get_locker(MockManagedLock &mock_managed_lock,
+                         const librbd::managed_lock::Locker &locker, int r) {
+    EXPECT_CALL(mock_managed_lock, get_locker(_, _))
+      .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out,
+                                   Context *ctx) {
+                         if (r == 0) {
+                           *out = locker;
+                         }
+                         ctx->complete(r);
+                       }));
+  }
+
+  void expect_break_lock(MockManagedLock &mock_managed_lock,
+                         const librbd::managed_lock::Locker &locker, int r) {
+    EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
+      .WillOnce(WithArg<2>(CompleteContext(r)));
+  }
+};
+
+TEST_F(TestMockInstanceWatcher, InitShutdown) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+                                                  m_threads->work_queue);
+  InSequence seq;
+
+  // Init
+  expect_register_instance(mock_io_ctx, 0);
+  expect_register_watch(mock_io_ctx);
+  expect_acquire_lock(mock_managed_lock, 0);
+  ASSERT_EQ(0, instance_watcher->init());
+
+  // Shutdown
+  expect_release_lock(mock_managed_lock, 0);
+  expect_unregister_watch(mock_io_ctx);
+  expect_unregister_instance(mock_io_ctx, 0);
+  instance_watcher->shut_down();
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher;
+}
+
+TEST_F(TestMockInstanceWatcher, InitError) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+                                                  m_threads->work_queue);
+  InSequence seq;
+
+  expect_register_instance(mock_io_ctx, 0);
+  expect_register_watch(mock_io_ctx);
+  expect_acquire_lock(mock_managed_lock, -EINVAL);
+  expect_unregister_watch(mock_io_ctx);
+  expect_unregister_instance(mock_io_ctx, 0);
+
+  ASSERT_EQ(-EINVAL, instance_watcher->init());
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher;
+}
+
+TEST_F(TestMockInstanceWatcher, ShutdownError) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+  auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+                                                  m_threads->work_queue);
+  InSequence seq;
+
+  // Init
+  expect_register_instance(mock_io_ctx, 0);
+  expect_register_watch(mock_io_ctx);
+  expect_acquire_lock(mock_managed_lock, 0);
+  ASSERT_EQ(0, instance_watcher->init());
+
+  // Shutdown
+  expect_release_lock(mock_managed_lock, -EINVAL);
+  expect_unregister_watch(mock_io_ctx);
+  expect_unregister_instance(mock_io_ctx, 0);
+  instance_watcher->shut_down();
+
+  expect_destroy_lock(mock_managed_lock);
+  delete instance_watcher;
+}
+
+
+TEST_F(TestMockInstanceWatcher, Remove) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+  librbd::managed_lock::Locker
+    locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
+
+  InSequence seq;
+
+  expect_get_locker(mock_managed_lock, locker, 0);
+  expect_break_lock(mock_managed_lock, locker, 0);
+  expect_unregister_instance(mock_io_ctx, 0);
+  expect_destroy_lock(mock_managed_lock);
+
+  C_SaferCond on_remove;
+  MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+                                       "instance_id", &on_remove);
+  ASSERT_EQ(0, on_remove.wait());
+}
+
+TEST_F(TestMockInstanceWatcher, RemoveNoent) {
+  MockManagedLock mock_managed_lock;
+  librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+  InSequence seq;
+
+  expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT);
+  expect_unregister_instance(mock_io_ctx, 0);
+  expect_destroy_lock(mock_managed_lock);
+
+  C_SaferCond on_remove;
+  MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+                                       "instance_id", &on_remove);
+  ASSERT_EQ(0, on_remove.wait());
+}
+
+} // namespace mirror
+} // namespace rbd
index 10d6c8918d5d5c934eb28aef60538abe575a3abf..9224199e5331c910fdac4b1d3a043aa419269abb 100644 (file)
@@ -3,10 +3,11 @@ add_library(rbd_mirror_types STATIC
 
 set(rbd_mirror_internal
   ClusterWatcher.cc
-  ImageReplayer.cc
   ImageDeleter.cc
+  ImageReplayer.cc
   ImageSync.cc
   ImageSyncThrottler.cc
+  InstanceWatcher.cc
   LeaderWatcher.cc
   Mirror.cc
   MirrorStatusWatcher.cc
diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc
new file mode 100644 (file)
index 0000000..c07100b
--- /dev/null
@@ -0,0 +1,501 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "InstanceWatcher.h"
+#include "include/stringify.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ManagedLock.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
+                           << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
+
+namespace {
+
+struct C_GetInstances : public Context {
+  std::vector<std::string> *instance_ids;
+  Context *on_finish;
+  bufferlist out_bl;
+
+  C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
+    : instance_ids(instance_ids), on_finish(on_finish) {
+  }
+
+  void finish(int r) override {
+    if (r == 0) {
+      bufferlist::iterator it = out_bl.begin();
+      r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
+    } else if (r == -ENOENT) {
+      r = 0;
+    }
+    on_finish->complete(r);
+  }
+};
+
+template <typename I>
+struct RemoveInstanceRequest : public Context {
+  InstanceWatcher<I> instance_watcher;
+  Context *on_finish;
+
+  RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+                        const std::string &instance_id, Context *on_finish)
+    : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) {
+  }
+
+  void send() {
+    instance_watcher.remove(this);
+  }
+
+  void finish(int r) override {
+    assert(r == 0);
+
+    on_finish->complete(r);
+  }
+};
+
+} // anonymous namespace
+
+template <typename I>
+void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
+                                       std::vector<std::string> *instance_ids,
+                                       Context *on_finish) {
+  librados::ObjectReadOperation op;
+  librbd::cls_client::mirror_instances_list_start(&op);
+  C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
+  librados::AioCompletion *aio_comp = create_rados_ack_callback(ctx);
+
+  int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
+                                         ContextWQ *work_queue,
+                                         const std::string &instance_id,
+                                         Context *on_finish) {
+  auto req = new RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
+                                          on_finish);
+  req->send();
+}
+
+template <typename I>
+InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
+                                    ContextWQ *work_queue,
+                                    const boost::optional<std::string> &id)
+  : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX +
+            (id ? *id : stringify(io_ctx.get_instance_id()))),
+    m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())),
+    m_lock("rbd::mirror::InstanceWatcher " + io_ctx.get_pool_name()),
+    m_instance_lock(librbd::ManagedLock<I>::create(
+      m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
+      m_cct->_conf->rbd_blacklist_expire_seconds)) {
+}
+
+template <typename I>
+InstanceWatcher<I>::~InstanceWatcher() {
+  m_instance_lock->destroy();
+}
+
+template <typename I>
+int InstanceWatcher<I>::init() {
+  C_SaferCond init_ctx;
+  init(&init_ctx);
+  return init_ctx.wait();
+}
+
+template <typename I>
+void InstanceWatcher<I>::init(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_ret_val = 0;
+
+  register_instance();
+}
+
+template <typename I>
+void InstanceWatcher<I>::shut_down() {
+  C_SaferCond shut_down_ctx;
+  shut_down(&shut_down_ctx);
+  int r = shut_down_ctx.wait();
+  assert(r == 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::shut_down(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_ret_val = 0;
+
+  release_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_ret_val = 0;
+  m_removing = true;
+
+  get_instance_locker();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                       uint64_t notifier_id, bufferlist &bl) {
+  dout(20) << dendl;
+
+  bufferlist out;
+  acknowledge_notify(notify_id, handle, out);
+}
+
+template <typename I>
+void InstanceWatcher<I>::register_instance() {
+  assert(m_lock.is_locked());
+
+  dout(20) << dendl;
+
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_instances_add(&op, m_instance_id);
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
+
+  int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_register_instance(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r == 0) {
+      create_instance_object();
+      return;
+    }
+
+    derr << "error registering instance: " << cpp_strerror(r) << dendl;
+
+    std::swap(on_finish, m_on_finish);
+  }
+  on_finish->complete(r);
+}
+
+
+template <typename I>
+void InstanceWatcher<I>::create_instance_object() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    InstanceWatcher<I>,
+    &InstanceWatcher<I>::handle_create_instance_object>(this);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_create_instance_object(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
+         << dendl;
+
+    m_ret_val = r;
+    unregister_instance();
+    return;
+  }
+
+  register_watch();
+}
+
+template <typename I>
+void InstanceWatcher<I>::register_watch() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
+
+  librbd::Watcher::register_watch(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_register_watch(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error registering instance watcher for " << m_oid << " object: "
+         << cpp_strerror(r) << dendl;
+
+    m_ret_val = r;
+    remove_instance_object();
+    return;
+  }
+
+  acquire_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::acquire_lock() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
+
+  m_instance_lock->acquire_lock(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_acquire_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r < 0) {
+
+      derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
+
+      m_ret_val = r;
+      unregister_watch();
+      return;
+    }
+
+    std::swap(on_finish, m_on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceWatcher<I>::release_lock() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
+
+  m_instance_lock->shut_down(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_release_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
+  }
+
+  unregister_watch();
+}
+
+template <typename I>
+void InstanceWatcher<I>::unregister_watch() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
+
+  librbd::Watcher::unregister_watch(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_unregister_watch(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error unregistering instance watcher for " << m_oid << " object: "
+         << cpp_strerror(r) << dendl;
+  }
+
+  Mutex::Locker locker(m_lock);
+  remove_instance_object();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove_instance_object() {
+  assert(m_lock.is_locked());
+
+  dout(20) << dendl;
+
+  librados::ObjectWriteOperation op;
+  op.remove();
+
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    InstanceWatcher<I>,
+    &InstanceWatcher<I>::handle_remove_instance_object>(this);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_remove_instance_object(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (m_removing && r == -ENOENT) {
+    r = 0;
+  }
+
+  if (r < 0) {
+    derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
+         << dendl;
+  }
+
+  Mutex::Locker locker(m_lock);
+  unregister_instance();
+}
+
+template <typename I>
+void InstanceWatcher<I>::unregister_instance() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
+
+  int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_unregister_instance(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
+  }
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    std::swap(on_finish, m_on_finish);
+    r = m_ret_val;
+
+    if (m_removing) {
+      m_removing = false;
+    }
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceWatcher<I>::get_instance_locker() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
+
+  m_instance_lock->get_locker(&m_instance_locker, ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_get_instance_locker(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    if (r != -ENOENT) {
+      derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
+    }
+    remove_instance_object();
+    return;
+  }
+
+  break_instance_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::break_instance_lock() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+    InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
+
+  m_instance_lock->break_lock(m_instance_locker, true, ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_break_instance_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    if (r != -ENOENT) {
+      derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
+    }
+    remove_instance_object();
+    return;
+  }
+
+  remove_instance_object();
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h
new file mode 100644 (file)
index 0000000..d89e40f
--- /dev/null
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+
+#include <string>
+#include <vector>
+#include <boost/optional.hpp>
+
+#include "librbd/Watcher.h"
+#include "librbd/managed_lock/Types.h"
+
+namespace librbd {
+  class ImageCtx;
+  template <typename> class ManagedLock;
+}
+
+namespace rbd {
+namespace mirror {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class InstanceWatcher : protected librbd::Watcher {
+public:
+  static void get_instances(librados::IoCtx &io_ctx,
+                            std::vector<std::string> *instance_ids,
+                            Context *on_finish);
+  static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+                              const std::string &instance_id,
+                              Context *on_finish);
+
+  static InstanceWatcher *create(
+    librados::IoCtx &io_ctx, ContextWQ *work_queue,
+    const boost::optional<std::string> &id = boost::none) {
+    return new InstanceWatcher(io_ctx, work_queue, id);
+  }
+  void destroy() {
+    delete this;
+  }
+
+  InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+                  const boost::optional<std::string> &id = boost::none);
+  ~InstanceWatcher() override;
+
+  int init();
+  void shut_down();
+
+  void init(Context *on_finish);
+  void shut_down(Context *on_finish);
+  void remove(Context *on_finish);
+
+protected:
+  void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id,
+                     bufferlist &bl) override;
+
+private:
+  /**
+   * @verbatim
+   *
+   *       BREAK_INSTANCE_LOCK -------\
+   *          ^                       |
+   *          |               (error) |
+   *       GET_INSTANCE_LOCKER  * * *>|
+   *          ^ (remove)              |
+   *          |                       |
+   * <uninitialized> <----------------+--------\
+   *    | (init)         ^            |        |
+   *    v        (error) *            |        |
+   * REGISTER_INSTANCE * *     * * * *|* *> UNREGISTER_INSTANCE
+   *    |                      *      |        ^
+   *    v              (error) *      v        |
+   * CREATE_INSTANCE_OBJECT  * *   * * * *> REMOVE_INSTANCE_OBJECT
+   *    |                          *           ^
+   *    v           (error)        *           |
+   * REGISTER_WATCH  * * * * * * * *   * *> UNREGISTER_WATCH
+   *    |                              *       ^
+   *    v         (error)              *       |
+   * ACQUIRE_LOCK  * * * * * * * * * * *    RELEASE_LOCK
+   *    |                                      ^
+   *    v       (shut_down)                    |
+   * <watching> -------------------------------/
+   *
+   * @endverbatim
+   */
+
+  bool m_owner;
+  std::string m_instance_id;
+
+  mutable Mutex m_lock;
+  librbd::ManagedLock<ImageCtxT> *m_instance_lock;
+  Context *m_on_finish = nullptr;
+  int m_ret_val = 0;
+  bool m_removing = false;
+  librbd::managed_lock::Locker m_instance_locker;
+
+  void register_instance();
+  void handle_register_instance(int r);
+
+  void create_instance_object();
+  void handle_create_instance_object(int r);
+
+  void register_watch();
+  void handle_register_watch(int r);
+
+  void acquire_lock();
+  void handle_acquire_lock(int r);
+
+  void release_lock();
+  void handle_release_lock(int r);
+
+  void unregister_watch();
+  void handle_unregister_watch(int r);
+
+  void remove_instance_object();
+  void handle_remove_instance_object(int r);
+
+  void unregister_instance();
+  void handle_unregister_instance(int r);
+
+  void get_instance_locker();
+  void handle_get_instance_locker(int r);
+
+  void break_instance_lock();
+  void handle_break_instance_lock(int r);
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
index 59092404359ff2d388a53249dccb1fd3193c79a3..0b9cf4780578fc35e7568febf304131d6ccb3923 100644 (file)
@@ -16,7 +16,7 @@
 #include "librbd/Utils.h"
 #include "librbd/Watcher.h"
 #include "librbd/internal.h"
-#include "LeaderWatcher.h"
+#include "InstanceWatcher.h"
 #include "Replayer.h"
 #include "Threads.h"
 
@@ -236,6 +236,9 @@ Replayer::~Replayer()
   if (m_leader_watcher) {
     m_leader_watcher->shut_down();
   }
+  if (m_instance_watcher) {
+    m_instance_watcher->shut_down();
+  }
 }
 
 bool Replayer::is_blacklisted() const {
@@ -292,6 +295,14 @@ int Replayer::init()
     return r;
   }
 
+  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+                                                     m_threads->work_queue));
+  r = m_instance_watcher->init();
+  if (r < 0) {
+    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
   // Bootstrap existing mirroring images
   init_local_mirroring_images();
 
index 36dc074320f41558b247d867e780f85cd6e4cdfa..286b70dddbfee4eebd0f26ce82d02f9e34588979 100644 (file)
 #include "ImageDeleter.h"
 #include "types.h"
 
+namespace librbd { class ImageCtx; }
+
 namespace rbd {
 namespace mirror {
 
 struct Threads;
 class ReplayerAdminSocketHook;
+template <typename> class InstanceWatcher;
 
 /**
  * Controls mirroring for a single remote cluster.
@@ -124,6 +127,7 @@ private:
   } m_leader_listener;
 
   std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
+  std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
 };
 
 } // namespace mirror