]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: handle duplicates in image sync throttler queue 28817/head
authorMykola Golub <mgolub@suse.com>
Tue, 25 Jun 2019 04:38:37 +0000 (05:38 +0100)
committerMykola Golub <mgolub@suse.com>
Mon, 1 Jul 2019 11:03:59 +0000 (14:03 +0300)
Fixes: http://tracker.ceph.com/issues/40519
Signed-off-by: Mykola Golub <mgolub@suse.com>
(cherry picked from commit 2f35ab70a59fc22c02c98e5aa80a1e6cde29a226)

src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.cc
src/tools/rbd_mirror/ImageSyncThrottler.h
src/tools/rbd_mirror/InstanceWatcher.cc

index f30a299f1505ba2800a4acd2f31688f5b5b19589..af88edcba5a6ec06ca5072deb53f65622afcb99d 100644 (file)
@@ -113,6 +113,65 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
   throttler.finish_op("id2");
 }
 
+TEST_F(TestMockImageSyncThrottler, Duplicate) {
+  MockImageSyncThrottler throttler(g_ceph_context);
+  throttler.set_max_concurrent_syncs(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+
+  C_SaferCond on_start2;
+  throttler.start_op("id1", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("id2", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id2", &on_start4);
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start4.wait());
+  throttler.finish_op("id2");
+}
+
+TEST_F(TestMockImageSyncThrottler, Duplicate2) {
+  MockImageSyncThrottler throttler(g_ceph_context);
+  throttler.set_max_concurrent_syncs(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+  C_SaferCond on_start2;
+  throttler.start_op("id2", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("id3", &on_start4); // dup
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  C_SaferCond on_start5;
+  throttler.start_op("id4", &on_start5);
+
+  throttler.finish_op("id1");
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.finish_op("id2");
+  ASSERT_EQ(0, on_start5.wait());
+
+  C_SaferCond on_start6;
+  throttler.start_op("id5", &on_start6);
+
+  throttler.finish_op("id3");
+  ASSERT_EQ(0, on_start6.wait());
+
+  throttler.finish_op("id4");
+  throttler.finish_op("id5");
+}
+
 TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
   MockImageSyncThrottler throttler(g_ceph_context);
   throttler.set_max_concurrent_syncs(2);
index c2e618bf48fc1110354ba26bb191144f693d83d5..b395a0127094d777fa96d6d6f8454f445d006d14 100644 (file)
@@ -51,11 +51,16 @@ template <typename I>
 void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
   dout(20) << "id=" << id << dendl;
 
+  int r = 0;
   {
     Mutex::Locker locker(m_lock);
 
     if (m_inflight_ops.count(id) > 0) {
       dout(20) << "duplicate for already started op " << id << dendl;
+    } else if (m_queued_ops.count(id) > 0) {
+      dout(20) << "duplicate for already queued op " << id << dendl;
+      std::swap(m_queued_ops[id], on_start);
+      r = -ENOENT;
     } else if (m_max_concurrent_syncs == 0 ||
                m_inflight_ops.size() < m_max_concurrent_syncs) {
       ceph_assert(m_queue.empty());
@@ -64,14 +69,14 @@ void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
     } else {
-      m_queue.push_back(std::make_pair(id, on_start));
-      on_start = nullptr;
+      m_queue.push_back(id);
+      std::swap(m_queued_ops[id], on_start);
       dout(20) << "image sync for " << id << " has been queued" << dendl;
     }
   }
 
   if (on_start != nullptr) {
-    on_start->complete(0);
+    on_start->complete(r);
   }
 }
 
@@ -82,13 +87,12 @@ bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
   Context *on_start = nullptr;
   {
     Mutex::Locker locker(m_lock);
-    for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
-      if (it->first == id) {
-        on_start = it->second;
-        dout(20) << "canceled queued sync for " << id << dendl;
-        m_queue.erase(it);
-        break;
-      }
+    auto it = m_queued_ops.find(id);
+    if (it != m_queued_ops.end()) {
+      dout(20) << "canceled queued sync for " << id << dendl;
+      m_queue.remove(id);
+      on_start = it->second;
+      m_queued_ops.erase(it);
     }
   }
 
@@ -115,12 +119,15 @@ void ImageSyncThrottler<I>::finish_op(const std::string &id) {
     m_inflight_ops.erase(id);
 
     if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
-      auto pair = m_queue.front();
-      m_inflight_ops.insert(pair.first);
-      dout(20) << "ready to start sync for " << pair.first << " ["
+      auto id = m_queue.front();
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
-      on_start= pair.second;
+      on_start = it->second;
+      m_queued_ops.erase(it);
       m_queue.pop_front();
     }
   }
@@ -134,15 +141,16 @@ template <typename I>
 void ImageSyncThrottler<I>::drain(int r) {
   dout(20) << dendl;
 
-  std::list<std::pair<std::string, Context *>> queue;
+  std::map<std::string, Context *> queued_ops;
   {
     Mutex::Locker locker(m_lock);
-    std::swap(m_queue, queue);
+    std::swap(m_queued_ops, queued_ops);
+    m_queue.clear();
     m_inflight_ops.clear();
   }
 
-  for (auto &pair : queue) {
-    pair.second->complete(r);
+  for (auto &it : queued_ops) {
+    it.second->complete(r);
   }
 }
 
@@ -159,12 +167,15 @@ void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
     while ((m_max_concurrent_syncs == 0 ||
             m_inflight_ops.size() < m_max_concurrent_syncs) &&
            !m_queue.empty()) {
-      auto pair = m_queue.front();
-      m_inflight_ops.insert(pair.first);
-      dout(20) << "ready to start sync for " << pair.first << " ["
+      auto id = m_queue.front();
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
                << dendl;
-      ops.push_back(pair.second);
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      ops.push_back(it->second);
+      m_queued_ops.erase(it);
       m_queue.pop_front();
     }
   }
index 8c8f754626a3f28b9fbd23a7e76df92f2a15a6bf..c0cda61e9a6196d20459568f09e63e2759b63a6e 100644 (file)
@@ -5,6 +5,7 @@
 #define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
 
 #include <list>
+#include <map>
 #include <set>
 #include <sstream>
 #include <string>
@@ -47,7 +48,8 @@ private:
   CephContext *m_cct;
   Mutex m_lock;
   uint32_t m_max_concurrent_syncs;
-  std::list<std::pair<std::string, Context *>> m_queue;
+  std::list<std::string> m_queue;
+  std::map<std::string, Context *> m_queued_ops;
   std::set<std::string> m_inflight_ops;
 
   const char **get_tracked_conf_keys() const override;
index 5889e01351555e0a14f6a79e59c166a30f98df55..d9e1ba23345618052011300583099b6de76dcb03 100644 (file)
@@ -1176,6 +1176,9 @@ void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
         if (r == 0) {
           notify_sync_start(instance_id, sync_id);
         }
+        if (r == -ENOENT) {
+          r = 0;
+        }
         on_finish->complete(r);
       }));
   m_image_sync_throttler->start_op(sync_id, on_start);