OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter
OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher
OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer
+OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats
+OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock
+OPTION(rbd_mirror_leader_max_acquire_attempts_before_break, OPT_INT, 3) // number of failed attempts to acquire lock after missing heartbeats before breaking lock
OPTION(nss_db_path, OPT_STR, "") // path to nss db
test_ImageReplayer.cc
test_ImageDeleter.cc
test_ImageSync.cc
+ test_LeaderWatcher.cc
test_fixture.cc
)
add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs})
test_mock_ImageReplayer.cc
test_mock_ImageSync.cc
test_mock_ImageSyncThrottler.cc
+ test_mock_LeaderWatcher.cc
image_replayer/test_mock_BootstrapRequest.cc
image_replayer/test_mock_CreateImageRequest.cc
image_replayer/test_mock_EventPreprocessor.cc
rbd_mirror
rados_test_stub
rbd_mirror_internal
+ rbd_mirror_types
rbd_api
rbd_internal
rbd_test_mock
target_link_libraries(ceph_test_rbd_mirror
rbd_mirror
rbd_mirror_internal
+ rbd_mirror_types
rbd_api
rbd_internal
journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "test/rbd_mirror/test_fixture.h"
+#include "tools/rbd_mirror/LeaderWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#include "test/librados/test.h"
+#include "gtest/gtest.h"
+
+using librbd::util::unique_lock_name;
+using rbd::mirror::LeaderWatcher;
+
+// Registration hook called from the test runner's main(); the static
+// TEST_F registration below does the real work, so the body is empty.
+void register_test_leader_watcher() {
+}
+
+class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
+public:
+ // Test double for LeaderWatcher's listener.  The test "arms" the next
+ // acquire/release callback with an error code to inject and an optional
+ // context to fire, then the handlers below consume those one-shot values.
+ class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
+ public:
+ Listener()
+ : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
+ }
+
+ // Arm the next post_acquire_handler invocation: complete the watcher's
+ // context with r, then fire ctx to wake the waiting test.
+ void on_acquire(int r, Context *ctx) {
+ Mutex::Locker locker(m_test_lock);
+ m_on_acquire_r = r;
+ m_on_acquire = ctx;
+ }
+
+ // Arm the next pre_release_handler invocation (same pattern as above).
+ void on_release(int r, Context *ctx) {
+ Mutex::Locker locker(m_test_lock);
+ m_on_release_r = r;
+ m_on_release = ctx;
+ }
+
+ int acquire_count() const {
+ Mutex::Locker locker(m_test_lock);
+ return m_acquire_count;
+ }
+
+ int release_count() const {
+ Mutex::Locker locker(m_test_lock);
+ return m_release_count;
+ }
+
+ // Called by LeaderWatcher after it acquires the leader lock; completes
+ // on_finish with the armed error (then resets it to 0) and fires the
+ // armed test context, if any.
+ virtual void post_acquire_handler(Context *on_finish) {
+ Mutex::Locker locker(m_test_lock);
+ m_acquire_count++;
+ on_finish->complete(m_on_acquire_r);
+ m_on_acquire_r = 0;
+ if (m_on_acquire != nullptr) {
+ m_on_acquire->complete(0);
+ m_on_acquire = nullptr;
+ }
+ }
+
+ // Called by LeaderWatcher before it releases the leader lock; mirrors
+ // post_acquire_handler.
+ virtual void pre_release_handler(Context *on_finish) {
+ Mutex::Locker locker(m_test_lock);
+ m_release_count++;
+ on_finish->complete(m_on_release_r);
+ m_on_release_r = 0;
+ if (m_on_release != nullptr) {
+ m_on_release->complete(0);
+ m_on_release = nullptr;
+ }
+ }
+
+ private:
+ mutable Mutex m_test_lock;
+ int m_acquire_count = 0;
+ int m_release_count = 0;
+ int m_on_acquire_r = 0;
+ int m_on_release_r = 0;
+ Context *m_on_acquire = nullptr;
+ Context *m_on_release = nullptr;
+ };
+
+ // A dedicated cluster connection + IoCtx pair so each watcher can run
+ // with its own per-connection config (e.g. heartbeat interval).
+ struct Connection {
+ librados::Rados cluster;
+ librados::IoCtx io_ctx;
+ };
+
+ std::list<std::unique_ptr<Connection> > m_connections;
+
+ virtual void SetUp() {
+ TestFixture::SetUp();
+ EXPECT_EQ(0, librbd::mirror_mode_set(m_local_io_ctx, RBD_MIRROR_MODE_POOL));
+
+ if (is_librados_test_stub()) {
+ // speed testing up a little
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
+ "1"));
+ }
+ }
+
+ // The librados test stub reports a fixed well-known fsid; use that to
+ // detect whether we are running against the stub or a real cluster.
+ bool is_librados_test_stub() {
+ std::string fsid;
+ EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
+ return fsid == "00000000-1111-2222-3333-444444444444";
+ }
+
+ // Open a fresh cluster connection to the local pool.  With no_heartbeats
+ // the heartbeat interval is set absurdly high (1h) so the watcher created
+ // on this connection effectively never heartbeats.
+ librados::IoCtx &create_connection(bool no_heartbeats = false) {
+ m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
+ Connection *c = m_connections.back().get();
+
+ EXPECT_EQ("", connect_cluster_pp(c->cluster));
+ if (no_heartbeats) {
+ EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+ "3600"));
+ } else if (is_librados_test_stub()) {
+ EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+ "1"));
+ }
+ EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
+
+ return c->io_ctx;
+ }
+};
+
+// A single watcher should become leader on init and release on shut_down,
+// invoking the acquire/release listener callbacks exactly once each.
+TEST_F(TestLeaderWatcher, InitShutdown)
+{
+ Listener listener;
+ LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ C_SaferCond on_init_acquire;
+ listener.on_acquire(0, &on_init_acquire);
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_init_acquire.wait());
+ ASSERT_TRUE(leader_watcher.is_leader());
+
+ leader_watcher.shut_down();
+ ASSERT_EQ(1, listener.acquire_count());
+ ASSERT_EQ(1, listener.release_count());
+ ASSERT_FALSE(leader_watcher.is_leader());
+}
+
+// After a voluntary release_leader() with no other contender, the watcher
+// should automatically re-acquire leadership.
+TEST_F(TestLeaderWatcher, Release)
+{
+ Listener listener;
+ LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ C_SaferCond on_init_acquire;
+ listener.on_acquire(0, &on_init_acquire);
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_init_acquire.wait());
+ ASSERT_TRUE(leader_watcher.is_leader());
+
+ C_SaferCond on_release;
+ C_SaferCond on_acquire;
+ listener.on_release(0, &on_release);
+ listener.on_acquire(0, &on_acquire);
+ leader_watcher.release_leader();
+ ASSERT_EQ(0, on_release.wait());
+ ASSERT_FALSE(leader_watcher.is_leader());
+
+ // wait for lock re-acquired due to no another locker
+ ASSERT_EQ(0, on_acquire.wait());
+ ASSERT_TRUE(leader_watcher.is_leader());
+
+ C_SaferCond on_release2;
+ listener.on_release(0, &on_release2);
+ leader_watcher.release_leader();
+ ASSERT_EQ(0, on_release2.wait());
+
+ leader_watcher.shut_down();
+ ASSERT_EQ(2, listener.acquire_count());
+ ASSERT_EQ(2, listener.release_count());
+}
+
+// Listener errors during post-acquire must cause the lock to be released,
+// after which the watcher retries and eventually becomes leader; an error
+// on pre-release must not prevent the release from completing.
+TEST_F(TestLeaderWatcher, ListenerError)
+{
+ Listener listener;
+ LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ // make listener return error on acquire
+ C_SaferCond on_init_acquire, on_init_release;
+ listener.on_acquire(-EINVAL, &on_init_acquire);
+ listener.on_release(0, &on_init_release);
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_init_acquire.wait());
+ ASSERT_EQ(0, on_init_release.wait());
+ ASSERT_FALSE(leader_watcher.is_leader());
+
+ // wait for lock re-acquired due to no another locker
+ C_SaferCond on_acquire;
+ listener.on_acquire(0, &on_acquire);
+ ASSERT_EQ(0, on_acquire.wait());
+ ASSERT_TRUE(leader_watcher.is_leader());
+
+ // make listener return error on release
+ C_SaferCond on_release;
+ listener.on_release(-EINVAL, &on_release);
+ leader_watcher.release_leader();
+ ASSERT_EQ(0, on_release.wait());
+ ASSERT_FALSE(leader_watcher.is_leader());
+
+ leader_watcher.shut_down();
+ ASSERT_EQ(2, listener.acquire_count());
+ ASSERT_EQ(2, listener.release_count());
+ ASSERT_FALSE(leader_watcher.is_leader());
+}
+
+// With two watchers on the same pool, releasing leadership from the first
+// should hand it over to the second.
+TEST_F(TestLeaderWatcher, Two)
+{
+ Listener listener1;
+ LeaderWatcher<> leader_watcher1(m_threads, m_local_io_ctx, &listener1);
+
+ C_SaferCond on_init_acquire;
+ listener1.on_acquire(0, &on_init_acquire);
+ ASSERT_EQ(0, leader_watcher1.init());
+ ASSERT_EQ(0, on_init_acquire.wait());
+
+ Listener listener2;
+ LeaderWatcher<> leader_watcher2(m_threads, m_local_io_ctx, &listener2);
+
+ ASSERT_EQ(0, leader_watcher2.init());
+ ASSERT_TRUE(leader_watcher1.is_leader());
+ ASSERT_FALSE(leader_watcher2.is_leader());
+
+ C_SaferCond on_release;
+ C_SaferCond on_acquire;
+ listener1.on_release(0, &on_release);
+ listener2.on_acquire(0, &on_acquire);
+ leader_watcher1.release_leader();
+ ASSERT_EQ(0, on_release.wait());
+ ASSERT_FALSE(leader_watcher1.is_leader());
+
+ // wait for lock acquired by another watcher
+ ASSERT_EQ(0, on_acquire.wait());
+ ASSERT_TRUE(leader_watcher2.is_leader());
+
+ leader_watcher1.shut_down();
+ leader_watcher2.shut_down();
+
+ ASSERT_EQ(1, listener1.acquire_count());
+ ASSERT_EQ(1, listener1.release_count());
+ ASSERT_EQ(1, listener2.acquire_count());
+ ASSERT_EQ(1, listener2.release_count());
+}
+
+// A leader that never heartbeats should have its lock broken by a second
+// watcher, which then becomes leader.  Requires a real cluster because the
+// test stub does not implement break_lock/blacklist.
+TEST_F(TestLeaderWatcher, Break)
+{
+ if (is_librados_test_stub()) {
+ // break_lock (blacklist) does not work on librados test stub
+ std::cout << "SKIPPING" << std::endl;
+ return SUCCEED();
+ }
+
+ Listener listener1, listener2;
+ LeaderWatcher<> leader_watcher1(m_threads,
+ create_connection(true /* no heartbeats */),
+ &listener1);
+ LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
+
+ C_SaferCond on_init_acquire;
+ listener1.on_acquire(0, &on_init_acquire);
+ ASSERT_EQ(0, leader_watcher1.init());
+ ASSERT_EQ(0, on_init_acquire.wait());
+
+ C_SaferCond on_acquire;
+ listener2.on_acquire(0, &on_acquire);
+ ASSERT_EQ(0, leader_watcher2.init());
+ ASSERT_FALSE(leader_watcher2.is_leader());
+
+ // wait for lock broken due to no heartbeats and re-acquired
+ ASSERT_EQ(0, on_acquire.wait());
+ ASSERT_TRUE(leader_watcher2.is_leader());
+
+ leader_watcher1.shut_down();
+ leader_watcher2.shut_down();
+}
+
+// Stress test: start many watchers sharing one listener, then repeatedly
+// shut down whichever watcher is currently leader and wait for another to
+// take over, until all watchers are gone.
+TEST_F(TestLeaderWatcher, Stress)
+{
+ if (is_librados_test_stub()) {
+ // skipping due to possible break request sent
+ std::cout << "SKIPPING" << std::endl;
+ return SUCCEED();
+ }
+
+ const int WATCHERS_COUNT = 20;
+ std::list<LeaderWatcher<> *> leader_watchers;
+ Listener listener;
+
+ for (int i = 0; i < WATCHERS_COUNT; i++) {
+ auto leader_watcher =
+ new LeaderWatcher<>(m_threads, create_connection(), &listener);
+ leader_watchers.push_back(leader_watcher);
+ }
+
+ C_SaferCond on_init_acquire;
+ listener.on_acquire(0, &on_init_acquire);
+ for (auto &leader_watcher : leader_watchers) {
+ ASSERT_EQ(0, leader_watcher->init());
+ }
+ ASSERT_EQ(0, on_init_acquire.wait());
+
+ while (true) {
+ C_SaferCond on_acquire;
+ listener.on_acquire(0, &on_acquire);
+ // find the current leader; exactly one watcher may be leader at a time
+ std::unique_ptr<LeaderWatcher<> > leader_watcher;
+ for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
+ if ((*it)->is_leader()) {
+ ASSERT_FALSE(leader_watcher);
+ leader_watcher.reset(*it);
+ it = leader_watchers.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ ASSERT_TRUE(leader_watcher);
+ leader_watcher->shut_down();
+ if (leader_watchers.empty()) {
+ break;
+ }
+ ASSERT_EQ(0, on_acquire.wait());
+ }
+}
extern void register_test_rbd_mirror();
extern void register_test_rbd_mirror_image_deleter();
extern void register_test_image_sync();
+extern void register_test_leader_watcher();
int main(int argc, char **argv)
{
register_test_rbd_mirror();
register_test_rbd_mirror_image_deleter();
register_test_image_sync();
+ register_test_leader_watcher();
::testing::InitGoogleTest(&argc, argv);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/LeaderWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+// Thin wrapper so the LeaderWatcher template can be instantiated against a
+// test-local mock image context type.
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(librbd::ImageCtx &image_ctx)
+ : librbd::MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+// Singleton gmock object backing the ManagedLock<MockTestImageCtx>
+// specialization below.  The most recently constructed instance receives
+// all expectations; tests must create it before the watcher under test.
+struct MockManagedLock {
+ static MockManagedLock *s_instance;
+ static MockManagedLock &get_instance() {
+ assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockManagedLock() {
+ s_instance = this;
+ }
+
+ MOCK_CONST_METHOD0(is_lock_owner, bool());
+
+ MOCK_METHOD1(shut_down, void(Context *));
+ MOCK_METHOD1(try_acquire_lock, void(Context *));
+ MOCK_METHOD1(release_lock, void(Context *));
+ MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
+ MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
+
+ MOCK_METHOD0(set_state_post_acquiring, void());
+
+ MOCK_CONST_METHOD0(is_shutdown, bool());
+
+ MOCK_CONST_METHOD0(is_state_post_acquiring, bool());
+ MOCK_CONST_METHOD0(is_state_locked, bool());
+};
+
+MockManagedLock *MockManagedLock::s_instance = nullptr;
+
+// Template specialization that forwards every ManagedLock call used by
+// LeaderWatcher to the MockManagedLock singleton, letting tests script
+// lock behavior with gmock expectations.
+template <>
+struct ManagedLock<MockTestImageCtx> {
+ ManagedLock(librados::IoCtx& ioctx, ContextWQ *work_queue,
+ const std::string& oid, librbd::Watcher *watcher,
+ managed_lock::Mode mode, bool blacklist_on_break_lock,
+ uint32_t blacklist_expire_seconds)
+ : m_lock("ManagedLock::m_lock") {
+ }
+
+ mutable Mutex m_lock;
+
+ bool is_lock_owner() const {
+ return MockManagedLock::get_instance().is_lock_owner();
+ }
+ void shut_down(Context *on_shutdown) {
+ MockManagedLock::get_instance().shut_down(on_shutdown);
+ }
+ void try_acquire_lock(Context *on_acquired) {
+ MockManagedLock::get_instance().try_acquire_lock(on_acquired);
+ }
+ void release_lock(Context *on_released) {
+ MockManagedLock::get_instance().release_lock(on_released);
+ }
+ void get_locker(managed_lock::Locker *locker, Context *on_finish) {
+ MockManagedLock::get_instance().get_locker(locker, on_finish);
+ }
+ void break_lock(const managed_lock::Locker &locker, bool force_break_lock,
+ Context *on_finish) {
+ MockManagedLock::get_instance().break_lock(locker, force_break_lock,
+ on_finish);
+ }
+ void set_state_post_acquiring() {
+ MockManagedLock::get_instance().set_state_post_acquiring();
+ }
+ bool is_shutdown() const {
+ return MockManagedLock::get_instance().is_shutdown();
+ }
+ bool is_state_post_acquiring() const {
+ return MockManagedLock::get_instance().is_state_post_acquiring();
+ }
+ bool is_state_locked() const {
+ return MockManagedLock::get_instance().is_state_locked();
+ }
+};
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/LeaderWatcher.cc"
+template class rbd::mirror::LeaderWatcher<librbd::MockTestImageCtx>;
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+
+using librbd::MockManagedLock;
+
+// Fixture with expectation helpers for MockManagedLock; each helper sets up
+// one gmock expectation mirroring a step of the LeaderWatcher state machine.
+class TestMockLeaderWatcher : public TestMockFixture {
+public:
+ typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
+
+ class MockListener : public MockLeaderWatcher::Listener {
+ public:
+ MOCK_METHOD1(post_acquire_handler, void(Context *));
+ MOCK_METHOD1(pre_release_handler, void(Context *));
+ };
+
+ void expect_is_lock_owner(MockManagedLock &mock_managed_lock, bool owner) {
+ EXPECT_CALL(mock_managed_lock, is_lock_owner())
+ .WillOnce(Return(owner));
+ }
+
+ void expect_shut_down(MockManagedLock &mock_managed_lock, int r) {
+ EXPECT_CALL(mock_managed_lock, shut_down(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
+ EXPECT_CALL(mock_managed_lock, try_acquire_lock(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
+ EXPECT_CALL(mock_managed_lock, release_lock(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ // On get_locker: fill in the supplied locker (on success), complete the
+ // request with r, then fire on_finish so the test can synchronize.
+ void expect_get_locker(MockManagedLock &mock_managed_lock,
+ const librbd::managed_lock::Locker &locker, int r,
+ Context *on_finish = nullptr) {
+ EXPECT_CALL(mock_managed_lock, get_locker(_, _))
+ .WillOnce(Invoke([on_finish, r, locker](librbd::managed_lock::Locker *out,
+ Context *ctx) {
+ if (r == 0) {
+ *out = locker;
+ }
+ ctx->complete(r);
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
+ }
+ }));
+ }
+
+ void expect_break_lock(MockManagedLock &mock_managed_lock,
+ const librbd::managed_lock::Locker &locker, int r,
+ Context *on_finish) {
+ EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
+ .WillOnce(Invoke([on_finish, r](const librbd::managed_lock::Locker &,
+ bool, Context *ctx) {
+ ctx->complete(r);
+ on_finish->complete(0);
+ }));
+ }
+
+ void expect_set_state_post_acquiring(MockManagedLock &mock_managed_lock) {
+ EXPECT_CALL(mock_managed_lock, set_state_post_acquiring());
+ }
+
+ void expect_is_shutdown(MockManagedLock &mock_managed_lock) {
+ EXPECT_CALL(mock_managed_lock, is_shutdown())
+ .Times(AtLeast(0)).WillRepeatedly(Return(false));
+ }
+
+ // One-shot leadership query: post_acquiring is checked first; the locked
+ // state is only consulted when not post-acquiring.
+ void expect_is_leader(MockManagedLock &mock_managed_lock, bool post_acquiring,
+ bool locked) {
+ EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+ .WillOnce(Return(post_acquiring));
+ if (!post_acquiring) {
+ EXPECT_CALL(mock_managed_lock, is_state_locked())
+ .WillOnce(Return(locked));
+ }
+ }
+
+ // Catch-all: any number of leadership queries answer "not leader".
+ void expect_is_leader(MockManagedLock &mock_managed_lock) {
+ EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+ .Times(AtLeast(0)).WillRepeatedly(Return(false));
+ EXPECT_CALL(mock_managed_lock, is_state_locked())
+ .Times(AtLeast(0)).WillRepeatedly(Return(false));
+ }
+
+ // Expect the two is_leader checks performed around a heartbeat notify and
+ // fire on_finish from the final check so the test can wait for it.
+ void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
+ Context *on_finish) {
+ EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_managed_lock, is_state_locked())
+ .WillOnce(Return(true));
+ EXPECT_CALL(mock_managed_lock, is_state_post_acquiring())
+ .WillOnce(Return(false));
+ EXPECT_CALL(mock_managed_lock, is_state_locked())
+ .WillOnce(DoAll(Invoke([on_finish]() {
+ on_finish->complete(0);
+ }),
+ Return(true)));
+ }
+};
+
+// init() acquires the lock and sends a heartbeat; shut_down() shuts down
+// the managed lock.
+TEST_F(TestMockLeaderWatcher, InitShutdown) {
+ MockManagedLock mock_managed_lock;
+ MockListener listener;
+ MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ expect_is_shutdown(mock_managed_lock);
+
+ InSequence seq;
+ C_SaferCond on_heartbeat_finish;
+ expect_try_acquire_lock(mock_managed_lock, 0);
+ expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+ expect_shut_down(mock_managed_lock, 0);
+
+ leader_watcher.shut_down();
+}
+
+// release_leader() while leader must trigger release_lock on the
+// managed lock before shutdown.
+TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) {
+ MockManagedLock mock_managed_lock;
+ MockListener listener;
+ MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ expect_is_shutdown(mock_managed_lock);
+
+ InSequence seq;
+ C_SaferCond on_heartbeat_finish;
+ expect_try_acquire_lock(mock_managed_lock, 0);
+ expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+ expect_is_leader(mock_managed_lock, false, true);
+ expect_release_lock(mock_managed_lock, 0);
+
+ leader_watcher.release_leader();
+
+ expect_shut_down(mock_managed_lock, 0);
+
+ leader_watcher.shut_down();
+}
+
+// A failed try_acquire_lock (-EAGAIN: someone else holds it) must be
+// followed by a get_locker query rather than an error.
+TEST_F(TestMockLeaderWatcher, AcquireError) {
+ MockManagedLock mock_managed_lock;
+ MockListener listener;
+ MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ expect_is_shutdown(mock_managed_lock);
+ expect_is_leader(mock_managed_lock);
+
+ InSequence seq;
+ C_SaferCond on_get_locker_finish;
+ expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
+ expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0,
+ &on_get_locker_finish);
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_get_locker_finish.wait());
+
+ expect_shut_down(mock_managed_lock, 0);
+
+ leader_watcher.shut_down();
+}
+
+// An error from release_lock must not prevent a clean shut_down.
+TEST_F(TestMockLeaderWatcher, ReleaseError) {
+ MockManagedLock mock_managed_lock;
+ MockListener listener;
+ MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+ expect_is_shutdown(mock_managed_lock);
+
+ InSequence seq;
+ C_SaferCond on_heartbeat_finish;
+ expect_try_acquire_lock(mock_managed_lock, 0);
+ expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+ expect_is_leader(mock_managed_lock, false, true);
+ expect_release_lock(mock_managed_lock, -EINVAL);
+
+ leader_watcher.release_leader();
+
+ expect_shut_down(mock_managed_lock, 0);
+
+ leader_watcher.shut_down();
+}
+
+// After rbd_mirror_leader_max_acquire_attempts_before_break consecutive
+// acquire failures against the same locker, the watcher must break the
+// lock and then successfully acquire it.
+TEST_F(TestMockLeaderWatcher, Break) {
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1"));
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats",
+ "1"));
+ CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ int max_acquire_attempts =
+ cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break;
+
+ MockManagedLock mock_managed_lock;
+ MockListener listener;
+ MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
+ librbd::managed_lock::Locker
+ locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
+
+ expect_is_shutdown(mock_managed_lock);
+ expect_is_leader(mock_managed_lock);
+
+ InSequence seq;
+ // max_acquire_attempts + 1 failed acquires; get_locker follows all but
+ // the last, which instead escalates to break_lock
+ for (int i = 0; i <= max_acquire_attempts; i++) {
+ expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
+ if (i < max_acquire_attempts) {
+ expect_get_locker(mock_managed_lock, locker, 0);
+ }
+ }
+ C_SaferCond on_break;
+ expect_break_lock(mock_managed_lock, locker, 0, &on_break);
+ C_SaferCond on_heartbeat_finish;
+ expect_try_acquire_lock(mock_managed_lock, 0);
+ expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
+
+ ASSERT_EQ(0, leader_watcher.init());
+ ASSERT_EQ(0, on_heartbeat_finish.wait());
+
+ expect_shut_down(mock_managed_lock, 0);
+
+ leader_watcher.shut_down();
+}
+
+} // namespace mirror
+} // namespace rbd
ImageDeleter.cc
ImageSync.cc
ImageSyncThrottler.cc
+ LeaderWatcher.cc
Mirror.cc
MirrorStatusWatcher.cc
PoolWatcher.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "LeaderWatcher.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/Utils.h"
+#include "librbd/watcher/Types.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
+ << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using namespace leader_watcher;
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
+
+// Construct a watcher on the RBD_MIRROR_LEADER object of the given pool.
+// m_notifier_id is this client's rados instance id, used to filter out our
+// own notifications.
+template <typename I>
+LeaderWatcher<I>::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx,
+ Listener *listener)
+ : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
+ m_threads(threads), m_listener(listener),
+ m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
+ m_notifier_id(librados::Rados(io_ctx).get_instance_id()) {
+}
+
+// Synchronous init wrapper: blocks until the async init completes.
+template <typename I>
+int LeaderWatcher<I>::init() {
+ C_SaferCond init_ctx;
+ init(&init_ctx);
+ return init_ctx.wait();
+}
+
+// Async init: create the leader lock instance and kick off creation of the
+// leader object; on_finish fires when the watch is registered (or on error).
+template <typename I>
+void LeaderWatcher<I>::init(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(!m_leader_lock);
+ m_leader_lock.reset(
+ new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
+ m_cct->_conf->rbd_blacklist_expire_seconds));
+
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+
+ create_leader_object();
+}
+
+// Ensure the leader object exists (create with exclusive=false so an
+// already-existing object is not an error).
+template <typename I>
+void LeaderWatcher<I>::create_leader_object() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ librados::ObjectWriteOperation op;
+ op.create(false);
+
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
+ int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+// On success continue with watch registration; on error complete the
+// pending init context with the error (outside m_lock).
+template <typename I>
+void LeaderWatcher<I>::handle_create_leader_object(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r == 0) {
+ register_watch();
+ return;
+ }
+
+ derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
+ << dendl;
+
+ std::swap(on_finish, m_on_finish);
+ }
+ on_finish->complete(r);
+}
+
+// Register the watch on the leader object (async via the work queue).
+template <typename I>
+void LeaderWatcher<I>::register_watch() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
+
+ librbd::Watcher::register_watch(ctx);
+}
+
+// On success start the first leader-lock acquisition attempt; either way
+// complete the pending init context with r (outside m_lock).
+template <typename I>
+void LeaderWatcher<I>::handle_register_watch(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error registering leader watcher for " << m_oid << " object: "
+ << cpp_strerror(r) << dendl;
+ } else {
+ acquire_leader_lock(true);
+ }
+
+ std::swap(on_finish, m_on_finish);
+ }
+ on_finish->complete(r);
+}
+
+// Synchronous shutdown wrapper; shutdown itself is expected to succeed.
+template <typename I>
+void LeaderWatcher<I>::shut_down() {
+ C_SaferCond shut_down_ctx;
+ shut_down(&shut_down_ctx);
+ int r = shut_down_ctx.wait();
+ assert(r == 0);
+}
+
+// Async shutdown: if currently leader, first release the leader lock (so
+// listeners get their pre/post release callbacks and peers are notified),
+// then re-enter to cancel the heartbeat timer and shut down the lock and
+// watch.
+template <typename I>
+void LeaderWatcher<I>::shut_down(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ assert(m_leader_lock);
+
+ // BUG FIX: the condition was "false && is_leader(m_lock)", which made
+ // this branch dead code and skipped releasing leadership on shutdown.
+ if (is_leader(m_lock)) {
+ Context *ctx = create_async_context_callback(
+ m_work_queue, new FunctionContext(
+ [this, on_finish](int r) {
+ if (r < 0) {
+ derr << "error releasing leader lock: " << cpp_strerror(r) << dendl;
+ }
+ // restart shutdown now that we are no longer (or failed to stop
+ // being) the leader
+ shut_down(on_finish);
+ }));
+
+ m_leader_lock->release_lock(ctx);
+ return;
+ }
+
+ assert(m_on_shut_down_finish == nullptr);
+ m_on_shut_down_finish = on_finish;
+ cancel_timer_task();
+ shut_down_leader_lock();
+}
+
+// Shut down the managed leader lock instance.
+template <typename I>
+void LeaderWatcher<I>::shut_down_leader_lock() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
+
+ m_leader_lock->shut_down(ctx);
+}
+
+// Lock shutdown errors are logged but do not abort the overall shutdown;
+// continue with unregistering the watch.
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
+ }
+
+ unregister_watch();
+}
+
+// Unregister the watch on the leader object.
+template <typename I>
+void LeaderWatcher<I>::unregister_watch() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
+
+ librbd::Watcher::unregister_watch(ctx);
+}
+
+// Final shutdown step: log any error but always complete the pending
+// shutdown context with 0 (shutdown itself is best-effort).
+template <typename I>
+void LeaderWatcher<I>::handle_unregister_watch(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error unregistering leader watcher for " << m_oid << " object: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ assert(m_on_shut_down_finish != nullptr);
+ std::swap(on_finish, m_on_shut_down_finish);
+ }
+ on_finish->complete(0);
+}
+
+// Public leadership query (takes m_lock).
+template <typename I>
+bool LeaderWatcher<I>::is_leader() {
+ Mutex::Locker locker(m_lock);
+
+ return is_leader(m_lock);
+}
+
+// Internal variant for callers already holding m_lock; the parameter only
+// documents the locking requirement, which is also asserted.
+template <typename I>
+bool LeaderWatcher<I>::is_leader(Mutex &lock) {
+ assert(m_lock.is_locked());
+
+ bool leader = m_leader_lock && m_leader_lock->is_leader();
+ dout(20) << leader << dendl;
+ return leader;
+}
+
+// Voluntarily give up leadership; a no-op when not currently leader.
+template <typename I>
+void LeaderWatcher<I>::release_leader() {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (!is_leader(m_lock)) {
+ return;
+ }
+
+ release_leader_lock();
+}
+
+// Cancel the currently scheduled timer task, if any.  Requires both the
+// timer lock and m_lock; cancellation must succeed because the task clears
+// m_timer_task under the timer lock before running.
+template <typename I>
+void LeaderWatcher<I>::cancel_timer_task() {
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+
+ if (m_timer_task == nullptr) {
+ return;
+ }
+
+ dout(20) << m_timer_task << dendl;
+ bool canceled = m_threads->timer->cancel_event(m_timer_task);
+ assert(canceled);
+ m_timer_task = nullptr;
+}
+
+// Schedule a member-function callback to run after
+// delay_factor * heartbeat_interval seconds, replacing any previously
+// scheduled task.  The callback is skipped if our leadership state no
+// longer matches the expected 'leader' value by the time it fires.
+template <typename I>
+void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
+ int delay_factor, bool leader,
+ void (LeaderWatcher<I>::*cb)()) {
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+
+ cancel_timer_task();
+
+ m_timer_task = new FunctionContext(
+ [this, cb, leader](int r) {
+ assert(m_threads->timer_lock.is_locked());
+ m_timer_task = nullptr;
+ Mutex::Locker locker(m_lock);
+ if (is_leader(m_lock) != leader) {
+ return;
+ }
+ (this->*cb)();
+ });
+
+ // clamp to at least 1 second in case the interval is misconfigured
+ int after = delay_factor *
+ max(1, m_cct->_conf->rbd_mirror_leader_heartbeat_interval);
+
+ dout(20) << "scheduling " << name << " after " << after << " sec (task "
+ << m_timer_task << ")" << dendl;
+ m_threads->timer->add_event_after(after, m_timer_task);
+}
+
+// Invoked by the leader lock after a successful (or failed) acquisition;
+// on success start the post-acquire sequence (status watcher, listener
+// notification), completing on_finish when done.
+template <typename I>
+void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
+ Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ if (r == -EAGAIN) {
+ dout(20) << "already locked" << dendl;
+ } else {
+ derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
+ }
+ on_finish->complete(r);
+ return;
+ }
+
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ m_notify_error = 0;
+
+ init_status_watcher();
+}
+
+// Invoked by the leader lock before releasing: notify the listener so it
+// can wind down leader-only activity, then complete on_finish.
+template <typename I>
+void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ m_notify_error = 0;
+
+ notify_listener();
+}
+
+// Invoked by the leader lock after the release completed; on success
+// broadcast a lock-released notification to peers.
+template <typename I>
+void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
+ Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ on_finish->complete(r);
+ return;
+ }
+
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+
+ notify_lock_released();
+}
+
+// Forcibly break the remembered locker's lock.  If we never learned who
+// the locker is (empty cookie), just retry acquisition instead.
+template <typename I>
+void LeaderWatcher<I>::break_leader_lock() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ if (m_locker.cookie.empty()) {
+ acquire_leader_lock(true);
+ return;
+ }
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
+
+ m_leader_lock->break_lock(m_locker, true, ctx);
+}
+
+// Completion of break_lock: on success (or if the lock vanished, -ENOENT)
+// retry acquisition with a fresh attempt counter; otherwise reschedule a
+// get_locker query.  Cancelled outright if the lock is shutting down.
+template <typename I>
+void LeaderWatcher<I>::handle_break_leader_lock(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ if (m_leader_lock->is_shutdown()) {
+ dout(20) << "canceling due to shutdown" << dendl;
+ return;
+ }
+
+ if (r < 0 && r != -ENOENT) {
+ // fixed log message typo: "beaking" -> "breaking"
+ derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
+
+ schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);
+ return;
+ }
+
+ acquire_leader_lock(true);
+}
+
+// Query who currently holds the leader lock.
+template <typename I>
+void LeaderWatcher<I>::get_locker() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ C_GetLocker *get_locker_ctx = new C_GetLocker(this);
+ Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
+
+ m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
+}
+
+// Completion of get_locker: if nobody holds the lock (-ENOENT) try to
+// acquire it immediately; otherwise remember the locker (on success) and
+// schedule a delayed acquisition attempt after the missed-heartbeat window.
+template <typename I>
+void LeaderWatcher<I>::handle_get_locker(int r,
+ librbd::managed_lock::Locker& locker) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker mutex_locker(m_lock);
+
+ if (m_leader_lock->is_shutdown()) {
+ dout(20) << "canceling due to shutdown" << dendl;
+ return;
+ }
+
+ if (is_leader(m_lock)) {
+ // we became leader in the meantime -- no remote locker to track
+ m_locker = {};
+ } else {
+ if (r == -ENOENT) {
+ acquire_leader_lock(true);
+ } else {
+ if (r < 0) {
+ derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
+ } else {
+ m_locker = locker;
+ }
+
+ schedule_timer_task("acquire leader lock",
+ m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
+ false, &LeaderWatcher<I>::acquire_leader_lock);
+ }
+ }
+}
+
+// Timer-callback variant: retry without resetting the attempt counter.
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock() {
+ return acquire_leader_lock(false);
+}
+
+// Attempt to acquire the leader lock.  reset_attempt_counter is true when
+// starting a fresh acquisition cycle (init, post-break, lock vanished).
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock(bool reset_attempt_counter) {
+ dout(20) << "reset_attempt_counter=" << reset_attempt_counter << dendl;
+
+ assert(m_lock.is_locked());
+
+ if (reset_attempt_counter) {
+ m_acquire_attempts = 0;
+ }
+
+ dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
+
+ m_leader_lock->try_acquire_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {  // completion of try_acquire_lock()
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (m_leader_lock->is_shutdown()) {
+ dout(20) << "canceling due to shutdown" << dendl;
+ return;
+ }
+
+ if (r < 0) {
+ if (r == -EAGAIN) {  // another instance holds the lock -- not an error
+ dout(20) << "already locked" << dendl;
+ } else {
+ derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
+ }
+ if (++m_acquire_attempts >  // too many failures: forcibly break the (presumed dead) leader's lock
+ m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
+ dout(0) << "breaking leader lock after failed attempts to acquire"
+ << dendl;
+ break_leader_lock();
+ } else {
+ get_locker();  // refresh locker info and retry later
+ }
+ return;
+ }
+
+ m_acquire_attempts = 0;  // we are leader now
+
+ if (m_notify_error) {  // listener reported an error during acquisition -- step down
+ dout(5) << "releasing due to error on notify" << dendl;
+ release_leader_lock();
+ return;
+ }
+
+ notify_heartbeat();  // announce leadership and start the heartbeat cycle
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader_lock() {  // voluntarily step down as leader
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
+
+ m_leader_lock->release_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_release_leader_lock(int r) {  // completion of release_lock()
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {  // NOTE(review): on error no retry/get_locker is scheduled -- confirm intended
+ derr << "error releasing lock: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);  // resume watching the new leader
+}
+
+template <typename I>
+void LeaderWatcher<I>::init_status_watcher() {  // leader-only: start mirror status watcher (post-acquire path)
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+ assert(!m_status_watcher);  // never double-initialized
+
+ m_status_watcher.reset(new MirrorStatusWatcher(m_ioctx, m_work_queue));
+
+ Context *ctx = create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
+
+ m_status_watcher->init(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_init_status_watcher(int r) {  // completion of MirrorStatusWatcher::init()
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r == 0) {  // success: continue the post-acquire chain under the lock
+ notify_listener();
+ return;
+ }
+
+ derr << "error initializing mirror status watcher: " << cpp_strerror(r)
+ << dendl;
+ m_status_watcher.reset();
+ assert(m_on_finish != nullptr);
+ std::swap(m_on_finish, on_finish);  // complete the saved context outside m_lock
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_status_watcher() {  // stop mirror status watcher (pre-release path)
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+ assert(m_status_watcher);  // only valid while watcher exists
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<LeaderWatcher<I>,
+ &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
+
+ m_status_watcher->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {  // completion of MirrorStatusWatcher::shut_down()
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
+ << dendl;
+ if (!is_leader(m_lock)) {
+ // ignore on releasing
+ r = 0;
+ }
+ }
+
+ assert(m_status_watcher);
+ m_status_watcher.reset();
+
+ assert(m_on_finish != nullptr);
+ std::swap(m_on_finish, on_finish);  // complete the saved context outside m_lock
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_listener() {  // run the listener's acquire/release handler on the work queue
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
+
+ if (is_leader(m_lock)) {  // wrap so the listener completes ctx -> handle_notify_listener()
+ ctx = new FunctionContext(
+ [this, ctx](int r) {
+ m_listener->post_acquire_handler(ctx);
+ });
+ } else {
+ ctx = new FunctionContext(
+ [this, ctx](int r) {
+ m_listener->pre_release_handler(ctx);
+ });
+ }
+ m_work_queue->queue(ctx, 0);  // invoke listener off the current thread
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_listener(int r) {  // listener acquire/release handler finished
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error notifying listener: " << cpp_strerror(r) << dendl;
+ m_notify_error = r;  // remembered; triggers release after acquisition completes
+ }
+
+ if (is_leader(m_lock)) {  // post-acquire path: broadcast that we own the lock
+ notify_lock_acquired();
+ } else {  // pre-release path: tear down the status watcher
+ shut_down_status_watcher();
+ }
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_acquired() {  // broadcast LockAcquired to peer rbd-mirror daemons
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+
+ Context *ctx = create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
+
+ bufferlist bl;
+ ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
+
+ send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {  // completion of the LockAcquired broadcast
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ if (r < 0 && r != -ETIMEDOUT) {  // peer ack timeouts are tolerated
+ derr << "error notifying leader lock acquired: " << cpp_strerror(r)
+ << dendl;
+ m_notify_error = r;
+ }
+
+ assert(m_on_finish != nullptr);
+ std::swap(m_on_finish, on_finish);  // complete the saved context outside m_lock
+ }
+ on_finish->complete(0);  // acquisition itself succeeded regardless of notify result
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_released() {  // broadcast LockReleased to peer rbd-mirror daemons
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+
+ Context *ctx = create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
+
+ bufferlist bl;
+ ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
+
+ send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_released(int r) {  // completion of the LockReleased broadcast
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ if (r < 0 && r != -ETIMEDOUT) {  // peer ack timeouts are tolerated
+ derr << "error notifying leader lock released: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ assert(m_on_finish != nullptr);
+ std::swap(m_on_finish, on_finish);  // complete the saved context outside m_lock
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_heartbeat() {  // leader-only periodic heartbeat broadcast
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());  // caller must hold m_lock
+
+ if (!is_leader(m_lock)) {  // lost leadership between timer fire and execution
+ dout(5) << "not leader, canceling" << dendl;
+ return;
+ }
+
+ Context *ctx = create_context_callback<
+ LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
+
+ bufferlist bl;
+ ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
+
+ send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_heartbeat(int r) {  // completion of the heartbeat broadcast
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ if (!is_leader(m_lock)) {  // stepped down while the notify was in flight
+ return;
+ }
+
+ if (r < 0 && r != -ETIMEDOUT) {  // peer ack timeouts are tolerated
+ derr << "error notifying heartbeat: " << cpp_strerror(r)
+ << ", releasing leader" << dendl;
+ release_leader_lock();
+ return;
+ }
+
+ schedule_timer_task("heartbeat", 1, true,  // re-arm for the next heartbeat interval
+ &LeaderWatcher<I>::notify_heartbeat);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {  // peer leader heartbeat received
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (is_leader(m_lock)) {  // split-brain guard: we believe we are leader too
+ dout(5) << "got another leader heartbeat, ignoring" << dendl;
+ } else {
+ m_acquire_attempts = 0;  // leader is alive -- reset failure counting
+ cancel_timer_task();
+ get_locker();  // refresh our record of the current leader
+ }
+ }
+
+ on_notify_ack->complete(0);  // always ack the notification
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {  // peer announced it became leader
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (is_leader(m_lock)) {  // split-brain guard: we believe we are leader too
+ dout(5) << "got another leader lock_acquired, ignoring" << dendl;
+ } else {
+ cancel_timer_task();
+ m_acquire_attempts = 0;  // new leader established -- reset failure counting
+ get_locker();  // record who the new leader is
+ }
+ }
+
+ on_notify_ack->complete(0);  // always ack the notification
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {  // peer leader stepped down
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (is_leader(m_lock)) {  // we already own the lock -- nothing to contend for
+ dout(5) << "got another leader lock_released, ignoring" << dendl;
+ } else {
+ cancel_timer_task();
+ acquire_leader_lock(true);  // lock is free -- contend immediately, fresh attempt counter
+ }
+ }
+
+ on_notify_ack->complete(0);  // always ack the notification
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,  // watch/notify entry point
+ uint64_t notifier_id, bufferlist &bl) {
+ dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+ << "notifier_id=" << notifier_id << dendl;
+
+ Context *ctx = new librbd::watcher::C_NotifyAck(this, notify_id, handle);
+
+ if (notifier_id == m_notifier_id) {  // we receive our own broadcasts too -- skip them
+ dout(20) << "our own notification, ignoring" << dendl;
+ ctx->complete(0);
+ return;
+ }
+
+ NotifyMessage notify_message;
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(notify_message, iter);
+ } catch (const buffer::error &err) {  // malformed payload: ack anyway so the notifier isn't blocked
+ derr << "error decoding leader notification: " << err.what() << dendl;
+ ctx->complete(0);
+ return;
+ }
+
+ apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);  // dispatch to handle_payload() overloads
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,  // visitor dispatch target
+ Context *on_notify_ack) {
+ dout(20) << "heartbeat" << dendl;
+
+ handle_heartbeat(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,  // visitor dispatch target
+ Context *on_notify_ack) {
+ dout(20) << "lock_acquired" << dendl;
+
+ handle_lock_acquired(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,  // visitor dispatch target
+ Context *on_notify_ack) {
+ dout(20) << "lock_released" << dendl;
+
+ handle_lock_released(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,  // unrecognized message type (future version): just ack
+ Context *on_notify_ack) {
+ dout(20) << "unknown" << dendl;
+
+ on_notify_ack->complete(0);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_LEADER_WATCHER_H
+#define CEPH_RBD_MIRROR_LEADER_WATCHER_H
+
+#include <list>
+#include <memory>
+#include <string>
+
+#include "librbd/ManagedLock.h"
+#include "librbd/managed_lock/Types.h"
+#include "librbd/Watcher.h"
+#include "MirrorStatusWatcher.h"
+#include "tools/rbd_mirror/leader_watcher/Types.h"
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class LeaderWatcher : protected librbd::Watcher {
+public:
+ struct Listener {
+ virtual ~Listener() {
+ }
+
+ virtual void post_acquire_handler(Context *on_finish) = 0;
+ virtual void pre_release_handler(Context *on_finish) = 0;
+ };
+
+ LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener);
+
+ int init();
+ void shut_down();
+
+ void init(Context *on_finish);
+ void shut_down(Context *on_finish);
+
+ bool is_leader();
+ void release_leader();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <uninitialized> <------------------------------ UNREGISTER_WATCH
+ * | (init) ^ ^
+ * v * |
+ * CREATE_OBJECT * * (error) SHUT_DOWN_LEADER_LOCK
+ * | * ^
+ * v * |
+ * REGISTER_WATCH * * | (shut_down)
+ * | |
+ * | (no leader heartbeat and acquire failed) |
+ * | BREAK_LOCK <-------------------------------------\ |
+ * | | (no leader heartbeat) | |
+ * | | /----------------------------------------\ | |
+ * | | | (lock_released received) | |
+ * | | | /-------------------------------------\ | |
+ * | | | | (lock_acquired or | | |
+ * | | | | heartbeat received) | | |
+ * | | | | (ENOENT) /-----------\ | | |
+ * | | | | * * * * * * * * * * | | | | |
+ * v v v v v (error) * v | | | |
+ * ACQUIRE_LEADER_LOCK * * * * *> GET_LOCKER ---> <secondary>
+ * | * ^
+ * ....|...................*............. .....|.....................
+ * . v * . . | post_release .
+ * .INIT_STATUS_WATCHER * * . .NOTIFY_LOCK_RELEASED .
+ * . | (error) . .....^.....................
+ * . v . |
+ * .NOTIFY_LISTENERS . RELEASE_LEADER_LOCK
+ * . | . ^
+ * . v . .....|.....................
+ * .NOTIFY_LOCK_ACQUIRED post_acquire . .SHUT_DOWN_STATUS_WATCHER .
+ * ....|................................. . ^ .
+ * v . | .
+ * <leader> -----------------------------------> .NOTIFY_LISTENERS .
+ * (shut_down, release_leader, . pre_release .
+ * notify error) ...........................
+ * @endverbatim
+ */
+
+ class LeaderLock : public librbd::ManagedLock<ImageCtxT> {
+ public:
+ typedef librbd::ManagedLock<ImageCtxT> Parent;
+
+ LeaderLock(librados::IoCtx& ioctx, ContextWQ *work_queue,
+ const std::string& oid, LeaderWatcher *watcher,
+ bool blacklist_on_break_lock,
+ uint32_t blacklist_expire_seconds)
+ : Parent(ioctx, work_queue, oid, watcher, librbd::managed_lock::EXCLUSIVE,
+ blacklist_on_break_lock, blacklist_expire_seconds),
+ watcher(watcher) {
+ }
+
+ bool is_leader() const {  // leader while acquiring post-handlers or fully locked
+ Mutex::Locker locker(Parent::m_lock);
+ return Parent::is_state_post_acquiring() || Parent::is_state_locked();
+ }
+
+ protected:
+ virtual void post_acquire_lock_handler(int r, Context *on_finish) {
+ if (r == 0) {
+ // lock is owned at this point
+ Mutex::Locker locker(Parent::m_lock);
+ Parent::set_state_post_acquiring();
+ }
+ watcher->handle_post_acquire_leader_lock(r, on_finish);
+ }
+ virtual void pre_release_lock_handler(bool shutting_down,
+ Context *on_finish) {
+ watcher->handle_pre_release_leader_lock(on_finish);
+ }
+ virtual void post_release_lock_handler(bool shutting_down, int r,
+ Context *on_finish) {
+ watcher->handle_post_release_leader_lock(r, on_finish);
+ }
+ private:
+ LeaderWatcher *watcher;  // back-pointer to owner (non-owning)
+ };
+
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ LeaderWatcher *leader_watcher;
+ Context *on_notify_ack;
+
+ HandlePayloadVisitor(LeaderWatcher *leader_watcher, Context *on_notify_ack)
+ : leader_watcher(leader_watcher), on_notify_ack(on_notify_ack) {
+ }
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ leader_watcher->handle_payload(payload, on_notify_ack);
+ }
+ };
+
+ struct C_GetLocker : public Context {
+ LeaderWatcher *leader_watcher;
+ librbd::managed_lock::Locker locker;
+
+ C_GetLocker(LeaderWatcher *leader_watcher)
+ : leader_watcher(leader_watcher) {
+ }
+
+ virtual void finish(int r) {
+ leader_watcher->handle_get_locker(r, locker);
+ }
+ };
+
+ Threads *m_threads;
+ Listener *m_listener;
+
+ Mutex m_lock;
+ uint64_t m_notifier_id;
+ Context *m_on_finish = nullptr;
+ Context *m_on_shut_down_finish = nullptr;
+ int m_acquire_attempts = 0;
+ int m_notify_error = 0;
+ std::unique_ptr<LeaderLock> m_leader_lock;
+ std::unique_ptr<MirrorStatusWatcher> m_status_watcher;
+ librbd::managed_lock::Locker m_locker;
+ Context *m_timer_task = nullptr;
+
+ bool is_leader(Mutex &m_lock);
+
+ void cancel_timer_task();
+ void schedule_timer_task(const std::string &name,
+ int delay_factor, bool leader,
+ void (LeaderWatcher<ImageCtxT>::*callback)());
+
+ void create_leader_object();
+ void handle_create_leader_object(int r);
+
+ void register_watch();
+ void handle_register_watch(int r);
+
+ void shut_down_leader_lock();
+ void handle_shut_down_leader_lock(int r);
+
+ void unregister_watch();
+ void handle_unregister_watch(int r);
+
+ void break_leader_lock();
+ void handle_break_leader_lock(int r);
+
+ void get_locker();
+ void handle_get_locker(int r, librbd::managed_lock::Locker& locker);
+
+ void acquire_leader_lock(bool reset_attempt_counter);
+ void acquire_leader_lock();
+ void handle_acquire_leader_lock(int r);
+
+ void release_leader_lock();
+ void handle_release_leader_lock(int r);
+
+ void init_status_watcher();
+ void handle_init_status_watcher(int r);
+
+ void shut_down_status_watcher();
+ void handle_shut_down_status_watcher(int r);
+
+ void notify_listener();
+ void handle_notify_listener(int r);
+
+ void notify_lock_acquired();
+ void handle_notify_lock_acquired(int r);
+
+ void notify_lock_released();
+ void handle_notify_lock_released(int r);
+
+ void notify_heartbeat();
+ void handle_notify_heartbeat(int r);
+
+ void handle_post_acquire_leader_lock(int r, Context *on_finish);
+ void handle_pre_release_leader_lock(Context *on_finish);
+ void handle_post_release_leader_lock(int r, Context *on_finish);
+
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist &bl);
+
+ void handle_heartbeat(Context *on_ack);
+ void handle_lock_acquired(Context *on_ack);
+ void handle_lock_released(Context *on_ack);
+
+ void handle_payload(const leader_watcher::HeartbeatPayload &payload,
+ Context *on_notify_ack);
+ void handle_payload(const leader_watcher::LockAcquiredPayload &payload,
+ Context *on_notify_ack);
+ void handle_payload(const leader_watcher::LockReleasedPayload &payload,
+ Context *on_notify_ack);
+ void handle_payload(const leader_watcher::UnknownPayload &payload,
+ Context *on_notify_ack);
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_LEADER_WATCHER_H