]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: handle duplicates in image sync throttler queue 28815/head
authorMykola Golub <mgolub@suse.com>
Tue, 25 Jun 2019 04:38:37 +0000 (05:38 +0100)
committerMykola Golub <mgolub@suse.com>
Mon, 8 Jul 2019 17:18:56 +0000 (20:18 +0300)
Fixes: http://tracker.ceph.com/issues/40519
Signed-off-by: Mykola Golub <mgolub@suse.com>
(cherry picked from commit 2f35ab70a59fc22c02c98e5aa80a1e6cde29a226)

Conflicts:
    src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc:
       no g_ceph_context arg for MockImageSyncThrottler constructor

src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.h
src/tools/rbd_mirror/InstanceWatcher.cc

index 24815aeb799aa2bf41d818a708b2e78dd3f7a137..0a052c7002c657185ed27a36bd341ca9ef9b2531 100644 (file)
@@ -113,6 +113,65 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
   throttler.finish_op("id2");
 }
 
+TEST_F(TestMockImageSyncThrottler, Duplicate) {
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+
+  C_SaferCond on_start2;
+  throttler.start_op("id1", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("id2", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id2", &on_start4);
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start4.wait());
+  throttler.finish_op("id2");
+}
+
+TEST_F(TestMockImageSyncThrottler, Duplicate2) {
+  MockImageSyncThrottler throttler;
+  throttler.set_max_concurrent_syncs(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id3", &on_start4); // dup
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  C_SaferCond on_start5;
+  throttler.start_op("id4", &on_start5);
+
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.finish_op("id2");
+  ASSERT_EQ(0, on_start5.wait());
+
+  C_SaferCond on_start6;
+  throttler.start_op("id5", &on_start6);
+
+  throttler.finish_op("id3");
+  ASSERT_EQ(0, on_start6.wait());
+
+  throttler.finish_op("id4");
+  throttler.finish_op("id5");
+}
+
 TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
   MockImageSyncThrottler throttler;
   throttler.set_max_concurrent_syncs(2);
index dfa96ed4d3e463f3a21faca658958aa54398e99f..cde96b22c482482c4b21039202df0e5827e9dee4 100644 (file)
@@ -50,11 +50,16 @@ template <typename I>
 void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
   dout(20) << "id=" << id << dendl;
 
+  int r = 0;
   {
     Mutex::Locker locker(m_lock);
 
     if (m_inflight_ops.count(id) > 0) {
       dout(20) << "duplicate for already started op " << id << dendl;
+    } else if (m_queued_ops.count(id) > 0) {
+      dout(20) << "duplicate for already queued op " << id << dendl;
+      std::swap(m_queued_ops[id], on_start);
+      r = -ENOENT;
     } else if (m_max_concurrent_syncs == 0 ||
                m_inflight_ops.size() < m_max_concurrent_syncs) {
       assert(m_queue.empty());
@@ -63,14 +68,14 @@ void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
     } else {
-      m_queue.push_back(std::make_pair(id, on_start));
-      on_start = nullptr;
+      m_queue.push_back(id);
+      std::swap(m_queued_ops[id], on_start);
       dout(20) << "image sync for " << id << " has been queued" << dendl;
     }
   }
 
   if (on_start != nullptr) {
-    on_start->complete(0);
+    on_start->complete(r);
   }
 }
 
@@ -81,13 +86,12 @@ bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
   Context *on_start = nullptr;
   {
     Mutex::Locker locker(m_lock);
-    for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
-      if (it->first == id) {
-        on_start = it->second;
-        dout(20) << "canceled queued sync for " << id << dendl;
-        m_queue.erase(it);
-        break;
-      }
+    auto it = m_queued_ops.find(id);
+    if (it != m_queued_ops.end()) {
+      dout(20) << "canceled queued sync for " << id << dendl;
+      m_queue.remove(id);
+      on_start = it->second;
+      m_queued_ops.erase(it);
     }
   }
 
@@ -114,12 +118,15 @@ void ImageSyncThrottler<I>::finish_op(const std::string &id) {
     m_inflight_ops.erase(id);
 
     if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
-      auto pair = m_queue.front();
-      m_inflight_ops.insert(pair.first);
-      dout(20) << "ready to start sync for " << pair.first << " ["
+      auto id = m_queue.front();
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
-      on_start= pair.second;
+      on_start = it->second;
+      m_queued_ops.erase(it);
       m_queue.pop_front();
     }
   }
@@ -133,15 +140,16 @@ template <typename I>
 void ImageSyncThrottler<I>::drain(int r) {
   dout(20) << dendl;
 
-  std::list<std::pair<std::string, Context *>> queue;
+  std::map<std::string, Context *> queued_ops;
   {
     Mutex::Locker locker(m_lock);
-    std::swap(m_queue, queue);
+    std::swap(m_queued_ops, queued_ops);
+    m_queue.clear();
     m_inflight_ops.clear();
   }
 
-  for (auto &pair : queue) {
-    pair.second->complete(r);
+  for (auto &it : queued_ops) {
+    it.second->complete(r);
   }
 }
 
@@ -158,12 +166,15 @@ void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
     while ((m_max_concurrent_syncs == 0 ||
             m_inflight_ops.size() < m_max_concurrent_syncs) &&
            !m_queue.empty()) {
-      auto pair = m_queue.front();
-      m_inflight_ops.insert(pair.first);
-      dout(20) << "ready to start sync for " << pair.first << " ["
+      auto id = m_queue.front();
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
-      ops.push_back(pair.second);
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      ops.push_back(it->second);
+      m_queued_ops.erase(it);
       m_queue.pop_front();
     }
   }
index e0c3f0bf5a9b9c2288f39f129a729c18688a35fb..b553e43bdff67d22940a656ff8b46771bb0ed1be 100644 (file)
@@ -5,6 +5,7 @@
 #define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
 
 #include <list>
+#include <map>
 #include <set>
 #include <sstream>
 #include <string>
@@ -45,7 +46,8 @@ public:
 private:
   Mutex m_lock;
   uint32_t m_max_concurrent_syncs;
-  std::list<std::pair<std::string, Context *>> m_queue;
+  std::list<std::string> m_queue;
+  std::map<std::string, Context *> m_queued_ops;
   std::set<std::string> m_inflight_ops;
 
   const char **get_tracked_conf_keys() const override;
index e69890aaddf596c82d8779704ff2efd3ed1559b7..d5181a40c91d102a1f076528d70af4c1931093c9 100644 (file)
@@ -1188,6 +1188,9 @@ void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
         if (r == 0) {
           notify_sync_start(instance_id, sync_id);
         }
+        if (r == -ENOENT) {
+          r = 0;
+        }
         on_finish->complete(r);
       }));
   m_image_sync_throttler->start_op(sync_id, on_start);