From: Mykola Golub
Date: Mon, 23 Jan 2017 14:22:51 +0000 (+0100)
Subject: rbd-mirror HA: create pool locker / leader class
X-Git-Tag: v12.0.0~38^2~5

rbd-mirror HA: create pool locker / leader class

Fixes: http://tracker.ceph.com/issues/17019
Signed-off-by: Mykola Golub
---

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index ff8125ef4804..3f15a3d91f5c 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1365,6 +1365,9 @@ OPTION(rbd_mirror_pool_replayers_refresh_interval, OPT_INT, 30) // interval to r
 OPTION(rbd_mirror_delete_retry_interval, OPT_DOUBLE, 30) // interval to check and retry the failed requests in deleter
 OPTION(rbd_mirror_image_directory_refresh_interval, OPT_INT, 30) // interval to refresh images in pool watcher
 OPTION(rbd_mirror_image_state_check_interval, OPT_INT, 30) // interval to get images from pool watcher and set sources in replayer
+OPTION(rbd_mirror_leader_heartbeat_interval, OPT_INT, 5) // interval (in seconds) between mirror leader heartbeats
+OPTION(rbd_mirror_leader_max_missed_heartbeats, OPT_INT, 2) // number of missed heartbeats for non-lock owner to attempt to acquire lock
+OPTION(rbd_mirror_leader_max_acquire_attempts_before_break, OPT_INT, 3) // number of failed attempts to acquire lock after missing heartbeats before breaking lock
 OPTION(nss_db_path, OPT_STR, "") // path to nss db
diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt
index 917eedc01e64..259b2db6fa33 100644
--- a/src/test/rbd_mirror/CMakeLists.txt
+++ b/src/test/rbd_mirror/CMakeLists.txt
@@ -4,6 +4,7 @@ set(rbd_mirror_test_srcs
   test_ImageReplayer.cc
   test_ImageDeleter.cc
   test_ImageSync.cc
+  test_LeaderWatcher.cc
   test_fixture.cc
   )
 add_library(rbd_mirror STATIC ${rbd_mirror_test_srcs})
@@ -16,6 +17,7 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageReplayer.cc
   test_mock_ImageSync.cc
   test_mock_ImageSyncThrottler.cc
+  test_mock_LeaderWatcher.cc
   image_replayer/test_mock_BootstrapRequest.cc
   image_replayer/test_mock_CreateImageRequest.cc
   image_replayer/test_mock_EventPreprocessor.cc
@@ -37,6 +39,7 @@ target_link_libraries(unittest_rbd_mirror
   rbd_mirror
   rados_test_stub
   rbd_mirror_internal
+  rbd_mirror_types
   rbd_api
   rbd_internal
   rbd_test_mock
@@ -60,6 +63,7 @@ set_target_properties(ceph_test_rbd_mirror PROPERTIES COMPILE_FLAGS
 target_link_libraries(ceph_test_rbd_mirror
   rbd_mirror
   rbd_mirror_internal
+  rbd_mirror_types
   rbd_api
   rbd_internal
   journal
diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc
new file mode 100644
index 000000000000..6d0533f3797f
--- /dev/null
+++ b/src/test/rbd_mirror/test_LeaderWatcher.cc
@@ -0,0 +1,323 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "test/rbd_mirror/test_fixture.h"
+#include "tools/rbd_mirror/LeaderWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#include "test/librados/test.h"
+#include "gtest/gtest.h"
+
+using librbd::util::unique_lock_name;
+using rbd::mirror::LeaderWatcher;
+
+void register_test_leader_watcher() {
+}
+
+class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
+public:
+  class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
+  public:
+    Listener()
+      : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock",
+                                     this)) {
+    }
+
+    void on_acquire(int r, Context *ctx) {
+      Mutex::Locker locker(m_test_lock);
+      m_on_acquire_r = r;
+      m_on_acquire = ctx;
+    }
+
+    void on_release(int r, Context *ctx) {
+      Mutex::Locker locker(m_test_lock);
+      m_on_release_r = r;
+      m_on_release = ctx;
+    }
+
+    int acquire_count() const {
+      Mutex::Locker locker(m_test_lock);
+      return m_acquire_count;
+    }
+
+    int release_count() const {
+      Mutex::Locker locker(m_test_lock);
+      return m_release_count;
+    }
+
+    virtual void post_acquire_handler(Context *on_finish) {
+      Mutex::Locker locker(m_test_lock);
+      m_acquire_count++;
+      on_finish->complete(m_on_acquire_r);
+      m_on_acquire_r = 0;
+      if (m_on_acquire != nullptr) {
+        m_on_acquire->complete(0);
+        m_on_acquire = nullptr;
+      }
+    }
+
+    virtual void pre_release_handler(Context *on_finish) {
+      Mutex::Locker locker(m_test_lock);
+      m_release_count++;
+      on_finish->complete(m_on_release_r);
+      m_on_release_r = 0;
+      if (m_on_release != nullptr) {
+        m_on_release->complete(0);
+        m_on_release = nullptr;
+      }
+    }
+
+  private:
+    mutable Mutex m_test_lock;
+    int m_acquire_count = 0;
+    int m_release_count = 0;
+    int m_on_acquire_r = 0;
+    int m_on_release_r = 0;
+    Context *m_on_acquire = nullptr;
+    Context *m_on_release = nullptr;
+  };
+
+  struct Connection {
+    librados::Rados cluster;
+    librados::IoCtx io_ctx;
+  };
+
+  std::list<std::unique_ptr<Connection>> m_connections;
+
+  virtual void SetUp() {
+    TestFixture::SetUp();
+    EXPECT_EQ(0, librbd::mirror_mode_set(m_local_io_ctx, RBD_MIRROR_MODE_POOL));
+
+    if (is_librados_test_stub()) {
+      // speed testing up a little
+      EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
+                                    "1"));
+    }
+  }
+
+  bool is_librados_test_stub() {
+    std::string fsid;
+    EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
+    return fsid == "00000000-1111-2222-3333-444444444444";
+  }
+
+  librados::IoCtx &create_connection(bool no_heartbeats = false) {
+    m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
+    Connection *c = m_connections.back().get();
+
+    EXPECT_EQ("", connect_cluster_pp(c->cluster));
+    if (no_heartbeats) {
+      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+                                       "3600"));
+    } else if (is_librados_test_stub()) {
+      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
+                                       "1"));
+    }
+    EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
+
+    return c->io_ctx;
+  }
+};
+
+TEST_F(TestLeaderWatcher, InitShutdown)
+{
+  Listener listener;
+  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  C_SaferCond on_init_acquire;
+  listener.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  leader_watcher.shut_down();
+  ASSERT_EQ(1, listener.acquire_count());
+  ASSERT_EQ(1, listener.release_count());
+  ASSERT_FALSE(leader_watcher.is_leader());
+}
+
+TEST_F(TestLeaderWatcher, Release)
+{
+  Listener listener;
+  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
+
+  C_SaferCond on_init_acquire;
+  listener.on_acquire(0, &on_init_acquire);
+  ASSERT_EQ(0, leader_watcher.init());
+  ASSERT_EQ(0, on_init_acquire.wait());
+  ASSERT_TRUE(leader_watcher.is_leader());
+
+  C_SaferCond on_release;
+  C_SaferCond on_acquire;
+  listener.on_release(0, &on_release);
+  listener.on_acquire(0, &on_acquire);
+  leader_watcher.release_leader();
+  ASSERT_EQ(0, on_release.wait());
+  ASSERT_FALSE(leader_watcher.is_leader());
+
+  // wait for the lock to be re-acquired once no other locker remains
+  ASSERT_EQ(0, on_acquire.wait());
+
ASSERT_TRUE(leader_watcher.is_leader()); + + C_SaferCond on_release2; + listener.on_release(0, &on_release2); + leader_watcher.release_leader(); + ASSERT_EQ(0, on_release2.wait()); + + leader_watcher.shut_down(); + ASSERT_EQ(2, listener.acquire_count()); + ASSERT_EQ(2, listener.release_count()); +} + +TEST_F(TestLeaderWatcher, ListenerError) +{ + Listener listener; + LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener); + + // make listener return error on acquire + C_SaferCond on_init_acquire, on_init_release; + listener.on_acquire(-EINVAL, &on_init_acquire); + listener.on_release(0, &on_init_release); + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + ASSERT_EQ(0, on_init_release.wait()); + ASSERT_FALSE(leader_watcher.is_leader()); + + // wait for lock re-acquired due to no another locker + C_SaferCond on_acquire; + listener.on_acquire(0, &on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher.is_leader()); + + // make listener return error on release + C_SaferCond on_release; + listener.on_release(-EINVAL, &on_release); + leader_watcher.release_leader(); + ASSERT_EQ(0, on_release.wait()); + ASSERT_FALSE(leader_watcher.is_leader()); + + leader_watcher.shut_down(); + ASSERT_EQ(2, listener.acquire_count()); + ASSERT_EQ(2, listener.release_count()); + ASSERT_FALSE(leader_watcher.is_leader()); +} + +TEST_F(TestLeaderWatcher, Two) +{ + Listener listener1; + LeaderWatcher<> leader_watcher1(m_threads, m_local_io_ctx, &listener1); + + C_SaferCond on_init_acquire; + listener1.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher1.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + + Listener listener2; + LeaderWatcher<> leader_watcher2(m_threads, m_local_io_ctx, &listener2); + + ASSERT_EQ(0, leader_watcher2.init()); + ASSERT_TRUE(leader_watcher1.is_leader()); + ASSERT_FALSE(leader_watcher2.is_leader()); + + C_SaferCond on_release; + C_SaferCond on_acquire; + listener1.on_release(0, &on_release); + listener2.on_acquire(0, &on_acquire); + leader_watcher1.release_leader(); + ASSERT_EQ(0, on_release.wait()); + ASSERT_FALSE(leader_watcher1.is_leader()); + + // wait for lock acquired by another watcher + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher2.is_leader()); + + leader_watcher1.shut_down(); + leader_watcher2.shut_down(); + + ASSERT_EQ(1, listener1.acquire_count()); + ASSERT_EQ(1, listener1.release_count()); + ASSERT_EQ(1, listener2.acquire_count()); + ASSERT_EQ(1, listener2.release_count()); +} + +TEST_F(TestLeaderWatcher, Break) +{ + if (is_librados_test_stub()) { + // break_lock (blacklist) does not work on librados test stub + std::cout << "SKIPPING" << std::endl; + return SUCCEED(); + } + + Listener listener1, listener2; + LeaderWatcher<> leader_watcher1(m_threads, + create_connection(true /* no heartbeats */), + &listener1); + LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2); + + C_SaferCond on_init_acquire; + listener1.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher1.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + + C_SaferCond on_acquire; + listener2.on_acquire(0, &on_acquire); + ASSERT_EQ(0, leader_watcher2.init()); + ASSERT_FALSE(leader_watcher2.is_leader()); + + // wait for lock broken due to no heartbeats and re-acquired + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher2.is_leader()); + + leader_watcher1.shut_down(); + leader_watcher2.shut_down(); +} + +TEST_F(TestLeaderWatcher, Stress) +{ + if (is_librados_test_stub()) { + // skipping 
due to possible break request sent + std::cout << "SKIPPING" << std::endl; + return SUCCEED(); + } + + const int WATCHERS_COUNT = 20; + std::list *> leader_watchers; + Listener listener; + + for (int i = 0; i < WATCHERS_COUNT; i++) { + auto leader_watcher = + new LeaderWatcher<>(m_threads, create_connection(), &listener); + leader_watchers.push_back(leader_watcher); + } + + C_SaferCond on_init_acquire; + listener.on_acquire(0, &on_init_acquire); + for (auto &leader_watcher : leader_watchers) { + ASSERT_EQ(0, leader_watcher->init()); + } + ASSERT_EQ(0, on_init_acquire.wait()); + + while (true) { + C_SaferCond on_acquire; + listener.on_acquire(0, &on_acquire); + std::unique_ptr > leader_watcher; + for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) { + if ((*it)->is_leader()) { + ASSERT_FALSE(leader_watcher); + leader_watcher.reset(*it); + it = leader_watchers.erase(it); + } else { + it++; + } + } + + ASSERT_TRUE(leader_watcher); + leader_watcher->shut_down(); + if (leader_watchers.empty()) { + break; + } + ASSERT_EQ(0, on_acquire.wait()); + } +} diff --git a/src/test/rbd_mirror/test_main.cc b/src/test/rbd_mirror/test_main.cc index 0234805b117d..d0d577e056e4 100644 --- a/src/test/rbd_mirror/test_main.cc +++ b/src/test/rbd_mirror/test_main.cc @@ -13,6 +13,7 @@ extern void register_test_pool_watcher(); extern void register_test_rbd_mirror(); extern void register_test_rbd_mirror_image_deleter(); extern void register_test_image_sync(); +extern void register_test_leader_watcher(); int main(int argc, char **argv) { @@ -21,6 +22,7 @@ int main(int argc, char **argv) register_test_rbd_mirror(); register_test_rbd_mirror_image_deleter(); register_test_image_sync(); + register_test_leader_watcher(); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc new file mode 100644 index 000000000000..8477fce6193c --- /dev/null +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -0,0 +1,342 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/test_mock_fixture.h" +#include "tools/rbd_mirror/LeaderWatcher.h" +#include "tools/rbd_mirror/Threads.h" + +namespace librbd { + +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +struct MockManagedLock { + static MockManagedLock *s_instance; + static MockManagedLock &get_instance() { + assert(s_instance != nullptr); + return *s_instance; + } + + MockManagedLock() { + s_instance = this; + } + + MOCK_CONST_METHOD0(is_lock_owner, bool()); + + MOCK_METHOD1(shut_down, void(Context *)); + MOCK_METHOD1(try_acquire_lock, void(Context *)); + MOCK_METHOD1(release_lock, void(Context *)); + MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *)); + MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *)); + + MOCK_METHOD0(set_state_post_acquiring, void()); + + MOCK_CONST_METHOD0(is_shutdown, bool()); + + MOCK_CONST_METHOD0(is_state_post_acquiring, bool()); + MOCK_CONST_METHOD0(is_state_locked, bool()); +}; + +MockManagedLock *MockManagedLock::s_instance = nullptr; + +template <> +struct ManagedLock { + ManagedLock(librados::IoCtx& ioctx, ContextWQ *work_queue, + const std::string& oid, librbd::Watcher *watcher, + managed_lock::Mode mode, bool blacklist_on_break_lock, + uint32_t 
blacklist_expire_seconds) + : m_lock("ManagedLock::m_lock") { + } + + mutable Mutex m_lock; + + bool is_lock_owner() const { + return MockManagedLock::get_instance().is_lock_owner(); + } + void shut_down(Context *on_shutdown) { + MockManagedLock::get_instance().shut_down(on_shutdown); + } + void try_acquire_lock(Context *on_acquired) { + MockManagedLock::get_instance().try_acquire_lock(on_acquired); + } + void release_lock(Context *on_released) { + MockManagedLock::get_instance().release_lock(on_released); + } + void get_locker(managed_lock::Locker *locker, Context *on_finish) { + MockManagedLock::get_instance().get_locker(locker, on_finish); + } + void break_lock(const managed_lock::Locker &locker, bool force_break_lock, + Context *on_finish) { + MockManagedLock::get_instance().break_lock(locker, force_break_lock, + on_finish); + } + void set_state_post_acquiring() { + MockManagedLock::get_instance().set_state_post_acquiring(); + } + bool is_shutdown() const { + return MockManagedLock::get_instance().is_shutdown(); + } + bool is_state_post_acquiring() const { + return MockManagedLock::get_instance().is_state_post_acquiring(); + } + bool is_state_locked() const { + return MockManagedLock::get_instance().is_state_locked(); + } +}; + +} // namespace librbd + +// template definitions +#include "tools/rbd_mirror/LeaderWatcher.cc" +template class rbd::mirror::LeaderWatcher; + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::AtLeast; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; + +using librbd::MockManagedLock; + +class TestMockLeaderWatcher : public TestMockFixture { +public: + typedef LeaderWatcher MockLeaderWatcher; + + class MockListener : public MockLeaderWatcher::Listener { + public: + MOCK_METHOD1(post_acquire_handler, void(Context *)); + MOCK_METHOD1(pre_release_handler, void(Context *)); + }; + + void expect_is_lock_owner(MockManagedLock &mock_managed_lock, bool owner) { + EXPECT_CALL(mock_managed_lock, is_lock_owner()) + .WillOnce(Return(owner)); + } + + void expect_shut_down(MockManagedLock &mock_managed_lock, int r) { + EXPECT_CALL(mock_managed_lock, shut_down(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) { + EXPECT_CALL(mock_managed_lock, try_acquire_lock(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_release_lock(MockManagedLock &mock_managed_lock, int r) { + EXPECT_CALL(mock_managed_lock, release_lock(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_get_locker(MockManagedLock &mock_managed_lock, + const librbd::managed_lock::Locker &locker, int r, + Context *on_finish = nullptr) { + EXPECT_CALL(mock_managed_lock, get_locker(_, _)) + .WillOnce(Invoke([on_finish, r, locker](librbd::managed_lock::Locker *out, + Context *ctx) { + if (r == 0) { + *out = locker; + } + ctx->complete(r); + if (on_finish != nullptr) { + on_finish->complete(0); + } + })); + } + + void expect_break_lock(MockManagedLock &mock_managed_lock, + const librbd::managed_lock::Locker &locker, int r, + Context *on_finish) { + EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _)) + .WillOnce(Invoke([on_finish, r](const librbd::managed_lock::Locker &, + bool, Context *ctx) { + ctx->complete(r); + on_finish->complete(0); + })); + } + + void expect_set_state_post_acquiring(MockManagedLock &mock_managed_lock) { + EXPECT_CALL(mock_managed_lock, set_state_post_acquiring()); + } + + void expect_is_shutdown(MockManagedLock &mock_managed_lock) { 
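+    // the managed lock never reports itself shut down during these tests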
+ EXPECT_CALL(mock_managed_lock, is_shutdown()) + .Times(AtLeast(0)).WillRepeatedly(Return(false)); + } + + void expect_is_leader(MockManagedLock &mock_managed_lock, bool post_acquiring, + bool locked) { + EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) + .WillOnce(Return(post_acquiring)); + if (!post_acquiring) { + EXPECT_CALL(mock_managed_lock, is_state_locked()) + .WillOnce(Return(locked)); + } + } + + void expect_is_leader(MockManagedLock &mock_managed_lock) { + EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) + .Times(AtLeast(0)).WillRepeatedly(Return(false)); + EXPECT_CALL(mock_managed_lock, is_state_locked()) + .Times(AtLeast(0)).WillRepeatedly(Return(false)); + } + + void expect_notify_heartbeat(MockManagedLock &mock_managed_lock, + Context *on_finish) { + EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_managed_lock, is_state_locked()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_managed_lock, is_state_locked()) + .WillOnce(DoAll(Invoke([on_finish]() { + on_finish->complete(0); + }), + Return(true))); + } +}; + +TEST_F(TestMockLeaderWatcher, InitShutdown) { + MockManagedLock mock_managed_lock; + MockListener listener; + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + expect_is_shutdown(mock_managed_lock); + + InSequence seq; + C_SaferCond on_heartbeat_finish; + expect_try_acquire_lock(mock_managed_lock, 0); + expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); + + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_heartbeat_finish.wait()); + + expect_shut_down(mock_managed_lock, 0); + + leader_watcher.shut_down(); +} + +TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { + MockManagedLock mock_managed_lock; + MockListener listener; + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + expect_is_shutdown(mock_managed_lock); + + InSequence seq; + C_SaferCond on_heartbeat_finish; + expect_try_acquire_lock(mock_managed_lock, 0); + expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); + + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_heartbeat_finish.wait()); + + expect_is_leader(mock_managed_lock, false, true); + expect_release_lock(mock_managed_lock, 0); + + leader_watcher.release_leader(); + + expect_shut_down(mock_managed_lock, 0); + + leader_watcher.shut_down(); +} + +TEST_F(TestMockLeaderWatcher, AcquireError) { + MockManagedLock mock_managed_lock; + MockListener listener; + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + expect_is_shutdown(mock_managed_lock); + expect_is_leader(mock_managed_lock); + + InSequence seq; + C_SaferCond on_get_locker_finish; + expect_try_acquire_lock(mock_managed_lock, -EAGAIN); + expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0, + &on_get_locker_finish); + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_get_locker_finish.wait()); + + expect_shut_down(mock_managed_lock, 0); + + leader_watcher.shut_down(); +} + +TEST_F(TestMockLeaderWatcher, ReleaseError) { + MockManagedLock mock_managed_lock; + MockListener listener; + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + expect_is_shutdown(mock_managed_lock); + + InSequence seq; + C_SaferCond on_heartbeat_finish; + expect_try_acquire_lock(mock_managed_lock, 0); + expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); + + ASSERT_EQ(0, leader_watcher.init()); + 
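+  // the watcher becomes leader and sends its first heartbeat before we release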
ASSERT_EQ(0, on_heartbeat_finish.wait()); + + expect_is_leader(mock_managed_lock, false, true); + expect_release_lock(mock_managed_lock, -EINVAL); + + leader_watcher.release_leader(); + + expect_shut_down(mock_managed_lock, 0); + + leader_watcher.shut_down(); +} + +TEST_F(TestMockLeaderWatcher, Break) { + EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1")); + EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats", + "1")); + CephContext *cct = reinterpret_cast(m_local_io_ctx.cct()); + int max_acquire_attempts = + cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break; + + MockManagedLock mock_managed_lock; + MockListener listener; + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + librbd::managed_lock::Locker + locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123}; + + expect_is_shutdown(mock_managed_lock); + expect_is_leader(mock_managed_lock); + + InSequence seq; + for (int i = 0; i <= max_acquire_attempts; i++) { + expect_try_acquire_lock(mock_managed_lock, -EAGAIN); + if (i < max_acquire_attempts) { + expect_get_locker(mock_managed_lock, locker, 0); + } + } + C_SaferCond on_break; + expect_break_lock(mock_managed_lock, locker, 0, &on_break); + C_SaferCond on_heartbeat_finish; + expect_try_acquire_lock(mock_managed_lock, 0); + expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); + + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_heartbeat_finish.wait()); + + expect_shut_down(mock_managed_lock, 0); + + leader_watcher.shut_down(); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index fe1ee4d9f444..faa10b3523bd 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -7,6 +7,7 @@ set(rbd_mirror_internal ImageDeleter.cc ImageSync.cc ImageSyncThrottler.cc + LeaderWatcher.cc Mirror.cc MirrorStatusWatcher.cc PoolWatcher.cc diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc new file mode 100644 index 000000000000..9ab4f0468c80 --- /dev/null +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -0,0 +1,870 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "LeaderWatcher.h" +#include "common/Timer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "cls/rbd/cls_rbd_client.h" +#include "librbd/Utils.h" +#include "librbd/watcher/Types.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +using namespace leader_watcher; + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; +using librbd::util::create_rados_ack_callback; + +template +LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, + Listener *listener) + : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER), + m_threads(threads), m_listener(listener), + m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()), + m_notifier_id(librados::Rados(io_ctx).get_instance_id()) { +} + +template +int LeaderWatcher::init() { + C_SaferCond init_ctx; + init(&init_ctx); + return init_ctx.wait(); +} + +template +void LeaderWatcher::init(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + 
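+  // a fresh init must not find a leader lock left over from a previous run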
assert(!m_leader_lock); + m_leader_lock.reset( + new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true, + m_cct->_conf->rbd_blacklist_expire_seconds)); + + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + create_leader_object(); +} + +template +void LeaderWatcher::create_leader_object() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + librados::ObjectWriteOperation op; + op.create(false); + + librados::AioCompletion *aio_comp = create_rados_ack_callback< + LeaderWatcher, &LeaderWatcher::handle_create_leader_object>(this); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void LeaderWatcher::handle_create_leader_object(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r == 0) { + register_watch(); + return; + } + + derr << "error creating " << m_oid << " object: " << cpp_strerror(r) + << dendl; + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::register_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_register_watch>(this)); + + librbd::Watcher::register_watch(ctx); +} + +template +void LeaderWatcher::handle_register_watch(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error registering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } else { + acquire_leader_lock(true); + } + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::shut_down() { + C_SaferCond shut_down_ctx; + shut_down(&shut_down_ctx); + int r = shut_down_ctx.wait(); + assert(r == 0); +} + +template +void LeaderWatcher::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + assert(m_leader_lock); + + if (false && is_leader(m_lock)) { + Context *ctx = create_async_context_callback( + m_work_queue, new FunctionContext( + [this, on_finish](int r) { + if (r < 0) { + derr << "error releasing leader lock: " << cpp_strerror(r) << dendl; + } + shut_down(on_finish); + })); + + m_leader_lock->release_lock(ctx); + return; + } + + assert(m_on_shut_down_finish == nullptr); + m_on_shut_down_finish = on_finish; + cancel_timer_task(); + shut_down_leader_lock(); +} + +template +void LeaderWatcher::shut_down_leader_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_shut_down_leader_lock>(this)); + + m_leader_lock->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl; + } + + unregister_watch(); +} + +template +void LeaderWatcher::unregister_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_unregister_watch>(this)); + + librbd::Watcher::unregister_watch(ctx); +} + +template +void LeaderWatcher::handle_unregister_watch(int r) { + dout(20) << "r=" << r << dendl; + + 
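+  // swap out the shut-down callback under the lock; complete it outside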
Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error unregistering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } + + assert(m_on_shut_down_finish != nullptr); + std::swap(on_finish, m_on_shut_down_finish); + } + on_finish->complete(0); +} + +template +bool LeaderWatcher::is_leader() { + Mutex::Locker locker(m_lock); + + return is_leader(m_lock); +} + +template +bool LeaderWatcher::is_leader(Mutex &lock) { + assert(m_lock.is_locked()); + + bool leader = m_leader_lock && m_leader_lock->is_leader(); + dout(20) << leader << dendl; + return leader; +} + +template +void LeaderWatcher::release_leader() { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + if (!is_leader(m_lock)) { + return; + } + + release_leader_lock(); +} + +template +void LeaderWatcher::cancel_timer_task() { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (m_timer_task == nullptr) { + return; + } + + dout(20) << m_timer_task << dendl; + bool canceled = m_threads->timer->cancel_event(m_timer_task); + assert(canceled); + m_timer_task = nullptr; +} + +template +void LeaderWatcher::schedule_timer_task(const std::string &name, + int delay_factor, bool leader, + void (LeaderWatcher::*cb)()) { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + cancel_timer_task(); + + m_timer_task = new FunctionContext( + [this, cb, leader](int r) { + assert(m_threads->timer_lock.is_locked()); + m_timer_task = nullptr; + Mutex::Locker locker(m_lock); + if (is_leader(m_lock) != leader) { + return; + } + (this->*cb)(); + }); + + int after = delay_factor * + max(1, m_cct->_conf->rbd_mirror_leader_heartbeat_interval); + + dout(20) << "scheduling " << name << " after " << after << " sec (task " + << m_timer_task << ")" << dendl; + m_threads->timer->add_event_after(after, m_timer_task); +} + +template +void LeaderWatcher::handle_post_acquire_leader_lock(int r, + Context *on_finish) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + if (r == -EAGAIN) { + dout(20) << "already locked" << dendl; + } else { + derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl; + } + on_finish->complete(r); + return; + } + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_notify_error = 0; + + init_status_watcher(); +} + +template +void LeaderWatcher::handle_pre_release_leader_lock(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_notify_error = 0; + + notify_listener(); +} + +template +void LeaderWatcher::handle_post_release_leader_lock(int r, + Context *on_finish) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + on_finish->complete(r); + return; + } + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + notify_lock_released(); +} + +template +void LeaderWatcher::break_leader_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + if (m_locker.cookie.empty()) { + acquire_leader_lock(true); + return; + } + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_break_leader_lock>(this)); + + m_leader_lock->break_lock(m_locker, true, ctx); +} + +template +void LeaderWatcher::handle_break_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + if 
(m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (r < 0 && r != -ENOENT) {
+    derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
+
+    schedule_timer_task("get locker", 1, false, &LeaderWatcher::get_locker);
+    return;
+  }
+
+  acquire_leader_lock(true);
+}
+
+template <typename I>
+void LeaderWatcher<I>::get_locker() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  C_GetLocker *get_locker_ctx = new C_GetLocker(this);
+  Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
+
+  m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_get_locker(int r,
+                                         librbd::managed_lock::Locker& locker) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker mutex_locker(m_lock);
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (is_leader(m_lock)) {
+    m_locker = {};
+  } else {
+    if (r == -ENOENT) {
+      acquire_leader_lock(true);
+    } else {
+      if (r < 0) {
+        derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
+      } else {
+        m_locker = locker;
+      }
+
+      schedule_timer_task("acquire leader lock",
+                          m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
+                          false, &LeaderWatcher::acquire_leader_lock);
+    }
+  }
+}
+
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock() {
+  return acquire_leader_lock(false);
+}
+
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock(bool reset_attempt_counter) {
+  dout(20) << "reset_attempt_counter=" << reset_attempt_counter << dendl;
+
+  assert(m_lock.is_locked());
+
+  if (reset_attempt_counter) {
+    m_acquire_attempts = 0;
+  }
+
+  dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher, &LeaderWatcher::handle_acquire_leader_lock>(this));
+
+  m_leader_lock->try_acquire_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(20) << "canceling due to shutdown" << dendl;
+    return;
+  }
+
+  if (r < 0) {
+    if (r == -EAGAIN) {
+      dout(20) << "already locked" << dendl;
+    } else {
+      derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
+    }
+    if (++m_acquire_attempts >
+        m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
+      dout(0) << "breaking leader lock after failed attempts to acquire"
+              << dendl;
+      break_leader_lock();
+    } else {
+      get_locker();
+    }
+    return;
+  }
+
+  m_acquire_attempts = 0;
+
+  if (m_notify_error) {
+    dout(5) << "releasing due to error on notify" << dendl;
+    release_leader_lock();
+    return;
+  }
+
+  notify_heartbeat();
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader_lock() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher, &LeaderWatcher::handle_release_leader_lock>(this));
+
+  m_leader_lock->release_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_release_leader_lock(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  if (r < 0) {
+    derr << "error releasing lock: " << cpp_strerror(r) << dendl;
+    return;
+  }
+
+  schedule_timer_task("get locker", 1, false, &LeaderWatcher::get_locker);
+}
+
+template <typename I>
+void LeaderWatcher<I>::init_status_watcher() {
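+  // the mirror status watcher is only active while this instance is leader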
dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(!m_status_watcher); + + m_status_watcher.reset(new MirrorStatusWatcher(m_ioctx, m_work_queue)); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_init_status_watcher>(this); + + m_status_watcher->init(ctx); +} + +template +void LeaderWatcher::handle_init_status_watcher(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r == 0) { + notify_listener(); + return; + } + + derr << "error initializing mirror status watcher: " << cpp_strerror(r) + << dendl; + m_status_watcher.reset(); + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::shut_down_status_watcher() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_status_watcher); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback, + &LeaderWatcher::handle_shut_down_status_watcher>(this)); + + m_status_watcher->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_status_watcher(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error shutting mirror status watcher down: " << cpp_strerror(r) + << dendl; + if (!is_leader(m_lock)) { + // ignore on releasing + r = 0; + } + } + + assert(m_status_watcher); + m_status_watcher.reset(); + + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::notify_listener() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_listener>(this)); + + if (is_leader(m_lock)) { + ctx = new FunctionContext( + [this, ctx](int r) { + m_listener->post_acquire_handler(ctx); + }); + } else { + ctx = new FunctionContext( + [this, ctx](int r) { + m_listener->pre_release_handler(ctx); + }); + } + m_work_queue->queue(ctx, 0); +} + +template +void LeaderWatcher::handle_notify_listener(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error notifying listener: " << cpp_strerror(r) << dendl; + m_notify_error = r; + } + + if (is_leader(m_lock)) { + notify_lock_acquired(); + } else { + shut_down_status_watcher(); + } +} + +template +void LeaderWatcher::notify_lock_acquired() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_lock_acquired>(this); + + bufferlist bl; + ::encode(NotifyMessage{LockAcquiredPayload{}}, bl); + + send_notify(bl, nullptr, ctx); +} + +template +void LeaderWatcher::handle_notify_lock_acquired(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + if (r < 0 && r != -ETIMEDOUT) { + derr << "error notifying leader lock acquired: " << cpp_strerror(r) + << dendl; + m_notify_error = r; + } + + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(0); +} + +template +void LeaderWatcher::notify_lock_released() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_lock_released>(this); + + bufferlist bl; + ::encode(NotifyMessage{LockReleasedPayload{}}, bl); + + 
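+  // broadcast the release so non-leaders immediately race to acquire the lock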
  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_released(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    if (r < 0 && r != -ETIMEDOUT) {
+      derr << "error notifying leader lock released: " << cpp_strerror(r)
+           << dendl;
+    }
+
+    assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_heartbeat() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  if (!is_leader(m_lock)) {
+    dout(5) << "not leader, canceling" << dendl;
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher, &LeaderWatcher::handle_notify_heartbeat>(this);
+
+  bufferlist bl;
+  ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+  Mutex::Locker locker(m_lock);
+
+  if (!is_leader(m_lock)) {
+    return;
+  }
+
+  if (r < 0 && r != -ETIMEDOUT) {
+    derr << "error notifying heartbeat: " << cpp_strerror(r)
+         << ", releasing leader" << dendl;
+    release_leader_lock();
+    return;
+  }
+
+  schedule_timer_task("heartbeat", 1, true,
+                      &LeaderWatcher::notify_heartbeat);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader heartbeat, ignoring" << dendl;
+    } else {
+      m_acquire_attempts = 0;
+      cancel_timer_task();
+      get_locker();
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_acquired, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      m_acquire_attempts = 0;
+      get_locker();
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    Mutex::Locker locker(m_lock);
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_released, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      acquire_leader_lock(true);
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                     uint64_t notifier_id, bufferlist &bl) {
+  dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+           << "notifier_id=" << notifier_id << dendl;
+
+  Context *ctx = new librbd::watcher::C_NotifyAck(this, notify_id, handle);
+
+  if (notifier_id == m_notifier_id) {
+    dout(20) << "our own notification, ignoring" << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  NotifyMessage notify_message;
+  try {
+    bufferlist::iterator iter = bl.begin();
+    ::decode(notify_message, iter);
+  } catch (const buffer::error &err) {
+    derr << "error decoding leader notification: " << err.what() << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
+                                      Context *on_notify_ack) {
+  dout(20) << "heartbeat" << dendl;
+
+  handle_heartbeat(on_notify_ack);
+}
+
+template <typename I>
+void
LeaderWatcher::handle_payload(const LockAcquiredPayload &payload, + Context *on_notify_ack) { + dout(20) << "lock_acquired" << dendl; + + handle_lock_acquired(on_notify_ack); +} + +template +void LeaderWatcher::handle_payload(const LockReleasedPayload &payload, + Context *on_notify_ack) { + dout(20) << "lock_released" << dendl; + + handle_lock_released(on_notify_ack); +} + +template +void LeaderWatcher::handle_payload(const UnknownPayload &payload, + Context *on_notify_ack) { + dout(20) << "unknown" << dendl; + + on_notify_ack->complete(0); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::LeaderWatcher; diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h new file mode 100644 index 000000000000..8a2c6f728040 --- /dev/null +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -0,0 +1,242 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_LEADER_WATCHER_H +#define CEPH_RBD_MIRROR_LEADER_WATCHER_H + +#include +#include +#include + +#include "librbd/ManagedLock.h" +#include "librbd/managed_lock/Types.h" +#include "librbd/Watcher.h" +#include "MirrorStatusWatcher.h" +#include "tools/rbd_mirror/leader_watcher/Types.h" + +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +struct Threads; + +template +class LeaderWatcher : protected librbd::Watcher { +public: + struct Listener { + virtual ~Listener() { + } + + virtual void post_acquire_handler(Context *on_finish) = 0; + virtual void pre_release_handler(Context *on_finish) = 0; + }; + + LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener); + + int init(); + void shut_down(); + + void init(Context *on_finish); + void shut_down(Context *on_finish); + + bool is_leader(); + void release_leader(); + +private: + /** + * @verbatim + * + * <------------------------------ UNREGISTER_WATCH + * | (init) ^ ^ + * v * | + * CREATE_OBJECT * * (error) SHUT_DOWN_LEADER_LOCK + * | * ^ + * v * | + * REGISTER_WATCH * * | (shut_down) + * | | + * | (no leader heartbeat and acquire failed) | + * | BREAK_LOCK <-------------------------------------\ | + * | | (no leader heartbeat) | | + * | | /----------------------------------------\ | | + * | | | (lock_released received) | | + * | | | /-------------------------------------\ | | + * | | | | (lock_acquired or | | | + * | | | | heartbeat received) | | | + * | | | | (ENOENT) /-----------\ | | | + * | | | | * * * * * * * * * * | | | | | + * v v v v v (error) * v | | | | + * ACQUIRE_LEADER_LOCK * * * * *> GET_LOCKER ---> + * | * ^ + * ....|...................*............. .....|..................... + * . v * . . | post_release . + * .INIT_STATUS_WATCHER * * . .NOTIFY_LOCK_RELEASED . + * . | (error) . .....^..................... + * . v . | + * .NOTIFY_LISTENERS . RELEASE_LEADER_LOCK + * . | . ^ + * . v . .....|..................... + * .NOTIFY_LOCK_ACQUIRED post_acquire . .SHUT_DOWN_STATUS_WATCHER . + * ....|................................. . ^ . + * v . | . + * -----------------------------------> .NOTIFY_LISTENERS . + * (shut_down, release_leader, . pre_release . + * notify error) ........................... 
+ * @endverbatim
+ */
+
+  class LeaderLock : public librbd::ManagedLock<ImageCtxT> {
+  public:
+    typedef librbd::ManagedLock<ImageCtxT> Parent;
+
+    LeaderLock(librados::IoCtx& ioctx, ContextWQ *work_queue,
+               const std::string& oid, LeaderWatcher *watcher,
+               bool blacklist_on_break_lock,
+               uint32_t blacklist_expire_seconds)
+      : Parent(ioctx, work_queue, oid, watcher, librbd::managed_lock::EXCLUSIVE,
+               blacklist_on_break_lock, blacklist_expire_seconds),
+        watcher(watcher) {
+    }
+
+    bool is_leader() const {
+      Mutex::Locker locker(Parent::m_lock);
+      return Parent::is_state_post_acquiring() || Parent::is_state_locked();
+    }
+
+  protected:
+    virtual void post_acquire_lock_handler(int r, Context *on_finish) {
+      if (r == 0) {
+        // lock is owned at this point
+        Mutex::Locker locker(Parent::m_lock);
+        Parent::set_state_post_acquiring();
+      }
+      watcher->handle_post_acquire_leader_lock(r, on_finish);
+    }
+    virtual void pre_release_lock_handler(bool shutting_down,
+                                          Context *on_finish) {
+      watcher->handle_pre_release_leader_lock(on_finish);
+    }
+    virtual void post_release_lock_handler(bool shutting_down, int r,
+                                           Context *on_finish) {
+      watcher->handle_post_release_leader_lock(r, on_finish);
+    }
+  private:
+    LeaderWatcher *watcher;
+  };
+
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    LeaderWatcher *leader_watcher;
+    Context *on_notify_ack;
+
+    HandlePayloadVisitor(LeaderWatcher *leader_watcher, Context *on_notify_ack)
+      : leader_watcher(leader_watcher), on_notify_ack(on_notify_ack) {
+    }
+
+    template <typename Payload>
+    inline void operator()(const Payload &payload) const {
+      leader_watcher->handle_payload(payload, on_notify_ack);
+    }
+  };
+
+  struct C_GetLocker : public Context {
+    LeaderWatcher *leader_watcher;
+    librbd::managed_lock::Locker locker;
+
+    C_GetLocker(LeaderWatcher *leader_watcher)
+      : leader_watcher(leader_watcher) {
+    }
+
+    virtual void finish(int r) {
+      leader_watcher->handle_get_locker(r, locker);
+    }
+  };
+
+  Threads *m_threads;
+  Listener *m_listener;
+
+  Mutex m_lock;
+  uint64_t m_notifier_id;
+  Context *m_on_finish = nullptr;
+  Context *m_on_shut_down_finish = nullptr;
+  int m_acquire_attempts = 0;
+  int m_notify_error = 0;
+  std::unique_ptr<LeaderLock> m_leader_lock;
+  std::unique_ptr<MirrorStatusWatcher> m_status_watcher;
+  librbd::managed_lock::Locker m_locker;
+  Context *m_timer_task = nullptr;
+
+  bool is_leader(Mutex &m_lock);
+
+  void cancel_timer_task();
+  void schedule_timer_task(const std::string &name,
+                           int delay_factor, bool leader,
+                           void (LeaderWatcher::*callback)());
+
+  void create_leader_object();
+  void handle_create_leader_object(int r);
+
+  void register_watch();
+  void handle_register_watch(int r);
+
+  void shut_down_leader_lock();
+  void handle_shut_down_leader_lock(int r);
+
+  void unregister_watch();
+  void handle_unregister_watch(int r);
+
+  void break_leader_lock();
+  void handle_break_leader_lock(int r);
+
+  void get_locker();
+  void handle_get_locker(int r, librbd::managed_lock::Locker& locker);
+
+  void acquire_leader_lock(bool reset_attempt_counter);
+  void acquire_leader_lock();
+  void handle_acquire_leader_lock(int r);
+
+  void release_leader_lock();
+  void handle_release_leader_lock(int r);
+
+  void init_status_watcher();
+  void handle_init_status_watcher(int r);
+
+  void shut_down_status_watcher();
+  void handle_shut_down_status_watcher(int r);
+
+  void notify_listener();
+  void handle_notify_listener(int r);
+
+  void notify_lock_acquired();
+  void handle_notify_lock_acquired(int r);
+
+  void notify_lock_released();
+  void handle_notify_lock_released(int r);
+
+  void notify_heartbeat();
+  void
handle_notify_heartbeat(int r); + + void handle_post_acquire_leader_lock(int r, Context *on_finish); + void handle_pre_release_leader_lock(Context *on_finish); + void handle_post_release_leader_lock(int r, Context *on_finish); + + void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist &bl); + + void handle_heartbeat(Context *on_ack); + void handle_lock_acquired(Context *on_ack); + void handle_lock_released(Context *on_ack); + + void handle_payload(const leader_watcher::HeartbeatPayload &payload, + Context *on_notify_ack); + void handle_payload(const leader_watcher::LockAcquiredPayload &payload, + Context *on_notify_ack); + void handle_payload(const leader_watcher::LockReleasedPayload &payload, + Context *on_notify_ack); + void handle_payload(const leader_watcher::UnknownPayload &payload, + Context *on_notify_ack); +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_LEADER_WATCHER_H
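
Usage sketch (illustrative only, not part of the applied diff): a pool-level
daemon component would typically wire the watcher up roughly as below. The
names `threads` (an initialized rbd::mirror::Threads*) and `io_ctx` (an open
librados::IoCtx for the mirrored pool) are assumed to come from the caller,
and `PoolListener` is a hypothetical listener implementation.

  #include "tools/rbd_mirror/LeaderWatcher.h"

  struct PoolListener : rbd::mirror::LeaderWatcher<>::Listener {
    void post_acquire_handler(Context *on_finish) override {
      // start pool-wide work here; completing with r < 0 aborts acquisition
      on_finish->complete(0);
    }
    void pre_release_handler(Context *on_finish) override {
      // quiesce pool-wide work here before the leader lock is released
      on_finish->complete(0);
    }
  };

  PoolListener listener;
  rbd::mirror::LeaderWatcher<> leader_watcher(threads, io_ctx, &listener);
  if (leader_watcher.init() == 0) {   // creates and watches the leader object
    if (leader_watcher.is_leader()) {
      // this rbd-mirror instance holds the pool leader lock
    }
    leader_watcher.release_leader();  // optional hand-off to a peer
    leader_watcher.shut_down();       // releases the lock if still held
  }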