From: Mykola Golub Date: Tue, 25 Jun 2019 04:38:37 +0000 (+0100) Subject: rbd-mirror: handle duplicates in image sync throttler queue X-Git-Tag: v13.2.7~270^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F28815%2Fhead;p=ceph.git rbd-mirror: handle duplicates in image sync throttler queue Fixes: http://tracker.ceph.com/issues/40519 Signed-off-by: Mykola Golub (cherry picked from commit 2f35ab70a59fc22c02c98e5aa80a1e6cde29a226) Conflicts: src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc: no g_ceph_context arg for MockImageSyncThrottler constructor --- diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc index 24815aeb799a..0a052c7002c6 100644 --- a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc +++ b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc @@ -113,6 +113,65 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { throttler.finish_op("id2"); } +TEST_F(TestMockImageSyncThrottler, Duplicate) { + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(1); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + + C_SaferCond on_start2; + throttler.start_op("id1", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("id2", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id2", &on_start4); + ASSERT_EQ(-ENOENT, on_start3.wait()); + + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start4.wait()); + throttler.finish_op("id2"); +} + +TEST_F(TestMockImageSyncThrottler, Duplicate2) { + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(2); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id3", &on_start4); // dup + ASSERT_EQ(-ENOENT, on_start3.wait()); + + C_SaferCond on_start5; + throttler.start_op("id4", &on_start5); + + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start4.wait()); + + throttler.finish_op("id2"); + ASSERT_EQ(0, on_start5.wait()); + + C_SaferCond on_start6; + throttler.start_op("id5", &on_start6); + + throttler.finish_op("id3"); + ASSERT_EQ(0, on_start6.wait()); + + throttler.finish_op("id4"); + throttler.finish_op("id5"); +} + TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { MockImageSyncThrottler throttler; throttler.set_max_concurrent_syncs(2); diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc index dfa96ed4d3e4..cde96b22c482 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -50,11 +50,16 @@ template void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { dout(20) << "id=" << id << dendl; + int r = 0; { Mutex::Locker locker(m_lock); if (m_inflight_ops.count(id) > 0) { dout(20) << "duplicate for already started op " << id << dendl; + } else if (m_queued_ops.count(id) > 0) { + dout(20) << "duplicate for already queued op " << id << dendl; + std::swap(m_queued_ops[id], on_start); + r = -ENOENT; } else if (m_max_concurrent_syncs == 0 || m_inflight_ops.size() < m_max_concurrent_syncs) { assert(m_queue.empty()); @@ -63,14 +68,14 @@ void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; } else { - m_queue.push_back(std::make_pair(id, on_start)); - on_start = nullptr; + m_queue.push_back(id); + std::swap(m_queued_ops[id], on_start); dout(20) << "image sync for " << id << " has been queued" << dendl; } } if (on_start != nullptr) { - on_start->complete(0); + on_start->complete(r); } } @@ -81,13 +86,12 @@ bool ImageSyncThrottler::cancel_op(const std::string &id) { Context *on_start = nullptr; { Mutex::Locker locker(m_lock); - for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { - if (it->first == id) { - on_start = it->second; - dout(20) << "canceled queued sync for " << id << dendl; - m_queue.erase(it); - break; - } + auto it = m_queued_ops.find(id); + if (it != m_queued_ops.end()) { + dout(20) << "canceled queued sync for " << id << dendl; + m_queue.remove(id); + on_start = it->second; + m_queued_ops.erase(it); } } @@ -114,12 +118,15 @@ void ImageSyncThrottler::finish_op(const std::string &id) { m_inflight_ops.erase(id); if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" + auto id = m_queue.front(); + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; - on_start= pair.second; + on_start = it->second; + m_queued_ops.erase(it); m_queue.pop_front(); } } @@ -133,15 +140,16 @@ template void ImageSyncThrottler::drain(int r) { dout(20) << dendl; - std::list> queue; + std::map queued_ops; { Mutex::Locker locker(m_lock); - std::swap(m_queue, queue); + std::swap(m_queued_ops, queued_ops); + m_queue.clear(); m_inflight_ops.clear(); } - for (auto &pair : queue) { - pair.second->complete(r); + for (auto &it : queued_ops) { + it.second->complete(r); } } @@ -158,12 +166,15 @@ void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { while ((m_max_concurrent_syncs == 0 || m_inflight_ops.size() < m_max_concurrent_syncs) && !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" + auto id = m_queue.front(); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; - ops.push_back(pair.second); + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + ops.push_back(it->second); + m_queued_ops.erase(it); m_queue.pop_front(); } } diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h index e0c3f0bf5a9b..b553e43bdff6 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -5,6 +5,7 @@ #define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H #include +#include #include #include #include @@ -45,7 +46,8 @@ public: private: Mutex m_lock; uint32_t m_max_concurrent_syncs; - std::list> m_queue; + std::list m_queue; + std::map m_queued_ops; std::set m_inflight_ops; const char **get_tracked_conf_keys() const override; diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index e69890aaddf5..d5181a40c91d 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -1188,6 +1188,9 @@ void InstanceWatcher::handle_sync_request(const std::string &instance_id, if (r == 0) { notify_sync_start(instance_id, sync_id); } + if (r == -ENOENT) { + r = 0; + } on_finish->complete(r); })); m_image_sync_throttler->start_op(sync_id, on_start);