git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rbd-mirror A/A: leader should track up/down rbd-mirror instances
author Mykola Golub <mgolub@mirantis.com>
Sat, 11 Feb 2017 16:05:59 +0000 (17:05 +0100)
committer Mykola Golub <mgolub@mirantis.com>
Tue, 28 Feb 2017 11:48:38 +0000 (12:48 +0100)
Fixes: http://tracker.ceph.com/issues/18784
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/test/rbd_mirror/test_mock_LeaderWatcher.cc
src/test/rbd_mirror/test_mock_fixture.h
src/tools/rbd_mirror/InstanceWatcher.cc
src/tools/rbd_mirror/LeaderWatcher.cc
src/tools/rbd_mirror/LeaderWatcher.h
src/tools/rbd_mirror/MirrorStatusWatcher.cc
src/tools/rbd_mirror/MirrorStatusWatcher.h
src/tools/rbd_mirror/Replayer.cc

index 211c1b205d61cf59aecce574a8727e8c10662b80..4ac1298fee05d31d398e03c13d12b8bca562bd82 100644 (file)
@@ -1,11 +1,14 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "librbd/Utils.h"
 #include "test/librbd/mock/MockImageCtx.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
 
+using librbd::util::create_async_context_callback;
+
 namespace librbd {
 
 namespace {
@@ -29,6 +32,11 @@ struct MockManagedLock {
     s_instance = this;
   }
 
+  bool m_release_lock_on_shutdown = false;
+
+  MOCK_METHOD0(construct, void());
+  MOCK_METHOD0(destroy, void());
+
   MOCK_CONST_METHOD0(is_lock_owner, bool());
 
   MOCK_METHOD1(shut_down, void(Context *));
@@ -53,49 +61,166 @@ struct ManagedLock<MockTestImageCtx> {
               const std::string& oid, librbd::Watcher *watcher,
               managed_lock::Mode  mode, bool blacklist_on_break_lock,
               uint32_t blacklist_expire_seconds)
-    : m_lock("ManagedLock::m_lock") {
+    : m_work_queue(work_queue), m_lock("ManagedLock::m_lock") {
+    MockManagedLock::get_instance().construct();
+  }
+
+  virtual ~ManagedLock() {
+    MockManagedLock::get_instance().destroy();
   }
 
-  virtual ~ManagedLock() = default;
+  ContextWQ *m_work_queue;
 
   mutable Mutex m_lock;
 
   bool is_lock_owner() const {
     return MockManagedLock::get_instance().is_lock_owner();
   }
+
   void shut_down(Context *on_shutdown) {
+    if (MockManagedLock::get_instance().m_release_lock_on_shutdown) {
+      on_shutdown = new FunctionContext(
+        [this, on_shutdown](int r) {
+          MockManagedLock::get_instance().m_release_lock_on_shutdown = false;
+          shut_down(on_shutdown);
+        });
+      release_lock(on_shutdown);
+      return;
+    }
+
     MockManagedLock::get_instance().shut_down(on_shutdown);
   }
+
   void try_acquire_lock(Context *on_acquired) {
-    MockManagedLock::get_instance().try_acquire_lock(on_acquired);
+    Context *post_acquire_ctx = create_async_context_callback(
+      m_work_queue, new FunctionContext(
+        [this, on_acquired](int r) {
+          post_acquire_lock_handler(r, on_acquired);
+        }));
+    MockManagedLock::get_instance().try_acquire_lock(post_acquire_ctx);
   }
+
   void release_lock(Context *on_released) {
-    MockManagedLock::get_instance().release_lock(on_released);
+    Context *post_release_ctx = new FunctionContext(
+      [this, on_released](int r) {
+        post_release_lock_handler(false, r, on_released);
+      });
+
+    Context *release_ctx = new FunctionContext(
+      [this, on_released, post_release_ctx](int r) {
+        if (r < 0) {
+          on_released->complete(r);
+        } else {
+          MockManagedLock::get_instance().release_lock(post_release_ctx);
+        }
+      });
+
+    Context *pre_release_ctx = new FunctionContext(
+      [this, release_ctx](int r) {
+        bool shutting_down =
+          MockManagedLock::get_instance().m_release_lock_on_shutdown;
+        pre_release_lock_handler(shutting_down, release_ctx);
+      });
+
+    m_work_queue->queue(pre_release_ctx, 0);
   }
+
   void get_locker(managed_lock::Locker *locker, Context *on_finish) {
     MockManagedLock::get_instance().get_locker(locker, on_finish);
   }
+
   void break_lock(const managed_lock::Locker &locker, bool force_break_lock,
                   Context *on_finish) {
     MockManagedLock::get_instance().break_lock(locker, force_break_lock,
                                                on_finish);
   }
+
   void set_state_post_acquiring() {
     MockManagedLock::get_instance().set_state_post_acquiring();
   }
+
   bool is_shutdown() const {
     return MockManagedLock::get_instance().is_shutdown();
   }
+
   bool is_state_post_acquiring() const {
     return MockManagedLock::get_instance().is_state_post_acquiring();
   }
+
   bool is_state_locked() const {
     return MockManagedLock::get_instance().is_state_locked();
   }
+
+  virtual void post_acquire_lock_handler(int r, Context *on_finish) = 0;
+  virtual void pre_release_lock_handler(bool shutting_down,
+                                        Context *on_finish) = 0;
+  virtual void post_release_lock_handler(bool shutting_down, int r,
+                                         Context *on_finish) = 0;
 };
 
 } // namespace librbd
 
+namespace rbd {
+namespace mirror {
+
+template <>
+struct MirrorStatusWatcher<librbd::MockTestImageCtx> {
+  static MirrorStatusWatcher* s_instance;
+
+  static MirrorStatusWatcher *create(librados::IoCtx &io_ctx,
+                                     ContextWQ *work_queue) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  MirrorStatusWatcher() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~MirrorStatusWatcher() {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD1(init, void(Context *));
+  MOCK_METHOD1(shut_down, void(Context *));
+};
+
+MirrorStatusWatcher<librbd::MockTestImageCtx> *MirrorStatusWatcher<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+template <>
+struct Instances<librbd::MockTestImageCtx> {
+  static Instances* s_instance;
+
+  static Instances *create(Threads *threads, librados::IoCtx &ioctx) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  Instances() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~Instances() {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(destroy, void());
+  MOCK_METHOD1(init, void(Context *));
+  MOCK_METHOD1(shut_down, void(Context *));
+  MOCK_METHOD1(notify, void(const std::string &));
+};
+
+Instances<librbd::MockTestImageCtx> *Instances<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace mirror
+} // namespace rbd
+
+
 // template definitions
 #include "tools/rbd_mirror/LeaderWatcher.cc"
 
@@ -111,22 +236,47 @@ using ::testing::Return;
 
 using librbd::MockManagedLock;
 
+struct MockListener : public LeaderWatcher<librbd::MockTestImageCtx>::Listener {
+  static MockListener* s_instance;
+
+  MockListener() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~MockListener() override {
+    assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD1(post_acquire_handler, void(Context *));
+  MOCK_METHOD1(pre_release_handler, void(Context *));
+};
+
+MockListener *MockListener::s_instance = nullptr;
+
 class TestMockLeaderWatcher : public TestMockFixture {
 public:
+  typedef MirrorStatusWatcher<librbd::MockTestImageCtx> MockMirrorStatusWatcher;
+  typedef Instances<librbd::MockTestImageCtx> MockInstances;
   typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
 
-  class MockListener : public MockLeaderWatcher::Listener {
-  public:
-    MOCK_METHOD1(post_acquire_handler, void(Context *));
-    MOCK_METHOD1(pre_release_handler, void(Context *));
-  };
+  void expect_construct(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, construct());
+  }
+
+  void expect_destroy(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, destroy());
+  }
 
   void expect_is_lock_owner(MockManagedLock &mock_managed_lock, bool owner) {
     EXPECT_CALL(mock_managed_lock, is_lock_owner())
       .WillOnce(Return(owner));
   }
 
-  void expect_shut_down(MockManagedLock &mock_managed_lock, int r) {
+  void expect_shut_down(MockManagedLock &mock_managed_lock,
+                        bool release_lock_on_shutdown, int r) {
+    mock_managed_lock.m_release_lock_on_shutdown = release_lock_on_shutdown;
     EXPECT_CALL(mock_managed_lock, shut_down(_))
       .WillOnce(CompleteContext(r));
   }
@@ -134,11 +284,20 @@ public:
   void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
     EXPECT_CALL(mock_managed_lock, try_acquire_lock(_))
       .WillOnce(CompleteContext(r));
+    if (r == 0) {
+      expect_set_state_post_acquiring(mock_managed_lock);
+    }
   }
 
-  void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
+  void expect_release_lock(MockManagedLock &mock_managed_lock, int r,
+                           Context *on_finish = nullptr) {
     EXPECT_CALL(mock_managed_lock, release_lock(_))
-      .WillOnce(CompleteContext(r));
+      .WillOnce(Invoke([on_finish, r](Context *ctx) {
+                         ctx->complete(r);
+                         if (on_finish != nullptr) {
+                           on_finish->complete(0);
+                         }
+                       }));
   }
 
   void expect_get_locker(MockManagedLock &mock_managed_lock,
@@ -158,8 +317,8 @@ public:
   }
 
   void expect_break_lock(MockManagedLock &mock_managed_lock,
-                        const librbd::managed_lock::Locker &locker, int r,
-                        Context *on_finish) {
+                         const librbd::managed_lock::Locker &locker, int r,
+                         Context *on_finish) {
     EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
       .WillOnce(Invoke([on_finish, r](const librbd::managed_lock::Locker &,
                                       bool, Context *ctx) {
@@ -196,74 +355,164 @@ public:
 
   void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
                                Context *on_finish) {
+    // is_leader in notify_heartbeat
     EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
       .WillOnce(Return(false));
     EXPECT_CALL(mock_managed_lock, is_state_locked())
       .WillOnce(Return(true));
+
+    // is_leader in handle_notify_heartbeat
     EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
       .WillOnce(Return(false));
     EXPECT_CALL(mock_managed_lock, is_state_locked())
       .WillOnce(DoAll(Invoke([on_finish]() {
                         on_finish->complete(0);
                       }),
-                     Return(true)));
+                      Return(true)));
+  }
+
+  void expect_destroy(MockMirrorStatusWatcher &mock_mirror_status_watcher) {
+    EXPECT_CALL(mock_mirror_status_watcher, destroy());
+  }
+
+  void expect_init(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) {
+    EXPECT_CALL(mock_mirror_status_watcher, init(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_shut_down(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) {
+    EXPECT_CALL(mock_mirror_status_watcher, shut_down(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+    expect_destroy(mock_mirror_status_watcher);
+  }
+
+  void expect_destroy(MockInstances &mock_instances) {
+    EXPECT_CALL(mock_instances, destroy());
+  }
+
+  void expect_init(MockInstances &mock_instances, int r) {
+    EXPECT_CALL(mock_instances, init(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_shut_down(MockInstances &mock_instances, int r) {
+    EXPECT_CALL(mock_instances, shut_down(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+    expect_destroy(mock_instances);
+  }
+
+  void expect_acquire_notify(MockManagedLock &mock_managed_lock,
+                             MockListener &mock_listener, int r) {
+    expect_is_leader(mock_managed_lock, true, false);
+    EXPECT_CALL(mock_listener, post_acquire_handler(_))
+      .WillOnce(CompleteContext(r));
+    expect_is_leader(mock_managed_lock, true, false);
+  }
+
+  void expect_release_notify(MockManagedLock &mock_managed_lock,
+                             MockListener &mock_listener, int r) {
+    expect_is_leader(mock_managed_lock, false, false);
+    EXPECT_CALL(mock_listener, pre_release_handler(_))
+      .WillOnce(CompleteContext(r));
+    expect_is_leader(mock_managed_lock, false, false);
   }
 };
 
 TEST_F(TestMockLeaderWatcher, InitShutdown) {
   MockManagedLock mock_managed_lock;
+  MockMirrorStatusWatcher mock_mirror_status_watcher;
+  MockInstances mock_instances;
   MockListener listener;
-  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
 
   expect_is_shutdown(mock_managed_lock);
+  expect_destroy(mock_managed_lock);
 
   InSequence seq;
+
+  expect_construct(mock_managed_lock);
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // Init
   C_SaferCond on_heartbeat_finish;
   expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_init(mock_mirror_status_watcher, 0);
+  expect_init(mock_instances, 0);
+  expect_acquire_notify(mock_managed_lock, listener, 0);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
   ASSERT_EQ(0, on_heartbeat_finish.wait());
 
-  expect_shut_down(mock_managed_lock, 0);
+  // Shutdown
+  expect_release_notify(mock_managed_lock, listener, 0);
+  expect_shut_down(mock_instances, 0);
+  expect_shut_down(mock_mirror_status_watcher, 0);
+  expect_is_leader(mock_managed_lock, false, false);
+  expect_release_lock(mock_managed_lock, 0);
+  expect_shut_down(mock_managed_lock, true, 0);
 
   leader_watcher.shut_down();
 }
 
 TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) {
   MockManagedLock mock_managed_lock;
+  MockMirrorStatusWatcher mock_mirror_status_watcher;
+  MockInstances mock_instances;
   MockListener listener;
-  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
 
   expect_is_shutdown(mock_managed_lock);
+  expect_destroy(mock_managed_lock);
 
   InSequence seq;
+
+  expect_construct(mock_managed_lock);
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // Init
   C_SaferCond on_heartbeat_finish;
   expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_init(mock_mirror_status_watcher, 0);
+  expect_init(mock_instances, 0);
+  expect_acquire_notify(mock_managed_lock, listener, 0);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
   ASSERT_EQ(0, on_heartbeat_finish.wait());
 
+  // Release
   expect_is_leader(mock_managed_lock, false, true);
-  expect_release_lock(mock_managed_lock, 0);
+  expect_release_notify(mock_managed_lock, listener, 0);
+  expect_shut_down(mock_instances, 0);
+  expect_shut_down(mock_mirror_status_watcher, 0);
+  expect_is_leader(mock_managed_lock, false, false);
+  C_SaferCond on_release;
+  expect_release_lock(mock_managed_lock, 0, &on_release);
 
   leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release.wait());
 
-  expect_shut_down(mock_managed_lock, 0);
+  // Shutdown
+  expect_shut_down(mock_managed_lock, false, 0);
 
   leader_watcher.shut_down();
 }
 
 TEST_F(TestMockLeaderWatcher, AcquireError) {
   MockManagedLock mock_managed_lock;
+  MockMirrorStatusWatcher mock_mirror_status_watcher;
+  MockInstances mock_instances;
   MockListener listener;
-  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
 
   expect_is_shutdown(mock_managed_lock);
   expect_is_leader(mock_managed_lock);
+  expect_destroy(mock_managed_lock);
 
   InSequence seq;
+
+  expect_construct(mock_managed_lock);
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // Init
   C_SaferCond on_get_locker_finish;
   expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
   expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0,
@@ -271,32 +520,51 @@ TEST_F(TestMockLeaderWatcher, AcquireError) {
   ASSERT_EQ(0, leader_watcher.init());
   ASSERT_EQ(0, on_get_locker_finish.wait());
 
-  expect_shut_down(mock_managed_lock, 0);
+  // Shutdown
+  expect_shut_down(mock_managed_lock, false, 0);
 
   leader_watcher.shut_down();
 }
 
 TEST_F(TestMockLeaderWatcher, ReleaseError) {
   MockManagedLock mock_managed_lock;
+  MockMirrorStatusWatcher mock_mirror_status_watcher;
+  MockInstances mock_instances;
   MockListener listener;
-  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
 
   expect_is_shutdown(mock_managed_lock);
+  expect_destroy(mock_managed_lock);
 
   InSequence seq;
+
+  expect_construct(mock_managed_lock);
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // Init
   C_SaferCond on_heartbeat_finish;
   expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_init(mock_mirror_status_watcher, 0);
+  expect_init(mock_instances, 0);
+  expect_acquire_notify(mock_managed_lock, listener, 0);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
   ASSERT_EQ(0, on_heartbeat_finish.wait());
 
+  // Release
   expect_is_leader(mock_managed_lock, false, true);
-  expect_release_lock(mock_managed_lock, -EINVAL);
+  expect_release_notify(mock_managed_lock, listener, -EINVAL);
+  expect_shut_down(mock_instances, 0);
+  expect_shut_down(mock_mirror_status_watcher, -EINVAL);
+  expect_is_leader(mock_managed_lock, false, false);
+  C_SaferCond on_release;
+  expect_release_lock(mock_managed_lock, -EINVAL, &on_release);
 
   leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release.wait());
 
-  expect_shut_down(mock_managed_lock, 0);
+  // Shutdown
+  expect_shut_down(mock_managed_lock, false, 0);
 
   leader_watcher.shut_down();
 }
@@ -304,21 +572,28 @@ TEST_F(TestMockLeaderWatcher, ReleaseError) {
 TEST_F(TestMockLeaderWatcher, Break) {
   EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1"));
   EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats",
-                               "1"));
+                                "1"));
   CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
   int max_acquire_attempts =
     cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break;
 
   MockManagedLock mock_managed_lock;
+  MockMirrorStatusWatcher mock_mirror_status_watcher;
+  MockInstances mock_instances;
   MockListener listener;
-  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
   librbd::managed_lock::Locker
     locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
 
   expect_is_shutdown(mock_managed_lock);
   expect_is_leader(mock_managed_lock);
+  expect_destroy(mock_managed_lock);
 
   InSequence seq;
+
+  expect_construct(mock_managed_lock);
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // Init
   for (int i = 0; i <= max_acquire_attempts; i++) {
     expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
     if (i < max_acquire_attempts) {
@@ -329,12 +604,21 @@ TEST_F(TestMockLeaderWatcher, Break) {
   expect_break_lock(mock_managed_lock, locker, 0, &on_break);
   C_SaferCond on_heartbeat_finish;
   expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_init(mock_mirror_status_watcher, 0);
+  expect_init(mock_instances, 0);
+  expect_acquire_notify(mock_managed_lock, listener, 0);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
   ASSERT_EQ(0, on_heartbeat_finish.wait());
 
-  expect_shut_down(mock_managed_lock, 0);
+  // Shutdown
+  expect_release_notify(mock_managed_lock, listener, 0);
+  expect_shut_down(mock_instances, 0);
+  expect_shut_down(mock_mirror_status_watcher, 0);
+  expect_is_leader(mock_managed_lock, false, false);
+  expect_release_lock(mock_managed_lock, 0);
+  expect_shut_down(mock_managed_lock, true, 0);
 
   leader_watcher.shut_down();
 }
index 77a36ddc6d8c031d0abd9650789118d83e2844a8..18ee6146763d94ea07241c6ac69d5c327fe286a6 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "test/rbd_mirror/test_fixture.h"
 #include "test/librados_test_stub/LibradosTestStub.h"
+#include "common/WorkQueue.h"
 #include <boost/shared_ptr.hpp>
 #include <gmock/gmock.h>
 
@@ -23,6 +24,11 @@ ACTION_P(CompleteContext, r) {
   arg0->complete(r);
 }
 
+ACTION_P2(CompleteContext, wq, r) {
+  ContextWQ *context_wq = reinterpret_cast<ContextWQ *>(wq);
+  context_wq->queue(arg0, r);
+}
+
 MATCHER_P(ContentsEqual, bl, "") {
   // TODO fix const-correctness of bufferlist
   return const_cast<bufferlist &>(arg).contents_equal(
index c07100b4648f60bb8c414e8105b09579d0845368..1655ecd244b5f2aee380b8aba96d4147c6acac14 100644 (file)
@@ -118,7 +118,7 @@ int InstanceWatcher<I>::init() {
 
 template <typename I>
 void InstanceWatcher<I>::init(Context *on_finish) {
-  dout(20) << dendl;
+  dout(20) << "instance_id=" << m_instance_id << dendl;
 
   Mutex::Locker locker(m_lock);
 
index 9ab4f0468c8081cbc01c7f44f368a6970e79e5da..6dd08a8b85f4b8ddd96f0dcc51e1af3072ad730b 100644 (file)
@@ -15,7 +15,6 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
                            << this << " " << __func__ << ": "
-
 namespace rbd {
 namespace mirror {
 
@@ -31,7 +30,18 @@ LeaderWatcher<I>::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx,
   : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
     m_threads(threads), m_listener(listener),
     m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
-    m_notifier_id(librados::Rados(io_ctx).get_instance_id()) {
+    m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
+    m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
+                                 m_cct->_conf->rbd_blacklist_expire_seconds)) {
+}
+
+template <typename I>
+LeaderWatcher<I>::~LeaderWatcher() {
+  assert(m_status_watcher == nullptr);
+  assert(m_instances == nullptr);
+  assert(m_timer_task == nullptr);
+
+  delete m_leader_lock;
 }
 
 template <typename I>
@@ -43,15 +53,10 @@ int LeaderWatcher<I>::init() {
 
 template <typename I>
 void LeaderWatcher<I>::init(Context *on_finish) {
-  dout(20) << dendl;
+  dout(20) << "notifier_id=" << m_notifier_id << dendl;
 
   Mutex::Locker locker(m_lock);
 
-  assert(!m_leader_lock);
-  m_leader_lock.reset(
-    new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
-                   m_cct->_conf->rbd_blacklist_expire_seconds));
-
   assert(m_on_finish == nullptr);
   m_on_finish = on_finish;
 
@@ -143,22 +148,6 @@ void LeaderWatcher<I>::shut_down(Context *on_finish) {
   Mutex::Locker timer_locker(m_threads->timer_lock);
   Mutex::Locker locker(m_lock);
 
-  assert(m_leader_lock);
-
-  if (false && is_leader(m_lock)) {
-    Context *ctx = create_async_context_callback(
-      m_work_queue, new FunctionContext(
-        [this, on_finish](int r) {
-          if (r < 0) {
-            derr << "error releasing leader lock: " << cpp_strerror(r) << dendl;
-          }
-          shut_down(on_finish);
-        }));
-
-    m_leader_lock->release_lock(ctx);
-    return;
-  }
-
   assert(m_on_shut_down_finish == nullptr);
   m_on_shut_down_finish = on_finish;
   cancel_timer_task();
@@ -234,7 +223,7 @@ template <typename I>
 bool LeaderWatcher<I>::is_leader(Mutex &lock) {
   assert(m_lock.is_locked());
 
-  bool leader = m_leader_lock && m_leader_lock->is_leader();
+  bool leader = m_leader_lock->is_leader();
   dout(20) << leader << dendl;
   return leader;
 }
@@ -251,6 +240,19 @@ void LeaderWatcher<I>::release_leader() {
   release_leader_lock();
 }
 
+template <typename I>
+void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  instance_ids->clear();
+  if (m_instances != nullptr) {
+    m_instances->list(instance_ids);
+  }
+}
+
+
 template <typename I>
 void LeaderWatcher<I>::cancel_timer_task() {
   assert(m_threads->timer_lock.is_locked());
@@ -273,6 +275,10 @@ void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
   assert(m_threads->timer_lock.is_locked());
   assert(m_lock.is_locked());
 
+  if (m_on_shut_down_finish != nullptr) {
+    return;
+  }
+
   cancel_timer_task();
 
   m_timer_task = new FunctionContext(
@@ -312,7 +318,7 @@ void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
   Mutex::Locker locker(m_lock);
   assert(m_on_finish == nullptr);
   m_on_finish = on_finish;
-  m_notify_error = 0;
+  m_ret_val = 0;
 
   init_status_watcher();
 }
@@ -324,7 +330,7 @@ void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
   Mutex::Locker locker(m_lock);
   assert(m_on_finish == nullptr);
   m_on_finish = on_finish;
-  m_notify_error = 0;
+  m_ret_val = 0;
 
   notify_listener();
 }
@@ -484,7 +490,7 @@ void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
 
   m_acquire_attempts = 0;
 
-  if (m_notify_error) {
+  if (m_ret_val) {
     dout(5) << "releasing due to error on notify" << dendl;
     release_leader_lock();
     return;
@@ -526,9 +532,9 @@ void LeaderWatcher<I>::init_status_watcher() {
   dout(20) << dendl;
 
   assert(m_lock.is_locked());
-  assert(!m_status_watcher);
+  assert(m_status_watcher == nullptr);
 
-  m_status_watcher.reset(new MirrorStatusWatcher(m_ioctx, m_work_queue));
+  m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
 
   Context *ctx = create_context_callback<
     LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
@@ -545,13 +551,14 @@ void LeaderWatcher<I>::handle_init_status_watcher(int r) {
     Mutex::Locker locker(m_lock);
 
     if (r == 0) {
-      notify_listener();
+      init_instances();
       return;
     }
 
     derr << "error initializing mirror status watcher: " << cpp_strerror(r)
-        << dendl;
-    m_status_watcher.reset();
+         << dendl;
+    m_status_watcher->destroy();
+    m_status_watcher = nullptr;
     assert(m_on_finish != nullptr);
     std::swap(m_on_finish, on_finish);
   }
@@ -563,7 +570,7 @@ void LeaderWatcher<I>::shut_down_status_watcher() {
   dout(20) << dendl;
 
   assert(m_lock.is_locked());
-  assert(m_status_watcher);
+  assert(m_status_watcher != nullptr);
 
   Context *ctx = create_async_context_callback(
     m_work_queue, create_context_callback<LeaderWatcher<I>,
@@ -580,17 +587,22 @@ void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
   {
     Mutex::Locker locker(m_lock);
 
+    m_status_watcher->destroy();
+    m_status_watcher = nullptr;
+
     if (r < 0) {
       derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
-          << dendl;
-      if (!is_leader(m_lock)) {
-       // ignore on releasing
-       r = 0;
-      }
+           << dendl;
     }
 
-    assert(m_status_watcher);
-    m_status_watcher.reset();
+    if (m_ret_val != 0) {
+      r = m_ret_val;
+    }
+
+    if (!is_leader(m_lock)) {
+      // ignore on releasing
+      r = 0;
+    }
 
     assert(m_on_finish != nullptr);
     std::swap(m_on_finish, on_finish);
@@ -598,6 +610,66 @@ void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
   on_finish->complete(r);
 }
 
+template <typename I>
+void LeaderWatcher<I>::init_instances() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_instances == nullptr);
+
+  m_instances = Instances<I>::create(m_threads, m_ioctx);
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
+
+  m_instances->init(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_init_instances(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error initializing instances: " << cpp_strerror(r) << dendl;
+    m_ret_val = r;
+    m_instances->destroy();
+    m_instances = nullptr;
+    shut_down_status_watcher();
+    return;
+  }
+
+  notify_listener();
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_instances() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_instances != nullptr);
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<LeaderWatcher<I>,
+      &LeaderWatcher<I>::handle_shut_down_instances>(this));
+
+  m_instances->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_instances(int r) {
+  dout(20) << "r=" << r << dendl;
+  assert(r == 0);
+
+  Mutex::Locker locker(m_lock);
+
+  m_instances->destroy();
+  m_instances = nullptr;
+
+  shut_down_status_watcher();
+}
+
 template <typename I>
 void LeaderWatcher<I>::notify_listener() {
   dout(20) << dendl;
@@ -630,13 +702,13 @@ void LeaderWatcher<I>::handle_notify_listener(int r) {
 
   if (r < 0) {
     derr << "error notifying listener: " << cpp_strerror(r) << dendl;
-    m_notify_error = r;
+    m_ret_val = r;
   }
 
   if (is_leader(m_lock)) {
     notify_lock_acquired();
   } else {
-    shut_down_status_watcher();
+    shut_down_instances();
   }
 }
 
@@ -664,8 +736,8 @@ void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
     Mutex::Locker locker(m_lock);
     if (r < 0 && r != -ETIMEDOUT) {
       derr << "error notifying leader lock acquired: " << cpp_strerror(r)
-          << dendl;
-      m_notify_error = r;
+           << dendl;
+      m_ret_val = r;
     }
 
     assert(m_on_finish != nullptr);
@@ -724,7 +796,8 @@ void LeaderWatcher<I>::notify_heartbeat() {
   bufferlist bl;
   ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
 
-  send_notify(bl, nullptr, ctx);
+  m_heartbeat_ack_bl.clear();
+  send_notify(bl, &m_heartbeat_ack_bl, ctx);
 }
 
 template <typename I>
@@ -745,6 +818,33 @@ void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
     return;
   }
 
+  try {
+    bufferlist::iterator iter = m_heartbeat_ack_bl.begin();
+    uint32_t num_acks;
+    ::decode(num_acks, iter);
+
+    dout(20) << num_acks << " acks received" << dendl;
+
+    for (uint32_t i = 0; i < num_acks; i++) {
+      uint64_t notifier_id;
+      uint64_t cookie;
+      bufferlist reply_bl;
+
+      ::decode(notifier_id, iter);
+      ::decode(cookie, iter);
+      ::decode(reply_bl, iter);
+
+      if (notifier_id == m_notifier_id) {
+       continue;
+      }
+
+      std::string instance_id = stringify(notifier_id);
+      m_instances->notify(instance_id);
+    }
+  } catch (const buffer::error &err) {
+    derr << ": error decoding heartbeat acks: " << err.what() << dendl;
+  }
+
   schedule_timer_task("heartbeat", 1, true,
                       &LeaderWatcher<I>::notify_heartbeat);
 }
index 8d5fd5dae85f7dc26f9fcee37a044c35717c2bf3..e2e7cc8a0e9d255dc6c1305310d5beaf2f5aa5ac 100644 (file)
@@ -11,6 +11,7 @@
 #include "librbd/ManagedLock.h"
 #include "librbd/managed_lock/Types.h"
 #include "librbd/Watcher.h"
+#include "Instances.h"
 #include "MirrorStatusWatcher.h"
 #include "tools/rbd_mirror/leader_watcher/Types.h"
 
@@ -33,6 +34,7 @@ public:
   };
 
   LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener);
+  ~LeaderWatcher() override;
 
   int init();
   void shut_down();
@@ -42,6 +44,7 @@ public:
 
   bool is_leader();
   void release_leader();
+  void list_instances(std::vector<std::string> *instance_ids);
 
 private:
   /**
@@ -68,18 +71,21 @@ private:
    *     v    v  v  v  v  (error)        *  v           | | | |
    *  ACQUIRE_LEADER_LOCK  * * * * *> GET_LOCKER ---> <secondary>
    *     |                   *                           ^
-   * ....|...................*.............         .....|.....................
-   * .   v                   *            .         .    |       post_release .
-   * .INIT_STATUS_WATCHER  * *            .         .NOTIFY_LOCK_RELEASED     .
-   * .   |                 (error)        .         .....^.....................
-   * .   v                                .              |
-   * .NOTIFY_LISTENERS                    .          RELEASE_LEADER_LOCK
-   * .   |                                .              ^
-   * .   v                                .         .....|.....................
-   * .NOTIFY_LOCK_ACQUIRED   post_acquire .         .SHUT_DOWN_STATUS_WATCHER .
-   * ....|.................................         .    ^                    .
+   * ....|...................*....................  .....|.....................
+   * .   v                   *                   .  .    |       post_release .
+   * .INIT_STATUS_WATCHER  * * (error)           .  .NOTIFY_LOCK_RELEASED     .
+   * .   |                   ^                   .  .....^.....................
+   * .   v         (error)   |                   .       |
+   * .INIT_INSTANCES *> SHUT_DOWN_STATUS_WATCHER .   RELEASE_LEADER_LOCK
+   * .   |                                       .       ^
+   * .   v                                       .  .....|.....................
+   * .NOTIFY_LISTENER                            .  .SHUT_DOWN_STATUS_WATCHER .
+   * .   |                                       .  .    ^                    .
+   * .   v                                       .  .    |                    .
+   * .NOTIFY_LOCK_ACQUIRED          post_acquire .  .SHUT_DOWN_INSTANCES      .
+   * ....|........................................  .    ^                    .
    *     v                                          .    |                    .
-   *  <leader> -----------------------------------> .NOTIFY_LISTENERS         .
+   *  <leader> -----------------------------------> .NOTIFY_LISTENER          .
    *            (shut_down, release_leader,         .             pre_release .
    *             notify error)                      ...........................
    * @endverbatim
@@ -156,14 +162,16 @@ private:
 
   Mutex m_lock;
   uint64_t m_notifier_id;
+  LeaderLock *m_leader_lock;
   Context *m_on_finish = nullptr;
   Context *m_on_shut_down_finish = nullptr;
   int m_acquire_attempts = 0;
-  int m_notify_error = 0;
-  std::unique_ptr<LeaderLock> m_leader_lock;
-  std::unique_ptr<MirrorStatusWatcher> m_status_watcher;
+  int m_ret_val = 0;
+  MirrorStatusWatcher<ImageCtxT> *m_status_watcher = nullptr;
+  Instances<ImageCtxT> *m_instances = nullptr;
   librbd::managed_lock::Locker m_locker;
   Context *m_timer_task = nullptr;
+  bufferlist m_heartbeat_ack_bl;
 
   bool is_leader(Mutex &m_lock);
 
@@ -203,6 +211,12 @@ private:
   void shut_down_status_watcher();
   void handle_shut_down_status_watcher(int r);
 
+  void init_instances();
+  void handle_init_instances(int r);
+
+  void shut_down_instances();
+  void handle_shut_down_instances(int r);
+
   void notify_listener();
   void handle_notify_listener(int r);
 
@@ -215,6 +229,9 @@ private:
   void notify_heartbeat();
   void handle_notify_heartbeat(int r);
 
+  void get_instances();
+  void handle_get_instances(int r);
+
   void handle_post_acquire_leader_lock(int r, Context *on_finish);
   void handle_pre_release_leader_lock(Context *on_finish);
   void handle_post_release_leader_lock(int r, Context *on_finish);
index b969ed68ef100846aa2daf4aebe493ed9b4655c8..95e347363bac4bdb75f75b4af4ae42e6a918c025 100644 (file)
@@ -18,12 +18,18 @@ namespace mirror {
 
 using librbd::util::create_rados_ack_callback;
 
-MirrorStatusWatcher::MirrorStatusWatcher(librados::IoCtx &io_ctx,
-                                         ContextWQ *work_queue)
+template <typename I>
+MirrorStatusWatcher<I>::MirrorStatusWatcher(librados::IoCtx &io_ctx,
+                                            ContextWQ *work_queue)
   : Watcher(io_ctx, work_queue, RBD_MIRRORING) {
 }
 
-void MirrorStatusWatcher::init(Context *on_finish) {
+template <typename I>
+MirrorStatusWatcher<I>::~MirrorStatusWatcher() {
+}
+
+template <typename I>
+void MirrorStatusWatcher<I>::init(Context *on_finish) {
   dout(20) << dendl;
 
   on_finish = new FunctionContext(
@@ -45,14 +51,17 @@ void MirrorStatusWatcher::init(Context *on_finish) {
   aio_comp->release();
 }
 
-void MirrorStatusWatcher::shut_down(Context *on_finish) {
+template <typename I>
+void MirrorStatusWatcher<I>::shut_down(Context *on_finish) {
   dout(20) << dendl;
 
   unregister_watch(on_finish);
 }
 
-void MirrorStatusWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
-                                        uint64_t notifier_id, bufferlist &bl) {
+template <typename I>
+void MirrorStatusWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                           uint64_t notifier_id,
+                                           bufferlist &bl) {
   dout(20) << dendl;
 
   bufferlist out;
@@ -61,3 +70,5 @@ void MirrorStatusWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
 
 } // namespace mirror
 } // namespace rbd
+
+template class rbd::mirror::MirrorStatusWatcher<librbd::ImageCtx>;
index f9d9227b3c1f36c27e997b38e7888714e1c580c0..155f8cc8d05ad16f27d902cb23567e411af416b5 100644 (file)
@@ -6,12 +6,24 @@
 
 #include "librbd/Watcher.h"
 
+namespace librbd { class ImageCtx; }
+
 namespace rbd {
 namespace mirror {
 
+template <typename ImageCtxT = librbd::ImageCtx>
 class MirrorStatusWatcher : protected librbd::Watcher {
 public:
+  static MirrorStatusWatcher *create(librados::IoCtx &io_ctx,
+                                     ContextWQ *work_queue) {
+    return new MirrorStatusWatcher(io_ctx, work_queue);
+  }
+  void destroy() {
+    delete this;
+  }
+
   MirrorStatusWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue);
+  ~MirrorStatusWatcher() override;
 
   void init(Context *on_finish);
   void shut_down(Context *on_finish);
index 0b9cf4780578fc35e7568febf304131d6ccb3923..01750aaa77746242cc6e1ec57a277ef74bd6c96f 100644 (file)
@@ -480,26 +480,37 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
 {
   dout(20) << "enter" << dendl;
 
+  if (!f) {
+    return;
+  }
+
   Mutex::Locker l(m_lock);
 
-  if (f) {
-    f->open_object_section("replayer_status");
-    f->dump_string("pool", m_local_io_ctx.get_pool_name());
-    f->dump_stream("peer") << m_peer;
-    f->dump_bool("leader", m_leader_watcher->is_leader());
-    f->open_array_section("image_replayers");
-  };
+  f->open_object_section("replayer_status");
+  f->dump_string("pool", m_local_io_ctx.get_pool_name());
+  f->dump_stream("peer") << m_peer;
+
+  bool leader = m_leader_watcher->is_leader();
+  f->dump_bool("leader", leader);
+  if (leader) {
+    std::vector<std::string> instance_ids;
+    m_leader_watcher->list_instances(&instance_ids);
+    f->open_array_section("instances");
+    for (auto instance_id : instance_ids) {
+      f->dump_string("instance_id", instance_id);
+    }
+    f->close_section();
+  }
+  f->open_array_section("image_replayers");
 
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
     image_replayer->print_status(f, ss);
   }
 
-  if (f) {
-    f->close_section();
-    f->close_section();
-    f->flush(*ss);
-  }
+  f->close_section();
+  f->close_section();
+  f->flush(*ss);
 }
 
 void Replayer::start()