git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror HA: create pool locker / leader class
author: Mykola Golub <mgolub@mirantis.com>
Mon, 23 Jan 2017 14:22:51 +0000 (15:22 +0100)
committer: Mykola Golub <mgolub@mirantis.com>
Wed, 1 Feb 2017 09:55:03 +0000 (10:55 +0100)
Fixes: http://tracker.ceph.com/issues/17019
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/common/config_opts.h
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/test_LeaderWatcher.cc [new file with mode: 0644]
src/test/rbd_mirror/test_main.cc
src/test/rbd_mirror/test_mock_LeaderWatcher.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/LeaderWatcher.cc [new file with mode: 0644]
src/tools/rbd_mirror/LeaderWatcher.h [new file with mode: 0644]

index ff8125ef48040851c07e0b825daff2b9ce495914..3f15a3d91f5c6e918334d8ca9eef2c9acaee0a41 100644 (file)
@@ -1365,6 +1365,9 @@ OPTION(rbd_mirror_pool_replayers_refresh_interval, OPT_INT, 30) // interval to r
 OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter
 OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher
 OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer
+OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats
+OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock
+OPTION(rbd_mirror_leader_max_acquire_attempts_before_break, OPT_INT, 3) // number of failed attempts to acquire lock after missing heartbeats before breaking lock
 
 OPTION(nss_db_path, OPT_STR, "") // path to nss db
 
index 917eedc01e64834a0ba98a678c0af7e180ac68d2..259b2db6fa332b8b5b5251dd8652c3d30406db20 100644 (file)
@@ -4,6 +4,7 @@ set(rbd_mirror_test_srcs
   test_ImageReplayer.cc
   test_ImageDeleter.cc
   test_ImageSync.cc
+  test_LeaderWatcher.cc
   test_fixture.cc
   )
 add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs})
@@ -16,6 +17,7 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageReplayer.cc
   test_mock_ImageSync.cc
   test_mock_ImageSyncThrottler.cc
+  test_mock_LeaderWatcher.cc
   image_replayer/test_mock_BootstrapRequest.cc
   image_replayer/test_mock_CreateImageRequest.cc
   image_replayer/test_mock_EventPreprocessor.cc
@@ -37,6 +39,7 @@ target_link_libraries(unittest_rbd_mirror
   rbd_mirror
   rados_test_stub
   rbd_mirror_internal
+  rbd_mirror_types
   rbd_api
   rbd_internal
   rbd_test_mock
@@ -60,6 +63,7 @@ set_target_properties(ceph_test_rbd_mirror PROPERTIES COMPILE_FLAGS
 target_link_libraries(ceph_test_rbd_mirror
   rbd_mirror
   rbd_mirror_internal
+  rbd_mirror_types
   rbd_api
   rbd_internal
   journal
diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc
new file mode 100644 (file)
index 0000000..6d0533f
--- /dev/null
@@ -0,0 +1,323 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "test/rbd_mirror/test_fixture.h"
+#include "tools/rbd_mirror/LeaderWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#include "test/librados/test.h"
+#include "gtest/gtest.h"
+
+using librbd::util::unique_lock_name;
+using rbd::mirror::LeaderWatcher;
+
+void register_test_leader_watcher() {
+}
+
+class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
+public:
+  class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
+  public:
+    Listener()
+      : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
+    }
+
+    void on_acquire(int r, Context *ctx) {
+      Mutex::Locker locker(m_test_lock);
+      m_on_acquire_r = r;
+      m_on_acquire = ctx;
+    }
+
+    void on_release(int r, Context *ctx) {
+      Mutex::Locker locker(m_test_lock);
+      m_on_release_r = r;
+      m_on_release = ctx;
+    }
+
+    int acquire_count() const {
+      Mutex::Locker locker(m_test_lock);
+      return m_acquire_count;
+    }
+
+    int release_count() const {
+      Mutex::Locker locker(m_test_lock);
+      return m_release_count;
+    }
+
+    virtual void post_acquire_handler(Context *on_finish) {
+      Mutex::Locker locker(m_test_lock);
+      m_acquire_count++;
+      on_finish->complete(m_on_acquire_r);
+      m_on_acquire_r = 0;
+      if (m_on_acquire != nullptr) {
+        m_on_acquire->complete(0);
+        m_on_acquire = nullptr;
+      }
+    }
+
+    virtual void pre_release_handler(Context *on_finish) {
+      Mutex::Locker locker(m_test_lock);
+      m_release_count++;
+      on_finish->complete(m_on_release_r);
+      m_on_release_r = 0;
+      if (m_on_release != nullptr) {
+        m_on_release->complete(0);
+        m_on_release = nullptr;
+      }
+    }
+
+  private:
+    mutable Mutex m_test_lock;
+    int m_acquire_count = 0;
+    int m_release_count = 0;
+    int m_on_acquire_r = 0;
+    int m_on_release_r = 0;
+    Context *m_on_acquire = nullptr;
+    Context *m_on_release = nullptr;
+  };
+
+  struct Connection {
+    librados::Rados cluster;
+    librados::IoCtx io_ctx;
+  };
+
+  std::list<std::unique_ptr<Connection> > m_connections;
+
+  virtual void SetUp() {
+    TestFixture::SetUp();
+    EXPECT_EQ(0, librbd::mirror_mode_set(m_local_io_ctx, RBD_MIRROR_MODE_POOL));
+
+    if (is_librados_test_stub()) {
+      // speed testing up a little
+      EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
+                                    "1"));
+    }
+  }
+
+  bool is_librados_test_stub() {
+    std::string fsid;
+    EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
+    return fsid == "00000000-1111-2222-3333-444444444444";
+  }
+
+  librados::IoCtx &create_connection(bool no_heartbeats = false) {
+    m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
+    Connection *c = m_connections.back().get();
+
+    EXPECT_EQ("", connect_cluster_pp(c->cluster));
+    if (no_heartbeats) {
+      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+                                       "3600"));
+    } else if (is_librados_test_stub()) {
+      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+                                       "1"));
+    }
+    EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
+
+    return c->io_ctx;
+  }
+};
+
+TEST_F(TestLeaderWatcher, InitShutdown)
+{
+  Listener listener;
+  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  C_SaferCond on_init_acquire;
+  listener.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  leader_watcher.shut_down();
+  ASSERT_EQ(1, listener.acquire_count());
+  ASSERT_EQ(1, listener.release_count());
+  ASSERT_FALSE(leader_watcher.is_leader());
+}
+
+TEST_F(TestLeaderWatcher, Release)
+{
+  Listener listener;
+  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  C_SaferCond on_init_acquire;
+  listener.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  C_SaferCond on_release;
+  C_SaferCond on_acquire;
+  listener.on_release(0, &on_release);
+  listener.on_acquire(0, &on_acquire);
+  leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release.wait());
+  ASSERT_FALSE(leader_watcher.is_leader());
+
+  // wait for lock re-acquired due to no another locker
+  ASSERT_EQ(0, on_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  C_SaferCond on_release2;
+  listener.on_release(0, &on_release2);
+  leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release2.wait());
+
+  leader_watcher.shut_down();
+  ASSERT_EQ(2, listener.acquire_count());
+  ASSERT_EQ(2, listener.release_count());
+}
+
+TEST_F(TestLeaderWatcher, ListenerError)
+{
+  Listener listener;
+  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  // make listener return error on acquire
+  C_SaferCond on_init_acquire, on_init_release;
+  listener.on_acquire(-EINVAL, &on_init_acquire);
+  listener.on_release(0, &on_init_release);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+  ASSERT_EQ(0, on_init_release.wait());
+  ASSERT_FALSE(leader_watcher.is_leader());
+
+  // wait for lock re-acquired due to no another locker
+  C_SaferCond on_acquire;
+  listener.on_acquire(0, &on_acquire);
+  ASSERT_EQ(0, on_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  // make listener return error on release
+  C_SaferCond on_release;
+  listener.on_release(-EINVAL, &on_release);
+  leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release.wait());
+  ASSERT_FALSE(leader_watcher.is_leader());
+
+  leader_watcher.shut_down();
+  ASSERT_EQ(2, listener.acquire_count());
+  ASSERT_EQ(2, listener.release_count());
+  ASSERT_FALSE(leader_watcher.is_leader());
+}
+
+TEST_F(TestLeaderWatcher, Two)
+{
+  Listener listener1;
+  LeaderWatcher<> leader_watcher1(m_threads, m_local_io_ctx, &listener1);
+
+  C_SaferCond on_init_acquire;
+  listener1.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher1.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+
+  Listener listener2;
+  LeaderWatcher<> leader_watcher2(m_threads, m_local_io_ctx, &listener2);
+
+  ASSERT_EQ(0, leader_watcher2.init());
+  ASSERT_TRUE(leader_watcher1.is_leader());
+  ASSERT_FALSE(leader_watcher2.is_leader());
+
+  C_SaferCond on_release;
+  C_SaferCond on_acquire;
+  listener1.on_release(0, &on_release);
+  listener2.on_acquire(0, &on_acquire);
+  leader_watcher1.release_leader();
+  ASSERT_EQ(0, on_release.wait());
+  ASSERT_FALSE(leader_watcher1.is_leader());
+
+  // wait for lock acquired by another watcher
+  ASSERT_EQ(0, on_acquire.wait());
+  ASSERT_TRUE(leader_watcher2.is_leader());
+
+  leader_watcher1.shut_down();
+  leader_watcher2.shut_down();
+
+  ASSERT_EQ(1, listener1.acquire_count());
+  ASSERT_EQ(1, listener1.release_count());
+  ASSERT_EQ(1, listener2.acquire_count());
+  ASSERT_EQ(1, listener2.release_count());
+}
+
+TEST_F(TestLeaderWatcher, Break)
+{
+  if (is_librados_test_stub()) {
+    // break_lock (blacklist) does not work on librados test stub
+    std::cout << "SKIPPING" << std::endl;
+    return SUCCEED();
+  }
+
+  Listener listener1, listener2;
+  LeaderWatcher<> leader_watcher1(m_threads,
+                                  create_connection(true /* no heartbeats */),
+                                  &listener1);
+  LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
+
+  C_SaferCond on_init_acquire;
+  listener1.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher1.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+
+  C_SaferCond on_acquire;
+  listener2.on_acquire(0, &on_acquire);
+  ASSERT_EQ(0, leader_watcher2.init());
+  ASSERT_FALSE(leader_watcher2.is_leader());
+
+  // wait for lock broken due to no heartbeats and re-acquired
+  ASSERT_EQ(0, on_acquire.wait());
+  ASSERT_TRUE(leader_watcher2.is_leader());
+
+  leader_watcher1.shut_down();
+  leader_watcher2.shut_down();
+}
+
+TEST_F(TestLeaderWatcher, Stress)
+{
+  if (is_librados_test_stub()) {
+    // skipping due to possible break request sent
+    std::cout << "SKIPPING" << std::endl;
+    return SUCCEED();
+  }
+
+  const int WATCHERS_COUNT = 20;
+  std::list<LeaderWatcher<> *> leader_watchers;
+  Listener listener;
+
+  for (int i = 0; i < WATCHERS_COUNT; i++) {
+    auto leader_watcher =
+      new LeaderWatcher<>(m_threads, create_connection(), &listener);
+    leader_watchers.push_back(leader_watcher);
+  }
+
+  C_SaferCond on_init_acquire;
+  listener.on_acquire(0, &on_init_acquire);
+  for (auto &leader_watcher : leader_watchers) {
+    ASSERT_EQ(0, leader_watcher->init());
+  }
+  ASSERT_EQ(0, on_init_acquire.wait());
+
+  while (true) {
+    C_SaferCond on_acquire;
+    listener.on_acquire(0, &on_acquire);
+    std::unique_ptr<LeaderWatcher<> > leader_watcher;
+    for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
+      if ((*it)->is_leader()) {
+        ASSERT_FALSE(leader_watcher);
+        leader_watcher.reset(*it);
+        it = leader_watchers.erase(it);
+      } else {
+        it++;
+      }
+    }
+
+    ASSERT_TRUE(leader_watcher);
+    leader_watcher->shut_down();
+    if (leader_watchers.empty()) {
+      break;
+    }
+    ASSERT_EQ(0, on_acquire.wait());
+  }
+}
index 0234805b117dbdc950bbe33822be8fc848901439..d0d577e056e45ec226fb0633dcc80e8a5fd4cbe8 100644 (file)
@@ -13,6 +13,7 @@ extern void register_test_pool_watcher();
 extern void register_test_rbd_mirror();
 extern void register_test_rbd_mirror_image_deleter();
 extern void register_test_image_sync();
+extern void register_test_leader_watcher();
 
 int main(int argc, char **argv)
 {
@@ -21,6 +22,7 @@ int main(int argc, char **argv)
   register_test_rbd_mirror();
   register_test_rbd_mirror_image_deleter();
   register_test_image_sync();
+  register_test_leader_watcher();
 
   ::testing::InitGoogleTest(&argc, argv);
 
diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc
new file mode 100644 (file)
index 0000000..8477fce
--- /dev/null
@@ -0,0 +1,342 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/LeaderWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+struct MockManagedLock {
+  static MockManagedLock *s_instance;
+  static MockManagedLock &get_instance() {
+    assert(s_instance != nullptr);
+    return *s_instance;
+  }
+
+  MockManagedLock() {
+    s_instance = this;
+  }
+
+  MOCK_CONST_METHOD0(is_lock_owner, bool());
+
+  MOCK_METHOD1(shut_down, void(Context *));
+  MOCK_METHOD1(try_acquire_lock, void(Context *));
+  MOCK_METHOD1(release_lock, void(Context *));
+  MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
+  MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
+
+  MOCK_METHOD0(set_state_post_acquiring, void());
+
+  MOCK_CONST_METHOD0(is_shutdown, bool());
+
+  MOCK_CONST_METHOD0(is_state_post_acquiring, bool());
+  MOCK_CONST_METHOD0(is_state_locked, bool());
+};
+
+MockManagedLock *MockManagedLock::s_instance = nullptr;
+
+template <>
+struct ManagedLock<MockTestImageCtx> {
+  ManagedLock(librados::IoCtx& ioctx, ContextWQ *work_queue,
+              const std::string& oid, librbd::Watcher *watcher,
+              managed_lock::Mode  mode, bool blacklist_on_break_lock,
+              uint32_t blacklist_expire_seconds)
+    : m_lock("ManagedLock::m_lock") {
+  }
+
+  mutable Mutex m_lock;
+
+  bool is_lock_owner() const {
+    return MockManagedLock::get_instance().is_lock_owner();
+  }
+  void shut_down(Context *on_shutdown) {
+    MockManagedLock::get_instance().shut_down(on_shutdown);
+  }
+  void try_acquire_lock(Context *on_acquired) {
+    MockManagedLock::get_instance().try_acquire_lock(on_acquired);
+  }
+  void release_lock(Context *on_released) {
+    MockManagedLock::get_instance().release_lock(on_released);
+  }
+  void get_locker(managed_lock::Locker *locker, Context *on_finish) {
+    MockManagedLock::get_instance().get_locker(locker, on_finish);
+  }
+  void break_lock(const managed_lock::Locker &locker, bool force_break_lock,
+                  Context *on_finish) {
+    MockManagedLock::get_instance().break_lock(locker, force_break_lock,
+                                               on_finish);
+  }
+  void set_state_post_acquiring() {
+    MockManagedLock::get_instance().set_state_post_acquiring();
+  }
+  bool is_shutdown() const {
+    return MockManagedLock::get_instance().is_shutdown();
+  }
+  bool is_state_post_acquiring() const {
+    return MockManagedLock::get_instance().is_state_post_acquiring();
+  }
+  bool is_state_locked() const {
+    return MockManagedLock::get_instance().is_state_locked();
+  }
+};
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/LeaderWatcher.cc"
+template class rbd::mirror::LeaderWatcher<librbd::MockTestImageCtx>;
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+
+using librbd::MockManagedLock;
+
+class TestMockLeaderWatcher : public TestMockFixture {
+public:
+  typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
+
+  class MockListener : public MockLeaderWatcher::Listener {
+  public:
+    MOCK_METHOD1(post_acquire_handler, void(Context *));
+    MOCK_METHOD1(pre_release_handler, void(Context *));
+  };
+
+  void expect_is_lock_owner(MockManagedLock &mock_managed_lock, bool owner) {
+    EXPECT_CALL(mock_managed_lock, is_lock_owner())
+      .WillOnce(Return(owner));
+  }
+
+  void expect_shut_down(MockManagedLock &mock_managed_lock, int r) {
+    EXPECT_CALL(mock_managed_lock, shut_down(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
+    EXPECT_CALL(mock_managed_lock, try_acquire_lock(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
+    EXPECT_CALL(mock_managed_lock, release_lock(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_get_locker(MockManagedLock &mock_managed_lock,
+                         const librbd::managed_lock::Locker &locker, int r,
+                         Context *on_finish = nullptr) {
+    EXPECT_CALL(mock_managed_lock, get_locker(_, _))
+      .WillOnce(Invoke([on_finish, r, locker](librbd::managed_lock::Locker *out,
+                                              Context *ctx) {
+                         if (r == 0) {
+                           *out = locker;
+                         }
+                         ctx->complete(r);
+                         if (on_finish != nullptr) {
+                           on_finish->complete(0);
+                         }
+                       }));
+  }
+
+  void expect_break_lock(MockManagedLock &mock_managed_lock,
+                        const librbd::managed_lock::Locker &locker, int r,
+                        Context *on_finish) {
+    EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
+      .WillOnce(Invoke([on_finish, r](const librbd::managed_lock::Locker &,
+                                      bool, Context *ctx) {
+                         ctx->complete(r);
+                         on_finish->complete(0);
+                       }));
+  }
+
+  void expect_set_state_post_acquiring(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, set_state_post_acquiring());
+  }
+
+  void expect_is_shutdown(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, is_shutdown())
+      .Times(AtLeast(0)).WillRepeatedly(Return(false));
+  }
+
+  void expect_is_leader(MockManagedLock &mock_managed_lock, bool post_acquiring,
+                        bool locked) {
+    EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+      .WillOnce(Return(post_acquiring));
+    if (!post_acquiring) {
+      EXPECT_CALL(mock_managed_lock, is_state_locked())
+        .WillOnce(Return(locked));
+    }
+  }
+
+  void expect_is_leader(MockManagedLock &mock_managed_lock) {
+    EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+      .Times(AtLeast(0)).WillRepeatedly(Return(false));
+    EXPECT_CALL(mock_managed_lock, is_state_locked())
+      .Times(AtLeast(0)).WillRepeatedly(Return(false));
+  }
+
+  void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
+                               Context *on_finish) {
+    EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+      .WillOnce(Return(false));
+    EXPECT_CALL(mock_managed_lock, is_state_locked())
+      .WillOnce(Return(true));
+    EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+      .WillOnce(Return(false));
+    EXPECT_CALL(mock_managed_lock, is_state_locked())
+      .WillOnce(DoAll(Invoke([on_finish]() {
+                        on_finish->complete(0);
+                      }),
+                     Return(true)));
+  }
+};
+
+TEST_F(TestMockLeaderWatcher, InitShutdown) {
+  MockManagedLock mock_managed_lock;
+  MockListener listener;
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  expect_is_shutdown(mock_managed_lock);
+
+  InSequence seq;
+  C_SaferCond on_heartbeat_finish;
+  expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+  expect_shut_down(mock_managed_lock, 0);
+
+  leader_watcher.shut_down();
+}
+
+TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) {
+  MockManagedLock mock_managed_lock;
+  MockListener listener;
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  expect_is_shutdown(mock_managed_lock);
+
+  InSequence seq;
+  C_SaferCond on_heartbeat_finish;
+  expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+  expect_is_leader(mock_managed_lock, false, true);
+  expect_release_lock(mock_managed_lock, 0);
+
+  leader_watcher.release_leader();
+
+  expect_shut_down(mock_managed_lock, 0);
+
+  leader_watcher.shut_down();
+}
+
+TEST_F(TestMockLeaderWatcher, AcquireError) {
+  MockManagedLock mock_managed_lock;
+  MockListener listener;
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  expect_is_shutdown(mock_managed_lock);
+  expect_is_leader(mock_managed_lock);
+
+  InSequence seq;
+  C_SaferCond on_get_locker_finish;
+  expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
+  expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0,
+                    &on_get_locker_finish);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_get_locker_finish.wait());
+
+  expect_shut_down(mock_managed_lock, 0);
+
+  leader_watcher.shut_down();
+}
+
+TEST_F(TestMockLeaderWatcher, ReleaseError) {
+  MockManagedLock mock_managed_lock;
+  MockListener listener;
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  expect_is_shutdown(mock_managed_lock);
+
+  InSequence seq;
+  C_SaferCond on_heartbeat_finish;
+  expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+  expect_is_leader(mock_managed_lock, false, true);
+  expect_release_lock(mock_managed_lock, -EINVAL);
+
+  leader_watcher.release_leader();
+
+  expect_shut_down(mock_managed_lock, 0);
+
+  leader_watcher.shut_down();
+}
+
+TEST_F(TestMockLeaderWatcher, Break) {
+  EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1"));
+  EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats",
+                               "1"));
+  CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+  int max_acquire_attempts =
+    cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break;
+
+  MockManagedLock mock_managed_lock;
+  MockListener listener;
+  MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+  librbd::managed_lock::Locker
+    locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
+
+  expect_is_shutdown(mock_managed_lock);
+  expect_is_leader(mock_managed_lock);
+
+  InSequence seq;
+  for (int i = 0; i <= max_acquire_attempts; i++) {
+    expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
+    if (i < max_acquire_attempts) {
+      expect_get_locker(mock_managed_lock, locker, 0);
+    }
+  }
+  C_SaferCond on_break;
+  expect_break_lock(mock_managed_lock, locker, 0, &on_break);
+  C_SaferCond on_heartbeat_finish;
+  expect_try_acquire_lock(mock_managed_lock, 0);
+  expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+  expect_shut_down(mock_managed_lock, 0);
+
+  leader_watcher.shut_down();
+}
+
+} // namespace mirror
+} // namespace rbd
index fe1ee4d9f444c44c1ddf443880ec4e8b014e25ed..faa10b3523bd7c49578a4c120cb423d3b7a21680 100644 (file)
@@ -7,6 +7,7 @@ set(rbd_mirror_internal
   ImageDeleter.cc
   ImageSync.cc
   ImageSyncThrottler.cc
+  LeaderWatcher.cc
   Mirror.cc
   MirrorStatusWatcher.cc
   PoolWatcher.cc
diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc
new file mode 100644 (file)
index 0000000..9ab4f04
--- /dev/null
@@ -0,0 +1,870 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "LeaderWatcher.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/Utils.h"
+#include "librbd/watcher/Types.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
+                           << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using namespace leader_watcher;
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
+
+template <typename I>
+LeaderWatcher<I>::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx,
+                                Listener *listener)
+  : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
+    m_threads(threads), m_listener(listener),
+    m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
+    m_notifier_id(librados::Rados(io_ctx).get_instance_id()) {
+}
+
+template <typename I>
+int LeaderWatcher<I>::init() {
+  C_SaferCond init_ctx;
+  init(&init_ctx);
+  return init_ctx.wait();
+}
+
+template <typename I>
+void LeaderWatcher<I>::init(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(!m_leader_lock);
+  m_leader_lock.reset(
+    new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
+                   m_cct->_conf->rbd_blacklist_expire_seconds));
+
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+
+  create_leader_object();
+}
+
+template <typename I>
+void LeaderWatcher<I>::create_leader_object() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  librados::ObjectWriteOperation op;
+  op.create(false);
+
+  librados::AioCompletion *aio_comp = create_rados_ack_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_create_leader_object(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r == 0) {
+      register_watch();
+      return;
+    }
+
+    derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
+         << dendl;
+
+    std::swap(on_finish, m_on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::register_watch() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
+
+  librbd::Watcher::register_watch(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_register_watch(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r < 0) {
+      derr << "error registering leader watcher for " << m_oid << " object: "
+           << cpp_strerror(r) << dendl;
+    } else {
+      acquire_leader_lock(true);
+    }
+
+    std::swap(on_finish, m_on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down() {
+  C_SaferCond shut_down_ctx;
+  shut_down(&shut_down_ctx);
+  int r = shut_down_ctx.wait();
+  assert(r == 0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  assert(m_leader_lock);
+
+  if (false && is_leader(m_lock)) {
+    Context *ctx = create_async_context_callback(
+      m_work_queue, new FunctionContext(
+        [this, on_finish](int r) {
+          if (r < 0) {
+            derr << "error releasing leader lock: " << cpp_strerror(r) << dendl;
+          }
+          shut_down(on_finish);
+        }));
+
+    m_leader_lock->release_lock(ctx);
+    return;
+  }
+
+  assert(m_on_shut_down_finish == nullptr);
+  m_on_shut_down_finish = on_finish;
+  cancel_timer_task();
+  shut_down_leader_lock();
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_leader_lock() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
+
+  m_leader_lock->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
+  }
+
+  unregister_watch();
+}
+
+template <typename I>
+void LeaderWatcher<I>::unregister_watch() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
+
+  librbd::Watcher::unregister_watch(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_unregister_watch(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r < 0) {
+      derr << "error unregistering leader watcher for " << m_oid << " object: "
+           << cpp_strerror(r) << dendl;
+    }
+
+    assert(m_on_shut_down_finish != nullptr);
+    std::swap(on_finish, m_on_shut_down_finish);
+  }
+  on_finish->complete(0);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_leader() {
+  Mutex::Locker locker(m_lock);
+
+  return is_leader(m_lock);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_leader(Mutex &lock) {
+  assert(m_lock.is_locked());
+
+  bool leader = m_leader_lock && m_leader_lock->is_leader();
+  dout(20) << leader << dendl;
+  return leader;
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader() {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  if (!is_leader(m_lock)) {
+    return;
+  }
+
+  release_leader_lock();
+}
+
+template <typename I>
+void LeaderWatcher<I>::cancel_timer_task() {
+  // Cancel the pending timer task, if any.
+  assert(m_threads->timer_lock.is_locked());
+  assert(m_lock.is_locked());
+
+  if (m_timer_task == nullptr) {
+    return;
+  }
+
+  dout(20) << m_timer_task << dendl;
+  // Holding timer_lock means the event cannot have started executing, so
+  // cancellation must succeed.
+  bool canceled = m_threads->timer->cancel_event(m_timer_task);
+  assert(canceled);
+  m_timer_task = nullptr;
+}
+
+template <typename I>
+void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
+                                           int delay_factor, bool leader,
+                                           void (LeaderWatcher<I>::*cb)()) {
+  // Schedule (replacing any pending task) a timer callback firing after
+  // delay_factor * rbd_mirror_leader_heartbeat_interval seconds.  'leader'
+  // records the role the task was scheduled for: if our role has changed
+  // by the time the timer fires, the callback is skipped.
+  assert(m_threads->timer_lock.is_locked());
+  assert(m_lock.is_locked());
+
+  cancel_timer_task();
+
+  m_timer_task = new FunctionContext(
+    [this, cb, leader](int r) {
+      // Executed with timer_lock held (asserted below).
+      assert(m_threads->timer_lock.is_locked());
+      m_timer_task = nullptr;
+      Mutex::Locker locker(m_lock);
+      if (is_leader(m_lock) != leader) {
+        return;
+      }
+      (this->*cb)();
+    });
+
+  // Clamp the configured interval to at least one second.
+  int after = delay_factor *
+    max(1, m_cct->_conf->rbd_mirror_leader_heartbeat_interval);
+
+  dout(20) << "scheduling " << name << " after " << after << " sec (task "
+           << m_timer_task << ")" << dendl;
+  m_threads->timer->add_event_after(after, m_timer_task);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
+                                                       Context *on_finish) {
+  // Invoked by LeaderLock after the exclusive lock was acquired.  Starts
+  // the leader bring-up sequence (status watcher, listener notification);
+  // on_finish is stashed in m_on_finish and completed when that sequence
+  // ends.
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    if (r == -EAGAIN) {
+      dout(20) << "already locked" << dendl;
+    } else {
+      derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
+    }
+    on_finish->complete(r);
+    return;
+  }
+
+  Mutex::Locker locker(m_lock);
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_notify_error = 0;
+
+  init_status_watcher();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
+  // Invoked by LeaderLock before the lock is released: notify the listener
+  // (pre_release path) and tear down leader-only state before the lock
+  // actually goes away.
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_notify_error = 0;
+
+  notify_listener();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
+                                                       Context *on_finish) {
+  // Invoked by LeaderLock after the lock was released: broadcast a
+  // lock_released notification so peers can race for leadership.
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    on_finish->complete(r);
+    return;
+  }
+
+  Mutex::Locker locker(m_lock);
+  assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+
+  notify_lock_released();
+}
+
+template <typename I>
+void LeaderWatcher<I>::break_leader_lock() {
+  // Forcibly break the (presumed dead) leader's lock.  If no locker is
+  // known there is nothing to break -- go straight to acquiring.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  if (m_locker.cookie.empty()) {
+    acquire_leader_lock(true);
+    return;
+  }
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
+
+  m_leader_lock->break_lock(m_locker, true, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_break_leader_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (r < 0 && r != -ENOENT) {
+    derr << "error beaking leader lock: " << cpp_strerror(r)  << dendl;
+
+    schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);
+    return;
+  }
+
+  acquire_leader_lock(true);
+}
+
+template <typename I>
+void LeaderWatcher<I>::get_locker() {
+  // Query who currently owns the leader lock; the result is delivered to
+  // handle_get_locker() through the C_GetLocker helper.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  C_GetLocker *get_locker_ctx = new C_GetLocker(this);
+  Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
+
+  m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_get_locker(int r,
+                                         librbd::managed_lock::Locker& locker) {
+  // Completion for get_locker().  When we are not the leader: -ENOENT
+  // (nobody owns the lock) triggers an immediate acquire attempt; otherwise
+  // the owner is remembered and an acquire attempt is scheduled after
+  // rbd_mirror_leader_max_missed_heartbeats heartbeat intervals.
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker mutex_locker(m_lock);
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (is_leader(m_lock)) {
+    // we own the lock -- no remote locker to track
+    m_locker = {};
+  } else {
+    if (r == -ENOENT) {
+      acquire_leader_lock(true);
+    } else {
+      if (r < 0) {
+        derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
+      } else {
+        m_locker = locker;
+      }
+
+      schedule_timer_task("acquire leader lock",
+                          m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
+                          false, &LeaderWatcher<I>::acquire_leader_lock);
+    }
+  }
+}
+
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock() {
+  // Timer-callback overload: retry without resetting the attempt counter.
+  // (Dropped the stray 'return' on a void call and fixed the 4-space
+  // indentation to match the file's 2-space convention.)
+  acquire_leader_lock(false);
+}
+
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock(bool reset_attempt_counter) {
+  // Try to take the leader lock.  m_acquire_attempts counts consecutive
+  // failures; it is reset when a fresh acquisition cycle starts (e.g. on a
+  // lock_released notification or after missed heartbeats).
+  dout(20) << "reset_attempt_counter=" << reset_attempt_counter << dendl;
+
+  assert(m_lock.is_locked());
+
+  if (reset_attempt_counter) {
+    m_acquire_attempts = 0;
+  }
+
+  dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
+
+  m_leader_lock->try_acquire_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (r < 0) {
+    if (r == -EAGAIN) {
+      dout(20) << "already locked" << dendl;
+    } else {
+      derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
+    }
+    if (++m_acquire_attempts >
+        m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
+      dout(0) << "breaking leader lock after failed attemts to acquire"
+              << dendl;
+      break_leader_lock();
+    } else {
+      get_locker();
+    }
+    return;
+  }
+
+  m_acquire_attempts = 0;
+
+  if (m_notify_error) {
+    dout(5) << "releasing due to error on notify" << dendl;
+    release_leader_lock();
+    return;
+  }
+
+  notify_heartbeat();
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader_lock() {
+  // Step down: asynchronously release the managed lock.  The pre/post
+  // release hooks drive the listener notification and teardown.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
+
+  m_leader_lock->release_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_release_leader_lock(int r) {
+  // Completion for release_lock().  On success resume watching for a new
+  // leader after one heartbeat interval.
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error releasing lock: " << cpp_strerror(r) << dendl;
+    return;
+  }
+
+  schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);
+}
+
+template <typename I>
+void LeaderWatcher<I>::init_status_watcher() {
+  // Leader-only: instantiate and initialize the mirror status watcher as
+  // the first step of the post-acquire sequence.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+  assert(!m_status_watcher);
+
+  m_status_watcher.reset(new MirrorStatusWatcher(m_ioctx, m_work_queue));
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
+
+  m_status_watcher->init(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_init_status_watcher(int r) {
+  // Completion for m_status_watcher->init().  On success continue the
+  // post-acquire sequence; on failure abort it and propagate the error to
+  // the stashed m_on_finish context (completed outside the lock).
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r == 0) {
+      notify_listener();
+      return;
+    }
+
+    derr << "error initializing mirror status watcher: " << cpp_strerror(r)
+        << dendl;
+    m_status_watcher.reset();
+    assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_status_watcher() {
+  // Tear down the leader-only mirror status watcher (pre-release path).
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_status_watcher);
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<LeaderWatcher<I>,
+      &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
+
+  m_status_watcher->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
+  // Completion for m_status_watcher->shut_down().  Errors are ignored when
+  // we are no longer the leader (already releasing); otherwise the error is
+  // propagated to the stashed m_on_finish context.
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (r < 0) {
+      derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
+          << dendl;
+      if (!is_leader(m_lock)) {
+       // ignore on releasing
+       r = 0;
+      }
+    }
+
+    assert(m_status_watcher);
+    m_status_watcher.reset();
+
+    assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_listener() {
+  // Invoke the listener's acquire/release handler on the work queue.  The
+  // inner ctx (handle_notify_listener) is handed to the listener as its
+  // completion; the outer FunctionContext merely schedules the call.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
+
+  if (is_leader(m_lock)) {
+    // lock just acquired -> post_acquire path
+    ctx = new FunctionContext(
+      [this, ctx](int r) {
+        m_listener->post_acquire_handler(ctx);
+      });
+  } else {
+    // lock about to be released -> pre_release path
+    ctx = new FunctionContext(
+      [this, ctx](int r) {
+        m_listener->pre_release_handler(ctx);
+      });
+  }
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_listener(int r) {
+  // Completion of the listener handler.  A listener error is remembered in
+  // m_notify_error (acted upon once lock acquisition completes); then
+  // continue with the role-appropriate next step.
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error notifying listener: " << cpp_strerror(r) << dendl;
+    m_notify_error = r;
+  }
+
+  if (is_leader(m_lock)) {
+    notify_lock_acquired();
+  } else {
+    shut_down_status_watcher();
+  }
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_acquired() {
+  // Broadcast a lock_acquired notification so peers stop trying to take
+  // the lock.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
+
+  bufferlist bl;
+  ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
+  // Completion for the lock_acquired broadcast.  -ETIMEDOUT (unacked
+  // watchers) is tolerated; other errors are recorded in m_notify_error.
+  // The stashed post-acquire context is always completed with 0.
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    if (r < 0 && r != -ETIMEDOUT) {
+      derr << "error notifying leader lock acquired: " << cpp_strerror(r)
+          << dendl;
+      m_notify_error = r;
+    }
+
+    assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_released() {
+  // Broadcast a lock_released notification so peers immediately race for
+  // leadership.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
+
+  bufferlist bl;
+  ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_released(int r) {
+  // Completion for the lock_released broadcast.  Non-timeout errors are
+  // logged.  NOTE(review): unlike the lock_acquired path (which completes
+  // with 0), the stashed context is completed with r here -- confirm that
+  // propagating -ETIMEDOUT to the post-release continuation is intended.
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    if (r < 0 && r != -ETIMEDOUT) {
+      derr << "error notifying leader lock released: " << cpp_strerror(r)
+           << dendl;
+    }
+
+    assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_heartbeat() {
+  // Periodic leader heartbeat broadcast; bail out if leadership was lost
+  // between the timer being scheduled and firing.
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  if (!is_leader(m_lock)) {
+    dout(5) << "not leader, canceling" << dendl;
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
+
+  bufferlist bl;
+  ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  if (!is_leader(m_lock)) {
+    return;
+  }
+
+  if (r < 0 && r != -ETIMEDOUT) {
+    derr << "error notifying hearbeat: " << cpp_strerror(r)
+         <<  ", releasing leader" << dendl;
+    release_leader_lock();
+    return;
+  }
+
+  schedule_timer_task("heartbeat", 1, true,
+                      &LeaderWatcher<I>::notify_heartbeat);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
+  // A peer leader's heartbeat arrived.  As a non-leader: reset the failed
+  // acquire counter, drop any pending timer task and re-probe the locker
+  // (which reschedules the next acquire attempt).
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader heartbeat, ignoring" << dendl;
+    } else {
+      m_acquire_attempts = 0;
+      cancel_timer_task();
+      get_locker();
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
+  // A peer announced it acquired the leader lock.  As a non-leader: reset
+  // the acquire counter and refresh our record of the current locker.
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_acquired, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      m_acquire_attempts = 0;
+      get_locker();
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
+  // The leader announced it released the lock: immediately try to acquire
+  // it (with a fresh attempt counter).
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_released, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      acquire_leader_lock(true);
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                     uint64_t notifier_id, bufferlist &bl) {
+  dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+           << "notifier_id=" << notifier_id << dendl;
+
+  Context *ctx = new librbd::watcher::C_NotifyAck(this, notify_id, handle);
+
+  if (notifier_id == m_notifier_id) {
+    dout(20) << "our own notification, ignoring" << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  NotifyMessage notify_message;
+  try {
+    bufferlist::iterator iter = bl.begin();
+    ::decode(notify_message, iter);
+  } catch (const buffer::error &err) {
+    derr << ": error decoding image notification: " << err.what() << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
+                                      Context *on_notify_ack) {
+  // Visitor dispatch target for heartbeat notifications.
+  dout(20) << "heartbeat" << dendl;
+
+  handle_heartbeat(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
+                                      Context *on_notify_ack) {
+  // Visitor dispatch target for lock_acquired notifications.
+  dout(20) << "lock_acquired" << dendl;
+
+  handle_lock_acquired(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
+                                      Context *on_notify_ack) {
+  // Visitor dispatch target for lock_released notifications.
+  dout(20) << "lock_released" << dendl;
+
+  handle_lock_released(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
+                                      Context *on_notify_ack) {
+  // Unrecognized payload (e.g. from a newer peer): ack and ignore.
+  dout(20) << "unknown" << dendl;
+
+  on_notify_ack->complete(0);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h
new file mode 100644 (file)
index 0000000..8a2c6f7
--- /dev/null
@@ -0,0 +1,242 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_LEADER_WATCHER_H
+#define CEPH_RBD_MIRROR_LEADER_WATCHER_H
+
+#include <list>
+#include <memory>
+#include <string>
+
+#include "librbd/ManagedLock.h"
+#include "librbd/managed_lock/Types.h"
+#include "librbd/Watcher.h"
+#include "MirrorStatusWatcher.h"
+#include "tools/rbd_mirror/leader_watcher/Types.h"
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class LeaderWatcher : protected librbd::Watcher {
+public:
+  struct Listener {
+    virtual ~Listener() {
+    }
+
+    virtual void post_acquire_handler(Context *on_finish) = 0;
+    virtual void pre_release_handler(Context *on_finish) = 0;
+  };
+
+  LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener);
+
+  int init();
+  void shut_down();
+
+  void init(Context *on_finish);
+  void shut_down(Context *on_finish);
+
+  bool is_leader();
+  void release_leader();
+
+private:
+  /**
+   * @verbatim
+   *
+   *  <uninitialized> <------------------------------ UNREGISTER_WATCH
+   *     | (init)      ^                                      ^
+   *     v             *                                      |
+   *  CREATE_OBJECT  * *  (error)                     SHUT_DOWN_LEADER_LOCK
+   *     |             *                                      ^
+   *     v             *                                      |
+   *  REGISTER_WATCH * *                                      | (shut_down)
+   *     |                                                    |
+   *     |           (no leader heartbeat and acquire failed) |
+   *     | BREAK_LOCK <-------------------------------------\ |
+   *     |    |                 (no leader heartbeat)       | |
+   *     |    |  /----------------------------------------\ | |
+   *     |    |  |              (lock_released received)    | |
+   *     |    |  |  /-------------------------------------\ | |
+   *     |    |  |  |                   (lock_acquired or | | |
+   *     |    |  |  |                 heartbeat received) | | |
+   *     |    |  |  |       (ENOENT)        /-----------\ | | |
+   *     |    |  |  |  * * * * * * * * * *  |           | | | |
+   *     v    v  v  v  v  (error)        *  v           | | | |
+   *  ACQUIRE_LEADER_LOCK  * * * * *> GET_LOCKER ---> <secondary>
+   *     |                   *                           ^
+   * ....|...................*.............         .....|.....................
+   * .   v                   *            .         .    |       post_release .
+   * .INIT_STATUS_WATCHER  * *            .         .NOTIFY_LOCK_RELEASED     .
+   * .   |                 (error)        .         .....^.....................
+   * .   v                                .              |
+   * .NOTIFY_LISTENERS                    .          RELEASE_LEADER_LOCK
+   * .   |                                .              ^
+   * .   v                                .         .....|.....................
+   * .NOTIFY_LOCK_ACQUIRED   post_acquire .         .SHUT_DOWN_STATUS_WATCHER .
+   * ....|.................................         .    ^                    .
+   *     v                                          .    |                    .
+   *  <leader> -----------------------------------> .NOTIFY_LISTENERS         .
+   *            (shut_down, release_leader,         .             pre_release .
+   *             notify error)                      ...........................
+   * @endverbatim
+   */
+
+  class LeaderLock : public librbd::ManagedLock<ImageCtxT> {
+  public:
+    typedef librbd::ManagedLock<ImageCtxT> Parent;
+
+    LeaderLock(librados::IoCtx& ioctx, ContextWQ *work_queue,
+               const std::string& oid, LeaderWatcher *watcher,
+               bool blacklist_on_break_lock,
+               uint32_t blacklist_expire_seconds)
+      : Parent(ioctx, work_queue, oid, watcher, librbd::managed_lock::EXCLUSIVE,
+               blacklist_on_break_lock, blacklist_expire_seconds),
+        watcher(watcher) {
+    }
+
+    bool is_leader() const {
+      Mutex::Locker loker(Parent::m_lock);
+      return Parent::is_state_post_acquiring() || Parent::is_state_locked();
+    }
+
+  protected:
+    virtual void post_acquire_lock_handler(int r, Context *on_finish) {
+      if (r == 0) {
+        // lock is owned at this point
+        Mutex::Locker locker(Parent::m_lock);
+        Parent::set_state_post_acquiring();
+      }
+      watcher->handle_post_acquire_leader_lock(r, on_finish);
+    }
+    virtual void pre_release_lock_handler(bool shutting_down,
+                                          Context *on_finish) {
+      watcher->handle_pre_release_leader_lock(on_finish);
+    }
+    virtual void post_release_lock_handler(bool shutting_down, int r,
+                                           Context *on_finish) {
+      watcher->handle_post_release_leader_lock(r, on_finish);
+    }
+  private:
+    LeaderWatcher *watcher;
+  };
+
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    LeaderWatcher *leader_watcher;
+    Context *on_notify_ack;
+
+    HandlePayloadVisitor(LeaderWatcher *leader_watcher, Context *on_notify_ack)
+      : leader_watcher(leader_watcher), on_notify_ack(on_notify_ack) {
+    }
+
+    template <typename Payload>
+    inline void operator()(const Payload &payload) const {
+      leader_watcher->handle_payload(payload, on_notify_ack);
+    }
+  };
+
+  struct C_GetLocker : public Context {
+    LeaderWatcher *leader_watcher;
+    librbd::managed_lock::Locker locker;
+
+    C_GetLocker(LeaderWatcher *leader_watcher)
+      : leader_watcher(leader_watcher) {
+    }
+
+    virtual void finish(int r) {
+      leader_watcher->handle_get_locker(r, locker);
+    }
+  };
+
+  Threads *m_threads;
+  Listener *m_listener;
+
+  Mutex m_lock;
+  uint64_t m_notifier_id;
+  Context *m_on_finish = nullptr;
+  Context *m_on_shut_down_finish = nullptr;
+  int m_acquire_attempts = 0;
+  int m_notify_error = 0;
+  std::unique_ptr<LeaderLock> m_leader_lock;
+  std::unique_ptr<MirrorStatusWatcher> m_status_watcher;
+  librbd::managed_lock::Locker m_locker;
+  Context *m_timer_task = nullptr;
+
+  bool is_leader(Mutex &m_lock);
+
+  void cancel_timer_task();
+  void schedule_timer_task(const std::string &name,
+                           int delay_factor, bool leader,
+                           void (LeaderWatcher<ImageCtxT>::*callback)());
+
+  void create_leader_object();
+  void handle_create_leader_object(int r);
+
+  void register_watch();
+  void handle_register_watch(int r);
+
+  void shut_down_leader_lock();
+  void handle_shut_down_leader_lock(int r);
+
+  void unregister_watch();
+  void handle_unregister_watch(int r);
+
+  void break_leader_lock();
+  void handle_break_leader_lock(int r);
+
+  void get_locker();
+  void handle_get_locker(int r, librbd::managed_lock::Locker& locker);
+
+  void acquire_leader_lock(bool reset_attempt_counter);
+  void acquire_leader_lock();
+  void handle_acquire_leader_lock(int r);
+
+  void release_leader_lock();
+  void handle_release_leader_lock(int r);
+
+  void init_status_watcher();
+  void handle_init_status_watcher(int r);
+
+  void shut_down_status_watcher();
+  void handle_shut_down_status_watcher(int r);
+
+  void notify_listener();
+  void handle_notify_listener(int r);
+
+  void notify_lock_acquired();
+  void handle_notify_lock_acquired(int r);
+
+  void notify_lock_released();
+  void handle_notify_lock_released(int r);
+
+  void notify_heartbeat();
+  void handle_notify_heartbeat(int r);
+
+  void handle_post_acquire_leader_lock(int r, Context *on_finish);
+  void handle_pre_release_leader_lock(Context *on_finish);
+  void handle_post_release_leader_lock(int r, Context *on_finish);
+
+  void handle_notify(uint64_t notify_id, uint64_t handle,
+                     uint64_t notifier_id, bufferlist &bl);
+
+  void handle_heartbeat(Context *on_ack);
+  void handle_lock_acquired(Context *on_ack);
+  void handle_lock_released(Context *on_ack);
+
+  void handle_payload(const leader_watcher::HeartbeatPayload &payload,
+                      Context *on_notify_ack);
+  void handle_payload(const leader_watcher::LockAcquiredPayload &payload,
+                      Context *on_notify_ack);
+  void handle_payload(const leader_watcher::LockReleasedPayload &payload,
+                      Context *on_notify_ack);
+  void handle_payload(const leader_watcher::UnknownPayload &payload,
+                      Context *on_notify_ack);
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_LEADER_WATCHER_H