From 76a33346aff7d1aadc38386266ff14290b11799a Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 9 Mar 2018 16:16:34 -0500 Subject: [PATCH] rbd-mirror: block instance add/remove notifications until post-locked Avoid a possible race condition with instances being detected before the pool replayer leader handling is fully initialized. Signed-off-by: Jason Dillaman --- src/test/rbd_mirror/test_Instances.cc | 6 +++++ .../rbd_mirror/test_mock_LeaderWatcher.cc | 9 ++++++++ src/tools/rbd_mirror/Instances.cc | 23 ++++++++++++++++++- src/tools/rbd_mirror/Instances.h | 4 ++++ src/tools/rbd_mirror/LeaderWatcher.cc | 3 +++ 5 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc index e233f5d79078e..f6a80e572b7ae 100644 --- a/src/test/rbd_mirror/test_Instances.cc +++ b/src/test/rbd_mirror/test_Instances.cc @@ -76,6 +76,9 @@ TEST_F(TestInstances, InitShutdown) instances.init(&on_init); ASSERT_EQ(0, on_init.wait()); + ASSERT_LT(0U, m_listener.add.count); + instances.unblock_listener(); + ASSERT_EQ(0, m_listener.add.ctx.wait()); ASSERT_EQ(std::set({instance_id}), m_listener.add.ids); @@ -125,6 +128,9 @@ TEST_F(TestInstances, NotifyRemove) instances.acked({instance_id1, instance_id2}); + ASSERT_LT(0U, m_listener.add.count); + instances.unblock_listener(); + ASSERT_EQ(0, m_listener.add.ctx.wait()); ASSERT_EQ(std::set({instance_id1, instance_id2}), m_listener.add.ids); diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index 2eae5fd2d7bb0..78d879da73d0c 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -231,6 +231,7 @@ struct Instances { MOCK_METHOD1(init, void(Context *)); MOCK_METHOD1(shut_down, void(Context *)); MOCK_METHOD1(acked, void(const std::vector &)); + MOCK_METHOD0(unblock_listener, void()); }; Instances *Instances::s_instance = nullptr; @@ -446,6 +447,10 @@ public: expect_is_leader(mock_managed_lock, false, false); } + void expect_unblock_listener(MockInstances& mock_instances) { + EXPECT_CALL(mock_instances, unblock_listener()); + } + MockThreads *m_mock_threads; }; @@ -470,6 +475,7 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_instances, 0); expect_acquire_notify(mock_managed_lock, listener, 0); + expect_unblock_listener(mock_instances); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); @@ -507,6 +513,7 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_instances, 0); expect_acquire_notify(mock_managed_lock, listener, 0); + expect_unblock_listener(mock_instances); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); @@ -581,6 +588,7 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_instances, 0); expect_acquire_notify(mock_managed_lock, listener, 0); + expect_unblock_listener(mock_instances); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); @@ -635,6 +643,7 @@ TEST_F(TestMockLeaderWatcher, Break) { expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_instances, 0); expect_acquire_notify(mock_managed_lock, listener, 0); + expect_unblock_listener(mock_instances); expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish); ASSERT_EQ(0, leader_watcher.init()); diff --git a/src/tools/rbd_mirror/Instances.cc b/src/tools/rbd_mirror/Instances.cc index bb8fa2c95daa3..4513869d3f048 100644 --- a/src/tools/rbd_mirror/Instances.cc +++ b/src/tools/rbd_mirror/Instances.cc @@ -65,6 +65,27 @@ void Instances::shut_down(Context *on_finish) { m_threads->work_queue->queue(ctx, 0); } +template +void Instances::unblock_listener() { + dout(5) << dendl; + + Mutex::Locker locker(m_lock); + assert(m_listener_blocked); + m_listener_blocked = false; + + InstanceIds added_instance_ids; + for (auto& pair : m_instances) { + if (pair.second.state == INSTANCE_STATE_ADDING) { + added_instance_ids.push_back(pair.first); + } + } + + if (!added_instance_ids.empty()) { + m_threads->work_queue->queue( + new C_NotifyInstancesAdded(this, added_instance_ids), 0); + } +} + template void Instances::acked(const InstanceIds& instance_ids) { dout(20) << "instance_ids=" << instance_ids << dendl; @@ -102,7 +123,7 @@ void Instances::handle_acked(const InstanceIds& instance_ids) { } schedule_remove_task(time); - if (!added_instance_ids.empty()) { + if (!m_listener_blocked && !added_instance_ids.empty()) { m_threads->work_queue->queue( new C_NotifyInstancesAdded(this, added_instance_ids), 0); } diff --git a/src/tools/rbd_mirror/Instances.h b/src/tools/rbd_mirror/Instances.h index 827b2eedf2939..3a4f5f3e8445c 100644 --- a/src/tools/rbd_mirror/Instances.h +++ b/src/tools/rbd_mirror/Instances.h @@ -42,6 +42,8 @@ public: void init(Context *on_finish); void shut_down(Context *on_finish); + void unblock_listener(); + void acked(const InstanceIds& instance_ids); void list(std::vector *instance_ids); @@ -138,6 +140,8 @@ private: Context *m_timer_task = nullptr; + bool m_listener_blocked = true; + void handle_acked(const InstanceIds& instance_ids); void notify_instances_added(const InstanceIds& instance_ids); void notify_instances_removed(const InstanceIds& instance_ids); diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 01e997f8e8cf2..c3acb21f6abdd 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -899,6 +899,9 @@ void LeaderWatcher::handle_notify_lock_acquired(int r) { assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); + + // listener should be ready for instance add/remove events now + m_instances->unblock_listener(); } on_finish->complete(0); } -- 2.39.5