From 5a0b75172da582503cced4fe315589ecddf466e7 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Sat, 11 Feb 2017 17:05:59 +0100 Subject: [PATCH] rbd-mirror A/A: leader should track up/down rbd-mirror instances Fixes: http://tracker.ceph.com/issues/18784 Signed-off-by: Mykola Golub --- .../rbd_mirror/test_mock_LeaderWatcher.cc | 340 ++++++++++++++++-- src/test/rbd_mirror/test_mock_fixture.h | 6 + src/tools/rbd_mirror/InstanceWatcher.cc | 2 +- src/tools/rbd_mirror/LeaderWatcher.cc | 192 +++++++--- src/tools/rbd_mirror/LeaderWatcher.h | 45 ++- src/tools/rbd_mirror/MirrorStatusWatcher.cc | 23 +- src/tools/rbd_mirror/MirrorStatusWatcher.h | 12 + src/tools/rbd_mirror/Replayer.cc | 35 +- 8 files changed, 548 insertions(+), 107 deletions(-) diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index 211c1b205d6..4ac1298fee0 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -1,11 +1,14 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include "librbd/Utils.h" #include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "tools/rbd_mirror/LeaderWatcher.h" #include "tools/rbd_mirror/Threads.h" +using librbd::util::create_async_context_callback; + namespace librbd { namespace { @@ -29,6 +32,11 @@ struct MockManagedLock { s_instance = this; } + bool m_release_lock_on_shutdown = false; + + MOCK_METHOD0(construct, void()); + MOCK_METHOD0(destroy, void()); + MOCK_CONST_METHOD0(is_lock_owner, bool()); MOCK_METHOD1(shut_down, void(Context *)); @@ -53,49 +61,166 @@ struct ManagedLock { const std::string& oid, librbd::Watcher *watcher, managed_lock::Mode mode, bool blacklist_on_break_lock, uint32_t blacklist_expire_seconds) - : m_lock("ManagedLock::m_lock") { + : m_work_queue(work_queue), m_lock("ManagedLock::m_lock") { + MockManagedLock::get_instance().construct(); + } + + virtual ~ManagedLock() { + MockManagedLock::get_instance().destroy(); } - virtual ~ManagedLock() = default; + ContextWQ *m_work_queue; mutable Mutex m_lock; bool is_lock_owner() const { return MockManagedLock::get_instance().is_lock_owner(); } + void shut_down(Context *on_shutdown) { + if (MockManagedLock::get_instance().m_release_lock_on_shutdown) { + on_shutdown = new FunctionContext( + [this, on_shutdown](int r) { + MockManagedLock::get_instance().m_release_lock_on_shutdown = false; + shut_down(on_shutdown); + }); + release_lock(on_shutdown); + return; + } + MockManagedLock::get_instance().shut_down(on_shutdown); } + void try_acquire_lock(Context *on_acquired) { - MockManagedLock::get_instance().try_acquire_lock(on_acquired); + Context *post_acquire_ctx = create_async_context_callback( + m_work_queue, new FunctionContext( + [this, on_acquired](int r) { + post_acquire_lock_handler(r, on_acquired); + })); + MockManagedLock::get_instance().try_acquire_lock(post_acquire_ctx); } + void release_lock(Context *on_released) { - MockManagedLock::get_instance().release_lock(on_released); + Context *post_release_ctx = new FunctionContext( + [this, on_released](int r) { + post_release_lock_handler(false, r, on_released); + }); + + Context *release_ctx = new FunctionContext( + [this, on_released, post_release_ctx](int r) { + if (r < 0) { + on_released->complete(r); + } else { + MockManagedLock::get_instance().release_lock(post_release_ctx); + } + }); + + Context *pre_release_ctx = new FunctionContext( + [this, release_ctx](int r) { + bool shutting_down = + MockManagedLock::get_instance().m_release_lock_on_shutdown; + pre_release_lock_handler(shutting_down, release_ctx); + }); + + m_work_queue->queue(pre_release_ctx, 0); } + void get_locker(managed_lock::Locker *locker, Context *on_finish) { MockManagedLock::get_instance().get_locker(locker, on_finish); } + void break_lock(const managed_lock::Locker &locker, bool force_break_lock, Context *on_finish) { MockManagedLock::get_instance().break_lock(locker, force_break_lock, on_finish); } + void set_state_post_acquiring() { MockManagedLock::get_instance().set_state_post_acquiring(); } + bool is_shutdown() const { return MockManagedLock::get_instance().is_shutdown(); } + bool is_state_post_acquiring() const { return MockManagedLock::get_instance().is_state_post_acquiring(); } + bool is_state_locked() const { return MockManagedLock::get_instance().is_state_locked(); } + + virtual void post_acquire_lock_handler(int r, Context *on_finish) = 0; + virtual void pre_release_lock_handler(bool shutting_down, + Context *on_finish) = 0; + virtual void post_release_lock_handler(bool shutting_down, int r, + Context *on_finish) = 0; }; } // namespace librbd +namespace rbd { +namespace mirror { + +template <> +struct MirrorStatusWatcher { + static MirrorStatusWatcher* s_instance; + + static MirrorStatusWatcher *create(librados::IoCtx &io_ctx, + ContextWQ *work_queue) { + assert(s_instance != nullptr); + return s_instance; + } + + MirrorStatusWatcher() { + assert(s_instance == nullptr); + s_instance = this; + } + + ~MirrorStatusWatcher() { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(destroy, void()); + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD1(shut_down, void(Context *)); +}; + +MirrorStatusWatcher *MirrorStatusWatcher::s_instance = nullptr; + +template <> +struct Instances { + static Instances* s_instance; + + static Instances *create(Threads *threads, librados::IoCtx &ioctx) { + assert(s_instance != nullptr); + return s_instance; + } + + Instances() { + assert(s_instance == nullptr); + s_instance = this; + } + + ~Instances() { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(destroy, void()); + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD1(shut_down, void(Context *)); + MOCK_METHOD1(notify, void(const std::string &)); +}; + +Instances *Instances::s_instance = nullptr; + +} // namespace mirror +} // namespace rbd + + // template definitions #include "tools/rbd_mirror/LeaderWatcher.cc" @@ -111,22 +236,47 @@ using ::testing::Return; using librbd::MockManagedLock; +struct MockListener : public LeaderWatcher::Listener { + static MockListener* s_instance; + + MockListener() { + assert(s_instance == nullptr); + s_instance = this; + } + + ~MockListener() override { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD1(post_acquire_handler, void(Context *)); + MOCK_METHOD1(pre_release_handler, void(Context *)); +}; + +MockListener *MockListener::s_instance = nullptr; + class TestMockLeaderWatcher : public TestMockFixture { public: + typedef MirrorStatusWatcher MockMirrorStatusWatcher; + typedef Instances MockInstances; typedef LeaderWatcher MockLeaderWatcher; - class MockListener : public MockLeaderWatcher::Listener { - public: - MOCK_METHOD1(post_acquire_handler, void(Context *)); - MOCK_METHOD1(pre_release_handler, void(Context *)); - }; + void expect_construct(MockManagedLock &mock_managed_lock) { + EXPECT_CALL(mock_managed_lock, construct()); + } + + void expect_destroy(MockManagedLock &mock_managed_lock) { + EXPECT_CALL(mock_managed_lock, destroy()); + } void expect_is_lock_owner(MockManagedLock &mock_managed_lock, bool owner) { EXPECT_CALL(mock_managed_lock, is_lock_owner()) .WillOnce(Return(owner)); } - void expect_shut_down(MockManagedLock &mock_managed_lock, int r) { + void expect_shut_down(MockManagedLock &mock_managed_lock, + bool release_lock_on_shutdown, int r) { + mock_managed_lock.m_release_lock_on_shutdown = release_lock_on_shutdown; EXPECT_CALL(mock_managed_lock, shut_down(_)) .WillOnce(CompleteContext(r)); } @@ -134,11 +284,20 @@ public: void expect_try_acquire_lock(MockManagedLock &mock_managed_lock, int r) { EXPECT_CALL(mock_managed_lock, try_acquire_lock(_)) .WillOnce(CompleteContext(r)); + if (r == 0) { + expect_set_state_post_acquiring(mock_managed_lock); + } } - void expect_release_lock(MockManagedLock &mock_managed_lock, int r) { + void expect_release_lock(MockManagedLock &mock_managed_lock, int r, + Context *on_finish = nullptr) { EXPECT_CALL(mock_managed_lock, release_lock(_)) - .WillOnce(CompleteContext(r)); + .WillOnce(Invoke([on_finish, r](Context *ctx) { + ctx->complete(r); + if (on_finish != nullptr) { + on_finish->complete(0); + } + })); } void expect_get_locker(MockManagedLock &mock_managed_lock, @@ -158,8 +317,8 @@ public: } void expect_break_lock(MockManagedLock &mock_managed_lock, - const librbd::managed_lock::Locker &locker, int r, - Context *on_finish) { + const librbd::managed_lock::Locker &locker, int r, + Context *on_finish) { EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _)) .WillOnce(Invoke([on_finish, r](const librbd::managed_lock::Locker &, bool, Context *ctx) { @@ -196,74 +355,164 @@ public: void expect_notify_heartbeat(MockManagedLock &mock_managed_lock, Context *on_finish) { + // is_leader in notify_heartbeat EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) .WillOnce(Return(false)); EXPECT_CALL(mock_managed_lock, is_state_locked()) .WillOnce(Return(true)); + + // is_leader in handle_notify_heartbeat EXPECT_CALL(mock_managed_lock, is_state_post_acquiring()) .WillOnce(Return(false)); EXPECT_CALL(mock_managed_lock, is_state_locked()) .WillOnce(DoAll(Invoke([on_finish]() { on_finish->complete(0); }), - Return(true))); + Return(true))); + } + + void expect_destroy(MockMirrorStatusWatcher &mock_mirror_status_watcher) { + EXPECT_CALL(mock_mirror_status_watcher, destroy()); + } + + void expect_init(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) { + EXPECT_CALL(mock_mirror_status_watcher, init(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_shut_down(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) { + EXPECT_CALL(mock_mirror_status_watcher, shut_down(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + expect_destroy(mock_mirror_status_watcher); + } + + void expect_destroy(MockInstances &mock_instances) { + EXPECT_CALL(mock_instances, destroy()); + } + + void expect_init(MockInstances &mock_instances, int r) { + EXPECT_CALL(mock_instances, init(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_shut_down(MockInstances &mock_instances, int r) { + EXPECT_CALL(mock_instances, shut_down(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + expect_destroy(mock_instances); + } + + void expect_acquire_notify(MockManagedLock &mock_managed_lock, + MockListener &mock_listener, int r) { + expect_is_leader(mock_managed_lock, true, false); + EXPECT_CALL(mock_listener, post_acquire_handler(_)) + .WillOnce(CompleteContext(r)); + expect_is_leader(mock_managed_lock, true, false); + } + + void expect_release_notify(MockManagedLock &mock_managed_lock, + MockListener &mock_listener, int r) { + expect_is_leader(mock_managed_lock, false, false); + EXPECT_CALL(mock_listener, pre_release_handler(_)) + .WillOnce(CompleteContext(r)); + expect_is_leader(mock_managed_lock, false, false); } }; TEST_F(TestMockLeaderWatcher, InitShutdown) { MockManagedLock mock_managed_lock; + MockMirrorStatusWatcher mock_mirror_status_watcher; + MockInstances mock_instances; MockListener listener; - MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); expect_is_shutdown(mock_managed_lock); + expect_destroy(mock_managed_lock); InSequence seq; + + expect_construct(mock_managed_lock); + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + // Inint C_SaferCond on_heartbeat_finish; expect_try_acquire_lock(mock_managed_lock, 0); + expect_init(mock_mirror_status_watcher, 0); + expect_init(mock_instances, 0); + expect_acquire_notify(mock_managed_lock, listener, 0); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); ASSERT_EQ(0, on_heartbeat_finish.wait()); - expect_shut_down(mock_managed_lock, 0); + // Shutdown + expect_release_notify(mock_managed_lock, listener, 0); + expect_shut_down(mock_instances, 0); + expect_shut_down(mock_mirror_status_watcher, 0); + expect_is_leader(mock_managed_lock, false, false); + expect_release_lock(mock_managed_lock, 0); + expect_shut_down(mock_managed_lock, true, 0); leader_watcher.shut_down(); } TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { MockManagedLock mock_managed_lock; + MockMirrorStatusWatcher mock_mirror_status_watcher; + MockInstances mock_instances; MockListener listener; - MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); expect_is_shutdown(mock_managed_lock); + expect_destroy(mock_managed_lock); InSequence seq; + + expect_construct(mock_managed_lock); + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + // Inint C_SaferCond on_heartbeat_finish; expect_try_acquire_lock(mock_managed_lock, 0); + expect_init(mock_mirror_status_watcher, 0); + expect_init(mock_instances, 0); + expect_acquire_notify(mock_managed_lock, listener, 0); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); ASSERT_EQ(0, on_heartbeat_finish.wait()); + // Release expect_is_leader(mock_managed_lock, false, true); - expect_release_lock(mock_managed_lock, 0); + expect_release_notify(mock_managed_lock, listener, 0); + expect_shut_down(mock_instances, 0); + expect_shut_down(mock_mirror_status_watcher, 0); + expect_is_leader(mock_managed_lock, false, false); + C_SaferCond on_release; + expect_release_lock(mock_managed_lock, 0, &on_release); leader_watcher.release_leader(); + ASSERT_EQ(0, on_release.wait()); - expect_shut_down(mock_managed_lock, 0); + // Shutdown + expect_shut_down(mock_managed_lock, false, 0); leader_watcher.shut_down(); } TEST_F(TestMockLeaderWatcher, AcquireError) { MockManagedLock mock_managed_lock; + MockMirrorStatusWatcher mock_mirror_status_watcher; + MockInstances mock_instances; MockListener listener; - MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); expect_is_shutdown(mock_managed_lock); expect_is_leader(mock_managed_lock); + expect_destroy(mock_managed_lock); InSequence seq; + + expect_construct(mock_managed_lock); + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + // Inint C_SaferCond on_get_locker_finish; expect_try_acquire_lock(mock_managed_lock, -EAGAIN); expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0, @@ -271,32 +520,51 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { ASSERT_EQ(0, leader_watcher.init()); ASSERT_EQ(0, on_get_locker_finish.wait()); - expect_shut_down(mock_managed_lock, 0); + // Shutdown + expect_shut_down(mock_managed_lock, false, 0); leader_watcher.shut_down(); } TEST_F(TestMockLeaderWatcher, ReleaseError) { MockManagedLock mock_managed_lock; + MockMirrorStatusWatcher mock_mirror_status_watcher; + MockInstances mock_instances; MockListener listener; - MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); expect_is_shutdown(mock_managed_lock); + expect_destroy(mock_managed_lock); InSequence seq; + + expect_construct(mock_managed_lock); + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + // Inint C_SaferCond on_heartbeat_finish; expect_try_acquire_lock(mock_managed_lock, 0); + expect_init(mock_mirror_status_watcher, 0); + expect_init(mock_instances, 0); + expect_acquire_notify(mock_managed_lock, listener, 0); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); ASSERT_EQ(0, on_heartbeat_finish.wait()); + // Release expect_is_leader(mock_managed_lock, false, true); - expect_release_lock(mock_managed_lock, -EINVAL); + expect_release_notify(mock_managed_lock, listener, -EINVAL); + expect_shut_down(mock_instances, 0); + expect_shut_down(mock_mirror_status_watcher, -EINVAL); + expect_is_leader(mock_managed_lock, false, false); + C_SaferCond on_release; + expect_release_lock(mock_managed_lock, -EINVAL, &on_release); leader_watcher.release_leader(); + ASSERT_EQ(0, on_release.wait()); - expect_shut_down(mock_managed_lock, 0); + // Shutdown + expect_shut_down(mock_managed_lock, false, 0); leader_watcher.shut_down(); } @@ -304,21 +572,28 @@ TEST_F(TestMockLeaderWatcher, ReleaseError) { TEST_F(TestMockLeaderWatcher, Break) { EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", "1")); EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_max_missed_heartbeats", - "1")); + "1")); CephContext *cct = reinterpret_cast(m_local_io_ctx.cct()); int max_acquire_attempts = cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break; MockManagedLock mock_managed_lock; + MockMirrorStatusWatcher mock_mirror_status_watcher; + MockInstances mock_instances; MockListener listener; - MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); librbd::managed_lock::Locker locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123}; expect_is_shutdown(mock_managed_lock); expect_is_leader(mock_managed_lock); + expect_destroy(mock_managed_lock); InSequence seq; + + expect_construct(mock_managed_lock); + MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); + + // Init for (int i = 0; i <= max_acquire_attempts; i++) { expect_try_acquire_lock(mock_managed_lock, -EAGAIN); if (i < max_acquire_attempts) { @@ -329,12 +604,21 @@ TEST_F(TestMockLeaderWatcher, Break) { expect_break_lock(mock_managed_lock, locker, 0, &on_break); C_SaferCond on_heartbeat_finish; expect_try_acquire_lock(mock_managed_lock, 0); + expect_init(mock_mirror_status_watcher, 0); + expect_init(mock_instances, 0); + expect_acquire_notify(mock_managed_lock, listener, 0); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); ASSERT_EQ(0, on_heartbeat_finish.wait()); - expect_shut_down(mock_managed_lock, 0); + // Shutdown + expect_release_notify(mock_managed_lock, listener, 0); + expect_shut_down(mock_instances, 0); + expect_shut_down(mock_mirror_status_watcher, 0); + expect_is_leader(mock_managed_lock, false, false); + expect_release_lock(mock_managed_lock, 0); + expect_shut_down(mock_managed_lock, true, 0); leader_watcher.shut_down(); } diff --git a/src/test/rbd_mirror/test_mock_fixture.h b/src/test/rbd_mirror/test_mock_fixture.h index 77a36ddc6d8..18ee6146763 100644 --- a/src/test/rbd_mirror/test_mock_fixture.h +++ b/src/test/rbd_mirror/test_mock_fixture.h @@ -6,6 +6,7 @@ #include "test/rbd_mirror/test_fixture.h" #include "test/librados_test_stub/LibradosTestStub.h" +#include "common/WorkQueue.h" #include #include @@ -23,6 +24,11 @@ ACTION_P(CompleteContext, r) { arg0->complete(r); } +ACTION_P2(CompleteContext, wq, r) { + ContextWQ *context_wq = reinterpret_cast(wq); + context_wq->queue(arg0, r); +} + MATCHER_P(ContentsEqual, bl, "") { // TODO fix const-correctness of bufferlist return const_cast(arg).contents_equal( diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index c07100b4648..1655ecd244b 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -118,7 +118,7 @@ int InstanceWatcher::init() { template void InstanceWatcher::init(Context *on_finish) { - dout(20) << dendl; + dout(20) << "instance_id=" << m_instance_id << dendl; Mutex::Locker locker(m_lock); diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 9ab4f0468c8..6dd08a8b85f 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -15,7 +15,6 @@ #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \ << this << " " << __func__ << ": " - namespace rbd { namespace mirror { @@ -31,7 +30,18 @@ LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER), m_threads(threads), m_listener(listener), m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()), - m_notifier_id(librados::Rados(io_ctx).get_instance_id()) { + m_notifier_id(librados::Rados(io_ctx).get_instance_id()), + m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true, + m_cct->_conf->rbd_blacklist_expire_seconds)) { +} + +template +LeaderWatcher::~LeaderWatcher() { + assert(m_status_watcher == nullptr); + assert(m_instances == nullptr); + assert(m_timer_task == nullptr); + + delete m_leader_lock; } template @@ -43,15 +53,10 @@ int LeaderWatcher::init() { template void LeaderWatcher::init(Context *on_finish) { - dout(20) << dendl; + dout(20) << "notifier_id=" << m_notifier_id << dendl; Mutex::Locker locker(m_lock); - assert(!m_leader_lock); - m_leader_lock.reset( - new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true, - m_cct->_conf->rbd_blacklist_expire_seconds)); - assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -143,22 +148,6 @@ void LeaderWatcher::shut_down(Context *on_finish) { Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); - assert(m_leader_lock); - - if (false && is_leader(m_lock)) { - Context *ctx = create_async_context_callback( - m_work_queue, new FunctionContext( - [this, on_finish](int r) { - if (r < 0) { - derr << "error releasing leader lock: " << cpp_strerror(r) << dendl; - } - shut_down(on_finish); - })); - - m_leader_lock->release_lock(ctx); - return; - } - assert(m_on_shut_down_finish == nullptr); m_on_shut_down_finish = on_finish; cancel_timer_task(); @@ -234,7 +223,7 @@ template bool LeaderWatcher::is_leader(Mutex &lock) { assert(m_lock.is_locked()); - bool leader = m_leader_lock && m_leader_lock->is_leader(); + bool leader = m_leader_lock->is_leader(); dout(20) << leader << dendl; return leader; } @@ -251,6 +240,19 @@ void LeaderWatcher::release_leader() { release_leader_lock(); } +template +void LeaderWatcher::list_instances(std::vector *instance_ids) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + instance_ids->clear(); + if (m_instances != nullptr) { + m_instances->list(instance_ids); + } +} + + template void LeaderWatcher::cancel_timer_task() { assert(m_threads->timer_lock.is_locked()); @@ -273,6 +275,10 @@ void LeaderWatcher::schedule_timer_task(const std::string &name, assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); + if (m_on_shut_down_finish != nullptr) { + return; + } + cancel_timer_task(); m_timer_task = new FunctionContext( @@ -312,7 +318,7 @@ void LeaderWatcher::handle_post_acquire_leader_lock(int r, Mutex::Locker locker(m_lock); assert(m_on_finish == nullptr); m_on_finish = on_finish; - m_notify_error = 0; + m_ret_val = 0; init_status_watcher(); } @@ -324,7 +330,7 @@ void LeaderWatcher::handle_pre_release_leader_lock(Context *on_finish) { Mutex::Locker locker(m_lock); assert(m_on_finish == nullptr); m_on_finish = on_finish; - m_notify_error = 0; + m_ret_val = 0; notify_listener(); } @@ -484,7 +490,7 @@ void LeaderWatcher::handle_acquire_leader_lock(int r) { m_acquire_attempts = 0; - if (m_notify_error) { + if (m_ret_val) { dout(5) << "releasing due to error on notify" << dendl; release_leader_lock(); return; @@ -526,9 +532,9 @@ void LeaderWatcher::init_status_watcher() { dout(20) << dendl; assert(m_lock.is_locked()); - assert(!m_status_watcher); + assert(m_status_watcher == nullptr); - m_status_watcher.reset(new MirrorStatusWatcher(m_ioctx, m_work_queue)); + m_status_watcher = MirrorStatusWatcher::create(m_ioctx, m_work_queue); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_init_status_watcher>(this); @@ -545,13 +551,14 @@ void LeaderWatcher::handle_init_status_watcher(int r) { Mutex::Locker locker(m_lock); if (r == 0) { - notify_listener(); + init_instances(); return; } derr << "error initializing mirror status watcher: " << cpp_strerror(r) - << dendl; - m_status_watcher.reset(); + << dendl; + m_status_watcher->destroy(); + m_status_watcher = nullptr; assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); } @@ -563,7 +570,7 @@ void LeaderWatcher::shut_down_status_watcher() { dout(20) << dendl; assert(m_lock.is_locked()); - assert(m_status_watcher); + assert(m_status_watcher != nullptr); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback, @@ -580,17 +587,22 @@ void LeaderWatcher::handle_shut_down_status_watcher(int r) { { Mutex::Locker locker(m_lock); + m_status_watcher->destroy(); + m_status_watcher = nullptr; + if (r < 0) { derr << "error shutting mirror status watcher down: " << cpp_strerror(r) - << dendl; - if (!is_leader(m_lock)) { - // ignore on releasing - r = 0; - } + << dendl; } - assert(m_status_watcher); - m_status_watcher.reset(); + if (m_ret_val != 0) { + r = m_ret_val; + } + + if (!is_leader(m_lock)) { + // ignore on releasing + r = 0; + } assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); @@ -598,6 +610,66 @@ void LeaderWatcher::handle_shut_down_status_watcher(int r) { on_finish->complete(r); } +template +void LeaderWatcher::init_instances() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_instances == nullptr); + + m_instances = Instances::create(m_threads, m_ioctx); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_init_instances>(this); + + m_instances->init(ctx); +} + +template +void LeaderWatcher::handle_init_instances(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error initializing instances: " << cpp_strerror(r) << dendl; + m_ret_val = r; + m_instances->destroy(); + m_instances = nullptr; + shut_down_status_watcher(); + return; + } + + notify_listener(); +} + +template +void LeaderWatcher::shut_down_instances() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_instances != nullptr); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback, + &LeaderWatcher::handle_shut_down_instances>(this)); + + m_instances->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_instances(int r) { + dout(20) << "r=" << r << dendl; + assert(r == 0); + + Mutex::Locker locker(m_lock); + + m_instances->destroy(); + m_instances = nullptr; + + shut_down_status_watcher(); +} + template void LeaderWatcher::notify_listener() { dout(20) << dendl; @@ -630,13 +702,13 @@ void LeaderWatcher::handle_notify_listener(int r) { if (r < 0) { derr << "error notifying listener: " << cpp_strerror(r) << dendl; - m_notify_error = r; + m_ret_val = r; } if (is_leader(m_lock)) { notify_lock_acquired(); } else { - shut_down_status_watcher(); + shut_down_instances(); } } @@ -664,8 +736,8 @@ void LeaderWatcher::handle_notify_lock_acquired(int r) { Mutex::Locker locker(m_lock); if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying leader lock acquired: " << cpp_strerror(r) - << dendl; - m_notify_error = r; + << dendl; + m_ret_val = r; } assert(m_on_finish != nullptr); @@ -724,7 +796,8 @@ void LeaderWatcher::notify_heartbeat() { bufferlist bl; ::encode(NotifyMessage{HeartbeatPayload{}}, bl); - send_notify(bl, nullptr, ctx); + m_heartbeat_ack_bl.clear(); + send_notify(bl, &m_heartbeat_ack_bl, ctx); } template @@ -745,6 +818,33 @@ void LeaderWatcher::handle_notify_heartbeat(int r) { return; } + try { + bufferlist::iterator iter = m_heartbeat_ack_bl.begin(); + uint32_t num_acks; + ::decode(num_acks, iter); + + dout(20) << num_acks << " acks received" << dendl; + + for (uint32_t i = 0; i < num_acks; i++) { + uint64_t notifier_id; + uint64_t cookie; + bufferlist reply_bl; + + ::decode(notifier_id, iter); + ::decode(cookie, iter); + ::decode(reply_bl, iter); + + if (notifier_id == m_notifier_id) { + continue; + } + + std::string instance_id = stringify(notifier_id); + m_instances->notify(instance_id); + } + } catch (const buffer::error &err) { + derr << ": error decoding heartbeat acks: " << err.what() << dendl; + } + schedule_timer_task("heartbeat", 1, true, &LeaderWatcher::notify_heartbeat); } diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index 8d5fd5dae85..e2e7cc8a0e9 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -11,6 +11,7 @@ #include "librbd/ManagedLock.h" #include "librbd/managed_lock/Types.h" #include "librbd/Watcher.h" +#include "Instances.h" #include "MirrorStatusWatcher.h" #include "tools/rbd_mirror/leader_watcher/Types.h" @@ -33,6 +34,7 @@ public: }; LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, Listener *listener); + ~LeaderWatcher() override; int init(); void shut_down(); @@ -42,6 +44,7 @@ public: bool is_leader(); void release_leader(); + void list_instances(std::vector *instance_ids); private: /** @@ -68,18 +71,21 @@ private: * v v v v v (error) * v | | | | * ACQUIRE_LEADER_LOCK * * * * *> GET_LOCKER ---> * | * ^ - * ....|...................*............. .....|..................... - * . v * . . | post_release . - * .INIT_STATUS_WATCHER * * . .NOTIFY_LOCK_RELEASED . - * . | (error) . .....^..................... - * . v . | - * .NOTIFY_LISTENERS . RELEASE_LEADER_LOCK - * . | . ^ - * . v . .....|..................... - * .NOTIFY_LOCK_ACQUIRED post_acquire . .SHUT_DOWN_STATUS_WATCHER . - * ....|................................. . ^ . + * ....|...................*.................... .....|..................... + * . v * . . | post_release . + * .INIT_STATUS_WATCHER * * (error) . .NOTIFY_LOCK_RELEASED . + * . | ^ . .....^..................... + * . v (error) | . | + * .INIT_INSTANCES *> SHUT_DOWN_STATUS_WATCHER . RELEASE_LEADER_LOCK + * . | . ^ + * . v . .....|..................... + * .NOTIFY_LISTENER . .SHUT_DOWN_STATUS_WATCHER . + * . | . . ^ . + * . v . . | . + * .NOTIFY_LOCK_ACQUIRED post_acquire . .SHUT_DOWN_INSTANCES . + * ....|........................................ . ^ . * v . | . - * -----------------------------------> .NOTIFY_LISTENERS . + * -----------------------------------> .NOTIFY_LISTENER . * (shut_down, release_leader, . pre_release . * notify error) ........................... * @endverbatim @@ -156,14 +162,16 @@ private: Mutex m_lock; uint64_t m_notifier_id; + LeaderLock *m_leader_lock; Context *m_on_finish = nullptr; Context *m_on_shut_down_finish = nullptr; int m_acquire_attempts = 0; - int m_notify_error = 0; - std::unique_ptr m_leader_lock; - std::unique_ptr m_status_watcher; + int m_ret_val = 0; + MirrorStatusWatcher *m_status_watcher = nullptr; + Instances *m_instances = nullptr; librbd::managed_lock::Locker m_locker; Context *m_timer_task = nullptr; + bufferlist m_heartbeat_ack_bl; bool is_leader(Mutex &m_lock); @@ -203,6 +211,12 @@ private: void shut_down_status_watcher(); void handle_shut_down_status_watcher(int r); + void init_instances(); + void handle_init_instances(int r); + + void shut_down_instances(); + void handle_shut_down_instances(int r); + void notify_listener(); void handle_notify_listener(int r); @@ -215,6 +229,9 @@ private: void notify_heartbeat(); void handle_notify_heartbeat(int r); + void get_instances(); + void handle_get_instances(int r); + void handle_post_acquire_leader_lock(int r, Context *on_finish); void handle_pre_release_leader_lock(Context *on_finish); void handle_post_release_leader_lock(int r, Context *on_finish); diff --git a/src/tools/rbd_mirror/MirrorStatusWatcher.cc b/src/tools/rbd_mirror/MirrorStatusWatcher.cc index b969ed68ef1..95e347363ba 100644 --- a/src/tools/rbd_mirror/MirrorStatusWatcher.cc +++ b/src/tools/rbd_mirror/MirrorStatusWatcher.cc @@ -18,12 +18,18 @@ namespace mirror { using librbd::util::create_rados_ack_callback; -MirrorStatusWatcher::MirrorStatusWatcher(librados::IoCtx &io_ctx, - ContextWQ *work_queue) +template +MirrorStatusWatcher::MirrorStatusWatcher(librados::IoCtx &io_ctx, + ContextWQ *work_queue) : Watcher(io_ctx, work_queue, RBD_MIRRORING) { } -void MirrorStatusWatcher::init(Context *on_finish) { +template +MirrorStatusWatcher::~MirrorStatusWatcher() { +} + +template +void MirrorStatusWatcher::init(Context *on_finish) { dout(20) << dendl; on_finish = new FunctionContext( @@ -45,14 +51,17 @@ void MirrorStatusWatcher::init(Context *on_finish) { aio_comp->release(); } -void MirrorStatusWatcher::shut_down(Context *on_finish) { +template +void MirrorStatusWatcher::shut_down(Context *on_finish) { dout(20) << dendl; unregister_watch(on_finish); } -void MirrorStatusWatcher::handle_notify(uint64_t notify_id, uint64_t handle, - uint64_t notifier_id, bufferlist &bl) { +template +void MirrorStatusWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, + bufferlist &bl) { dout(20) << dendl; bufferlist out; @@ -61,3 +70,5 @@ void MirrorStatusWatcher::handle_notify(uint64_t notify_id, uint64_t handle, } // namespace mirror } // namespace rbd + +template class rbd::mirror::MirrorStatusWatcher; diff --git a/src/tools/rbd_mirror/MirrorStatusWatcher.h b/src/tools/rbd_mirror/MirrorStatusWatcher.h index f9d9227b3c1..155f8cc8d05 100644 --- a/src/tools/rbd_mirror/MirrorStatusWatcher.h +++ b/src/tools/rbd_mirror/MirrorStatusWatcher.h @@ -6,12 +6,24 @@ #include "librbd/Watcher.h" +namespace librbd { class ImageCtx; } + namespace rbd { namespace mirror { +template class MirrorStatusWatcher : protected librbd::Watcher { public: + static MirrorStatusWatcher *create(librados::IoCtx &io_ctx, + ContextWQ *work_queue) { + return new MirrorStatusWatcher(io_ctx, work_queue); + } + void destroy() { + delete this; + } + MirrorStatusWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue); + ~MirrorStatusWatcher() override; void init(Context *on_finish); void shut_down(Context *on_finish); diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 0b9cf478057..01750aaa777 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -480,26 +480,37 @@ void Replayer::print_status(Formatter *f, stringstream *ss) { dout(20) << "enter" << dendl; + if (!f) { + return; + } + Mutex::Locker l(m_lock); - if (f) { - f->open_object_section("replayer_status"); - f->dump_string("pool", m_local_io_ctx.get_pool_name()); - f->dump_stream("peer") << m_peer; - f->dump_bool("leader", m_leader_watcher->is_leader()); - f->open_array_section("image_replayers"); - }; + f->open_object_section("replayer_status"); + f->dump_string("pool", m_local_io_ctx.get_pool_name()); + f->dump_stream("peer") << m_peer; + + bool leader = m_leader_watcher->is_leader(); + f->dump_bool("leader", leader); + if (leader) { + std::vector instance_ids; + m_leader_watcher->list_instances(&instance_ids); + f->open_array_section("instances"); + for (auto instance_id : instance_ids) { + f->dump_string("instance_id", instance_id); + } + f->close_section(); + } + f->open_array_section("image_replayers"); for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->print_status(f, ss); } - if (f) { - f->close_section(); - f->close_section(); - f->flush(*ss); - } + f->close_section(); + f->close_section(); + f->flush(*ss); } void Replayer::start() -- 2.39.5