From: Mykola Golub Date: Tue, 25 Jun 2019 04:38:37 +0000 (+0100) Subject: rbd-mirror: handle duplicates in image sync throttler queue X-Git-Tag: v15.1.0~2358^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F28730%2Fhead;p=ceph.git rbd-mirror: handle duplicates in image sync throttler queue Fixes: http://tracker.ceph.com/issues/40519 Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc index f30a299f1505..af88edcba5a6 100644 --- a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc +++ b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc @@ -113,6 +113,65 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { throttler.finish_op("id2"); } +TEST_F(TestMockImageSyncThrottler, Duplicate) { + MockImageSyncThrottler throttler(g_ceph_context); + throttler.set_max_concurrent_syncs(1); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + + C_SaferCond on_start2; + throttler.start_op("id1", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("id2", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id2", &on_start4); + ASSERT_EQ(-ENOENT, on_start3.wait()); + + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start4.wait()); + throttler.finish_op("id2"); +} + +TEST_F(TestMockImageSyncThrottler, Duplicate2) { + MockImageSyncThrottler throttler(g_ceph_context); + throttler.set_max_concurrent_syncs(2); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id3", &on_start4); // dup + ASSERT_EQ(-ENOENT, on_start3.wait()); + + C_SaferCond on_start5; + throttler.start_op("id4", &on_start5); + + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start4.wait()); + + throttler.finish_op("id2"); + ASSERT_EQ(0, on_start5.wait()); + + C_SaferCond on_start6; + throttler.start_op("id5", &on_start6); + + throttler.finish_op("id3"); + ASSERT_EQ(0, on_start6.wait()); + + throttler.finish_op("id4"); + throttler.finish_op("id5"); +} + TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { MockImageSyncThrottler throttler(g_ceph_context); throttler.set_max_concurrent_syncs(2); diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc index c2e618bf48fc..b395a0127094 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -51,11 +51,16 @@ template void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { dout(20) << "id=" << id << dendl; + int r = 0; { Mutex::Locker locker(m_lock); if (m_inflight_ops.count(id) > 0) { dout(20) << "duplicate for already started op " << id << dendl; + } else if (m_queued_ops.count(id) > 0) { + dout(20) << "duplicate for already queued op " << id << dendl; + std::swap(m_queued_ops[id], on_start); + r = -ENOENT; } else if (m_max_concurrent_syncs == 0 || m_inflight_ops.size() < m_max_concurrent_syncs) { ceph_assert(m_queue.empty()); @@ -64,14 +69,14 @@ void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; } else { - m_queue.push_back(std::make_pair(id, on_start)); - on_start = nullptr; + m_queue.push_back(id); + std::swap(m_queued_ops[id], on_start); dout(20) << "image sync for " << id << " has been queued" << dendl; } } if (on_start != nullptr) { - on_start->complete(0); + on_start->complete(r); } } @@ -82,13 +87,12 @@ bool ImageSyncThrottler::cancel_op(const std::string &id) { Context *on_start = nullptr; { Mutex::Locker locker(m_lock); - for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { - if (it->first == id) { - on_start = it->second; - dout(20) << "canceled queued sync for " << id << dendl; - m_queue.erase(it); - break; - } + auto it = m_queued_ops.find(id); + if (it != m_queued_ops.end()) { + dout(20) << "canceled queued sync for " << id << dendl; + m_queue.remove(id); + on_start = it->second; + m_queued_ops.erase(it); } } @@ -115,12 +119,15 @@ void ImageSyncThrottler::finish_op(const std::string &id) { m_inflight_ops.erase(id); if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" + auto id = m_queue.front(); + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; - on_start= pair.second; + on_start = it->second; + m_queued_ops.erase(it); m_queue.pop_front(); } } @@ -134,15 +141,16 @@ template void ImageSyncThrottler::drain(int r) { dout(20) << dendl; - std::list> queue; + std::map queued_ops; { Mutex::Locker locker(m_lock); - std::swap(m_queue, queue); + std::swap(m_queued_ops, queued_ops); + m_queue.clear(); m_inflight_ops.clear(); } - for (auto &pair : queue) { - pair.second->complete(r); + for (auto &it : queued_ops) { + it.second->complete(r); } } @@ -159,12 +167,15 @@ void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { while ((m_max_concurrent_syncs == 0 || m_inflight_ops.size() < m_max_concurrent_syncs) && !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" + auto id = m_queue.front(); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" << dendl; - ops.push_back(pair.second); + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + ops.push_back(it->second); + m_queued_ops.erase(it); m_queue.pop_front(); } } diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h index 8c8f754626a3..c0cda61e9a61 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -5,6 +5,7 @@ #define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H #include +#include #include #include #include @@ -47,7 +48,8 @@ private: CephContext *m_cct; Mutex m_lock; uint32_t m_max_concurrent_syncs; - std::list> m_queue; + std::list m_queue; + std::map m_queued_ops; std::set m_inflight_ops; const char **get_tracked_conf_keys() const override; diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index 5889e0135155..d9e1ba233456 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -1176,6 +1176,9 @@ void InstanceWatcher::handle_sync_request(const std::string &instance_id, if (r == 0) { notify_sync_start(instance_id, sync_id); } + if (r == -ENOENT) { + r = 0; + } on_finish->complete(r); })); m_image_sync_throttler->start_op(sync_id, on_start);