]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: block instance add/remove notifications until post-locked
authorJason Dillaman <dillaman@redhat.com>
Fri, 9 Mar 2018 21:16:34 +0000 (16:16 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 10 Apr 2018 20:31:32 +0000 (16:31 -0400)
Avoid a possible race condition with instances being detected before the
pool replayer leader handling is fully initialized.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_Instances.cc
src/test/rbd_mirror/test_mock_LeaderWatcher.cc
src/tools/rbd_mirror/Instances.cc
src/tools/rbd_mirror/Instances.h
src/tools/rbd_mirror/LeaderWatcher.cc

index e233f5d79078e5a698c5781516db174d9874072f..f6a80e572b7aea55aa03e8305c9d6f11716639b0 100644 (file)
@@ -76,6 +76,9 @@ TEST_F(TestInstances, InitShutdown)
   instances.init(&on_init);
   ASSERT_EQ(0, on_init.wait());
 
+  ASSERT_LT(0U, m_listener.add.count);
+  instances.unblock_listener();
+
   ASSERT_EQ(0, m_listener.add.ctx.wait());
   ASSERT_EQ(std::set<std::string>({instance_id}), m_listener.add.ids);
 
@@ -125,6 +128,9 @@ TEST_F(TestInstances, NotifyRemove)
 
   instances.acked({instance_id1, instance_id2});
 
+  ASSERT_LT(0U, m_listener.add.count);
+  instances.unblock_listener();
+
   ASSERT_EQ(0, m_listener.add.ctx.wait());
   ASSERT_EQ(std::set<std::string>({instance_id1, instance_id2}),
             m_listener.add.ids);
index 2eae5fd2d7bb0edae690cba7aee24f5b0a51d827..78d879da73d0c40b266be6cd3e9b74a93de4df31 100644 (file)
@@ -231,6 +231,7 @@ struct Instances<librbd::MockTestImageCtx> {
   MOCK_METHOD1(init, void(Context *));
   MOCK_METHOD1(shut_down, void(Context *));
   MOCK_METHOD1(acked, void(const std::vector<std::string> &));
+  MOCK_METHOD0(unblock_listener, void());
 };
 
 Instances<librbd::MockTestImageCtx> *Instances<librbd::MockTestImageCtx>::s_instance = nullptr;
@@ -446,6 +447,10 @@ public:
     expect_is_leader(mock_managed_lock, false, false);
   }
 
+  void expect_unblock_listener(MockInstances& mock_instances) {
+    EXPECT_CALL(mock_instances, unblock_listener());
+  }
+
   MockThreads *m_mock_threads;
 };
 
@@ -470,6 +475,7 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) {
   expect_try_acquire_lock(mock_managed_lock, 0);
   expect_init(mock_instances, 0);
   expect_acquire_notify(mock_managed_lock, listener, 0);
+  expect_unblock_listener(mock_instances);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
@@ -507,6 +513,7 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) {
   expect_try_acquire_lock(mock_managed_lock, 0);
   expect_init(mock_instances, 0);
   expect_acquire_notify(mock_managed_lock, listener, 0);
+  expect_unblock_listener(mock_instances);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
@@ -581,6 +588,7 @@ TEST_F(TestMockLeaderWatcher, AcquireError) {
   expect_try_acquire_lock(mock_managed_lock, 0);
   expect_init(mock_instances, 0);
   expect_acquire_notify(mock_managed_lock, listener, 0);
+  expect_unblock_listener(mock_instances);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
@@ -635,6 +643,7 @@ TEST_F(TestMockLeaderWatcher, Break) {
   expect_try_acquire_lock(mock_managed_lock, 0);
   expect_init(mock_instances, 0);
   expect_acquire_notify(mock_managed_lock, listener, 0);
+  expect_unblock_listener(mock_instances);
   expect_notify_heartbeat(mock_managed_lock, &on_heartbeat_finish);
 
   ASSERT_EQ(0, leader_watcher.init());
index bb8fa2c95daa39291a7b8beb2fe4c00f3743b359..4513869d3f048fa92480f36f1b75853b29b39ad1 100644 (file)
@@ -65,6 +65,27 @@ void Instances<I>::shut_down(Context *on_finish) {
   m_threads->work_queue->queue(ctx, 0);
 }
 
+template <typename I>
+void Instances<I>::unblock_listener() {
+  dout(5) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_listener_blocked);
+  m_listener_blocked = false;
+
+  InstanceIds added_instance_ids;
+  for (auto& pair : m_instances) {
+    if (pair.second.state == INSTANCE_STATE_ADDING) {
+      added_instance_ids.push_back(pair.first);
+    }
+  }
+
+  if (!added_instance_ids.empty()) {
+    m_threads->work_queue->queue(
+      new C_NotifyInstancesAdded(this, added_instance_ids), 0);
+  }
+}
+
 template <typename I>
 void Instances<I>::acked(const InstanceIds& instance_ids) {
   dout(20) << "instance_ids=" << instance_ids << dendl;
@@ -102,7 +123,7 @@ void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
   }
 
   schedule_remove_task(time);
-  if (!added_instance_ids.empty()) {
+  if (!m_listener_blocked && !added_instance_ids.empty()) {
     m_threads->work_queue->queue(
       new C_NotifyInstancesAdded(this, added_instance_ids), 0);
   }
index 827b2eedf293915b1e1e6e9f31e4a12464b0e364..3a4f5f3e8445c097da25ac24c18de1ff040c9940 100644 (file)
@@ -42,6 +42,8 @@ public:
   void init(Context *on_finish);
   void shut_down(Context *on_finish);
 
+  void unblock_listener();
+
   void acked(const InstanceIds& instance_ids);
 
   void list(std::vector<std::string> *instance_ids);
@@ -138,6 +140,8 @@ private:
 
   Context *m_timer_task = nullptr;
 
+  bool m_listener_blocked = true;
+
   void handle_acked(const InstanceIds& instance_ids);
   void notify_instances_added(const InstanceIds& instance_ids);
   void notify_instances_removed(const InstanceIds& instance_ids);
index 01e997f8e8cf2e95ead265e916be3c6c9f63dc6a..c3acb21f6abdd276f0e6735d114d35718b446414 100644 (file)
@@ -899,6 +899,9 @@ void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
 
     assert(m_on_finish != nullptr);
     std::swap(m_on_finish, on_finish);
+
+    // listener should be ready for instance add/remove events now
+    m_instances->unblock_listener();
   }
   on_finish->complete(0);
 }