From: Mykola Golub Date: Tue, 27 Aug 2019 19:42:46 +0000 (+0100) Subject: rbd-mirror: generalize ImageSyncThrottler into Throttler X-Git-Tag: v15.1.0~1590^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=03dc984f07d87d9c50632fb9843411562dd0db72;p=ceph.git rbd-mirror: generalize ImageSyncThrottler into Throttler Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 4ea85e4d249..e4811399286 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -21,13 +21,13 @@ add_executable(unittest_rbd_mirror test_mock_ImageMap.cc test_mock_ImageReplayer.cc test_mock_ImageSync.cc - test_mock_ImageSyncThrottler.cc test_mock_InstanceReplayer.cc test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc test_mock_NamespaceReplayer.cc test_mock_PoolReplayer.cc test_mock_PoolWatcher.cc + test_mock_Throttler.cc image_deleter/test_mock_RemoveRequest.cc image_deleter/test_mock_SnapshotPurgeRequest.cc image_deleter/test_mock_TrashMoveRequest.cc diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 7d85d3c568b..1491e8bcd77 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -35,9 +35,9 @@ #include "librbd/io/ImageRequestWQ.h" #include "librbd/io/ReadResult.h" #include "tools/rbd_mirror/ImageReplayer.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/Throttler.h" #include "tools/rbd_mirror/Types.h" #include "test/librados/test_cxx.h" @@ -120,7 +120,8 @@ public: auto cct = reinterpret_cast(m_local_ioctx.cct()); m_threads.reset(new rbd::mirror::Threads<>(cct)); - m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>(cct)); + m_image_sync_throttler.reset(new rbd::mirror::Throttler<>( + cct, "rbd_mirror_concurrent_image_syncs")); m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( m_local_ioctx, m_threads->work_queue, nullptr, @@ -377,7 +378,7 @@ public: std::shared_ptr m_local_cluster; std::unique_ptr> m_threads; - std::unique_ptr> m_image_sync_throttler; + std::unique_ptr> m_image_sync_throttler; librados::Rados m_remote_cluster; rbd::mirror::InstanceWatcher<> *m_instance_watcher; std::string m_local_mirror_uuid = "local mirror uuid"; diff --git a/src/test/rbd_mirror/test_ImageSync.cc b/src/test/rbd_mirror/test_ImageSync.cc index 39feffe89f5..b585ea571fb 100644 --- a/src/test/rbd_mirror/test_ImageSync.cc +++ b/src/test/rbd_mirror/test_ImageSync.cc @@ -18,9 +18,9 @@ #include "librbd/io/ReadResult.h" #include "librbd/journal/Types.h" #include "tools/rbd_mirror/ImageSync.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/Throttler.h" void register_test_image_sync() { } @@ -74,7 +74,8 @@ public: create_and_open(m_remote_io_ctx, &m_remote_image_ctx); auto cct = reinterpret_cast(m_local_io_ctx.cct()); - m_image_sync_throttler = rbd::mirror::ImageSyncThrottler<>::create(cct); + m_image_sync_throttler = rbd::mirror::Throttler<>::create( + cct, "rbd_mirror_concurrent_image_syncs"); m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( m_local_io_ctx, m_threads->work_queue, nullptr, m_image_sync_throttler); @@ -126,7 +127,7 @@ public: librbd::ImageCtx *m_remote_image_ctx; librbd::ImageCtx *m_local_image_ctx; - rbd::mirror::ImageSyncThrottler<> *m_image_sync_throttler; + rbd::mirror::Throttler<> *m_image_sync_throttler; rbd::mirror::InstanceWatcher<> *m_instance_watcher; ::journal::Journaler *m_remote_journaler; librbd::journal::MirrorPeerClientMeta m_client_meta; diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc deleted file mode 100644 index 1189e03b48f..00000000000 --- a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc +++ /dev/null @@ -1,253 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 SUSE LINUX GmbH - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "test/rbd_mirror/test_mock_fixture.h" -#include "test/librbd/mock/MockImageCtx.h" - -namespace librbd { - -namespace { - -struct MockTestImageCtx : public librbd::MockImageCtx { - MockTestImageCtx(librbd::ImageCtx &image_ctx) - : librbd::MockImageCtx(image_ctx) { - } -}; - -} // anonymous namespace - -} // namespace librbd - -// template definitions -#include "tools/rbd_mirror/ImageSyncThrottler.cc" - -namespace rbd { -namespace mirror { - -class TestMockImageSyncThrottler : public TestMockFixture { -public: - typedef ImageSyncThrottler MockImageSyncThrottler; - -}; - -TEST_F(TestMockImageSyncThrottler, Single_Sync) { - MockImageSyncThrottler throttler(g_ceph_context); - C_SaferCond on_start; - throttler.start_op("ns", "id", &on_start); - ASSERT_EQ(0, on_start.wait()); - throttler.finish_op("ns", "id"); -} - -TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(2); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - C_SaferCond on_start3; - throttler.start_op("ns", "id3", &on_start3); - C_SaferCond on_start4; - throttler.start_op("ns", "id4", &on_start4); - - ASSERT_EQ(0, on_start2.wait()); - throttler.finish_op("ns", "id2"); - ASSERT_EQ(0, on_start3.wait()); - throttler.finish_op("ns", "id3"); - ASSERT_EQ(0, on_start1.wait()); - throttler.finish_op("ns", "id1"); - ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("ns", "id4"); -} - -TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) { - MockImageSyncThrottler throttler(g_ceph_context); - C_SaferCond on_start; - throttler.start_op("ns", "id", &on_start); - ASSERT_EQ(0, on_start.wait()); - ASSERT_FALSE(throttler.cancel_op("ns", "id")); - throttler.finish_op("ns", "id"); -} - -TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(1); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - - ASSERT_EQ(0, on_start1.wait()); - ASSERT_TRUE(throttler.cancel_op("ns", "id2")); - ASSERT_EQ(-ECANCELED, on_start2.wait()); - throttler.finish_op("ns", "id1"); -} - -TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(1); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - - ASSERT_EQ(0, on_start1.wait()); - ASSERT_FALSE(throttler.cancel_op("ns", "id1")); - throttler.finish_op("ns", "id1"); - ASSERT_EQ(0, on_start2.wait()); - throttler.finish_op("ns", "id2"); -} - -TEST_F(TestMockImageSyncThrottler, Duplicate) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(1); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - ASSERT_EQ(0, on_start1.wait()); - - C_SaferCond on_start2; - throttler.start_op("ns", "id1", &on_start2); - ASSERT_EQ(0, on_start2.wait()); - - C_SaferCond on_start3; - throttler.start_op("ns", "id2", &on_start3); - C_SaferCond on_start4; - throttler.start_op("ns", "id2", &on_start4); - ASSERT_EQ(-ENOENT, on_start3.wait()); - - throttler.finish_op("ns", "id1"); - ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("ns", "id2"); -} - -TEST_F(TestMockImageSyncThrottler, Duplicate2) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(2); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - ASSERT_EQ(0, on_start1.wait()); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - ASSERT_EQ(0, on_start2.wait()); - - C_SaferCond on_start3; - throttler.start_op("ns", "id3", &on_start3); - C_SaferCond on_start4; - throttler.start_op("ns", "id3", &on_start4); // dup - ASSERT_EQ(-ENOENT, on_start3.wait()); - - C_SaferCond on_start5; - throttler.start_op("ns", "id4", &on_start5); - - throttler.finish_op("ns", "id1"); - ASSERT_EQ(0, on_start4.wait()); - - throttler.finish_op("ns", "id2"); - ASSERT_EQ(0, on_start5.wait()); - - C_SaferCond on_start6; - throttler.start_op("ns", "id5", &on_start6); - - throttler.finish_op("ns", "id3"); - ASSERT_EQ(0, on_start6.wait()); - - throttler.finish_op("ns", "id4"); - throttler.finish_op("ns", "id5"); -} - -TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(2); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - C_SaferCond on_start3; - throttler.start_op("ns", "id3", &on_start3); - C_SaferCond on_start4; - throttler.start_op("ns", "id4", &on_start4); - C_SaferCond on_start5; - throttler.start_op("ns", "id5", &on_start5); - - ASSERT_EQ(0, on_start1.wait()); - ASSERT_EQ(0, on_start2.wait()); - - throttler.set_max_concurrent_syncs(4); - - ASSERT_EQ(0, on_start3.wait()); - ASSERT_EQ(0, on_start4.wait()); - - throttler.finish_op("ns", "id4"); - ASSERT_EQ(0, on_start5.wait()); - - throttler.finish_op("ns", "id1"); - throttler.finish_op("ns", "id2"); - throttler.finish_op("ns", "id3"); - throttler.finish_op("ns", "id5"); -} - -TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(4); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - C_SaferCond on_start3; - throttler.start_op("ns", "id3", &on_start3); - C_SaferCond on_start4; - throttler.start_op("ns", "id4", &on_start4); - C_SaferCond on_start5; - throttler.start_op("ns", "id5", &on_start5); - - ASSERT_EQ(0, on_start1.wait()); - ASSERT_EQ(0, on_start2.wait()); - ASSERT_EQ(0, on_start3.wait()); - ASSERT_EQ(0, on_start4.wait()); - - throttler.set_max_concurrent_syncs(2); - - throttler.finish_op("ns", "id1"); - throttler.finish_op("ns", "id2"); - throttler.finish_op("ns", "id3"); - - ASSERT_EQ(0, on_start5.wait()); - - throttler.finish_op("ns", "id4"); - throttler.finish_op("ns", "id5"); -} - -TEST_F(TestMockImageSyncThrottler, Drain) { - MockImageSyncThrottler throttler(g_ceph_context); - throttler.set_max_concurrent_syncs(1); - - C_SaferCond on_start1; - throttler.start_op("ns", "id1", &on_start1); - C_SaferCond on_start2; - throttler.start_op("ns", "id2", &on_start2); - - ASSERT_EQ(0, on_start1.wait()); - throttler.drain("ns", -ESTALE); - ASSERT_EQ(-ESTALE, on_start2.wait()); -} - -} // namespace mirror -} // namespace rbd diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc index 039b97e8284..78bb972b555 100644 --- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -83,15 +83,15 @@ struct InstanceReplayer { }; template <> -struct ImageSyncThrottler { - static ImageSyncThrottler* s_instance; +struct Throttler { + static Throttler* s_instance; - ImageSyncThrottler() { + Throttler() { ceph_assert(s_instance == nullptr); s_instance = this; } - virtual ~ImageSyncThrottler() { + virtual ~Throttler() { ceph_assert(s_instance == this); s_instance = nullptr; } @@ -102,7 +102,7 @@ struct ImageSyncThrottler { MOCK_METHOD2(drain, void(const std::string &, int)); }; -ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; +Throttler* Throttler::s_instance = nullptr; } // namespace mirror } // namespace rbd @@ -658,10 +658,10 @@ TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) { class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher { public: - typedef ImageSyncThrottler MockImageSyncThrottler; + typedef Throttler MockThrottler; MockManagedLock mock_managed_lock; - MockImageSyncThrottler mock_image_sync_throttler; + MockThrottler mock_image_sync_throttler; std::string instance_id1; std::string instance_id2; @@ -765,7 +765,7 @@ public: void expect_throttler_drain() { EXPECT_CALL(mock_image_sync_throttler, drain("", -ESTALE)); - } + } }; TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) { diff --git a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc index b787a388921..b52e932ed5c 100644 --- a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc @@ -130,7 +130,7 @@ struct InstanceWatcher { static InstanceWatcher* create( librados::IoCtx &ioctx, ContextWQ* work_queue, InstanceReplayer* instance_replayer, - ImageSyncThrottler *image_sync_throttler) { + Throttler *image_sync_throttler) { ceph_assert(s_instance != nullptr); return s_instance; } diff --git a/src/test/rbd_mirror/test_mock_PoolReplayer.cc b/src/test/rbd_mirror/test_mock_PoolReplayer.cc index 369624c0d35..e1f050f8f53 100644 --- a/src/test/rbd_mirror/test_mock_PoolReplayer.cc +++ b/src/test/rbd_mirror/test_mock_PoolReplayer.cc @@ -10,7 +10,7 @@ #include "test/rbd_mirror/test_mock_fixture.h" #include "test/rbd_mirror/mock/MockContextWQ.h" #include "test/rbd_mirror/mock/MockSafeTimer.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/Throttler.h" #include "tools/rbd_mirror/LeaderWatcher.h" #include "tools/rbd_mirror/NamespaceReplayer.h" #include "tools/rbd_mirror/PoolReplayer.h" @@ -97,19 +97,21 @@ namespace rbd { namespace mirror { template <> -struct ImageSyncThrottler { - static ImageSyncThrottler* s_instance; +struct Throttler { + static Throttler* s_instance; - static ImageSyncThrottler *create(CephContext *cct) { + static Throttler *create( + CephContext *cct, + const std::string &max_concurrent_ops_config_param_name) { return s_instance; } - ImageSyncThrottler() { + Throttler() { ceph_assert(s_instance == nullptr); s_instance = this; } - virtual ~ImageSyncThrottler() { + virtual ~Throttler() { ceph_assert(s_instance == this); s_instance = nullptr; } @@ -117,7 +119,7 @@ struct ImageSyncThrottler { MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); }; -ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; +Throttler* Throttler::s_instance = nullptr; template <> struct NamespaceReplayer { @@ -130,7 +132,7 @@ struct NamespaceReplayer { const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid, Threads *threads, - ImageSyncThrottler *image_sync_throttler, + Throttler *image_sync_throttler, ServiceDaemon *service_daemon, journal::CacheManagerHandler *cache_manager_handler) { ceph_assert(s_instances.count(name)); @@ -250,7 +252,7 @@ class TestMockPoolReplayer : public TestMockFixture { public: typedef librbd::api::Namespace MockNamespace; typedef PoolReplayer MockPoolReplayer; - typedef ImageSyncThrottler MockImageSyncThrottler; + typedef Throttler MockThrottler; typedef NamespaceReplayer MockNamespaceReplayer; typedef LeaderWatcher MockLeaderWatcher; typedef ServiceDaemon MockServiceDaemon; diff --git a/src/test/rbd_mirror/test_mock_Throttler.cc b/src/test/rbd_mirror/test_mock_Throttler.cc new file mode 100644 index 00000000000..ab562c18d7c --- /dev/null +++ b/src/test/rbd_mirror/test_mock_Throttler.cc @@ -0,0 +1,253 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "test/librbd/mock/MockImageCtx.h" + +namespace librbd { + +namespace { + +struct MockTestImageCtx : public librbd::MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +} // namespace librbd + +// template definitions +#include "tools/rbd_mirror/Throttler.cc" + +namespace rbd { +namespace mirror { + +class TestMockThrottler : public TestMockFixture { +public: + typedef Throttler MockThrottler; + +}; + +TEST_F(TestMockThrottler, Single_Sync) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + C_SaferCond on_start; + throttler.start_op("ns", "id", &on_start); + ASSERT_EQ(0, on_start.wait()); + throttler.finish_op("ns", "id"); +} + +TEST_F(TestMockThrottler, Multiple_Syncs) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(2); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("ns", "id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("ns", "id4", &on_start4); + + ASSERT_EQ(0, on_start2.wait()); + throttler.finish_op("ns", "id2"); + ASSERT_EQ(0, on_start3.wait()); + throttler.finish_op("ns", "id3"); + ASSERT_EQ(0, on_start1.wait()); + throttler.finish_op("ns", "id1"); + ASSERT_EQ(0, on_start4.wait()); + throttler.finish_op("ns", "id4"); +} + +TEST_F(TestMockThrottler, Cancel_Running_Sync) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + C_SaferCond on_start; + throttler.start_op("ns", "id", &on_start); + ASSERT_EQ(0, on_start.wait()); + ASSERT_FALSE(throttler.cancel_op("ns", "id")); + throttler.finish_op("ns", "id"); +} + +TEST_F(TestMockThrottler, Cancel_Waiting_Sync) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(1); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_TRUE(throttler.cancel_op("ns", "id2")); + ASSERT_EQ(-ECANCELED, on_start2.wait()); + throttler.finish_op("ns", "id1"); +} + +TEST_F(TestMockThrottler, Cancel_Running_Sync_Start_Waiting) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(1); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_FALSE(throttler.cancel_op("ns", "id1")); + throttler.finish_op("ns", "id1"); + ASSERT_EQ(0, on_start2.wait()); + throttler.finish_op("ns", "id2"); +} + +TEST_F(TestMockThrottler, Duplicate) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(1); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + + C_SaferCond on_start2; + throttler.start_op("ns", "id1", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("ns", "id2", &on_start3); + C_SaferCond on_start4; + throttler.start_op("ns", "id2", &on_start4); + ASSERT_EQ(-ENOENT, on_start3.wait()); + + throttler.finish_op("ns", "id1"); + ASSERT_EQ(0, on_start4.wait()); + throttler.finish_op("ns", "id2"); +} + +TEST_F(TestMockThrottler, Duplicate2) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(2); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + ASSERT_EQ(0, on_start2.wait()); + + C_SaferCond on_start3; + throttler.start_op("ns", "id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("ns", "id3", &on_start4); // dup + ASSERT_EQ(-ENOENT, on_start3.wait()); + + C_SaferCond on_start5; + throttler.start_op("ns", "id4", &on_start5); + + throttler.finish_op("ns", "id1"); + ASSERT_EQ(0, on_start4.wait()); + + throttler.finish_op("ns", "id2"); + ASSERT_EQ(0, on_start5.wait()); + + C_SaferCond on_start6; + throttler.start_op("ns", "id5", &on_start6); + + throttler.finish_op("ns", "id3"); + ASSERT_EQ(0, on_start6.wait()); + + throttler.finish_op("ns", "id4"); + throttler.finish_op("ns", "id5"); +} + +TEST_F(TestMockThrottler, Increase_Max_Concurrent_Syncs) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(2); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("ns", "id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("ns", "id4", &on_start4); + C_SaferCond on_start5; + throttler.start_op("ns", "id5", &on_start5); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_EQ(0, on_start2.wait()); + + throttler.set_max_concurrent_ops(4); + + ASSERT_EQ(0, on_start3.wait()); + ASSERT_EQ(0, on_start4.wait()); + + throttler.finish_op("ns", "id4"); + ASSERT_EQ(0, on_start5.wait()); + + throttler.finish_op("ns", "id1"); + throttler.finish_op("ns", "id2"); + throttler.finish_op("ns", "id3"); + throttler.finish_op("ns", "id5"); +} + +TEST_F(TestMockThrottler, Decrease_Max_Concurrent_Syncs) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(4); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("ns", "id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("ns", "id4", &on_start4); + C_SaferCond on_start5; + throttler.start_op("ns", "id5", &on_start5); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_EQ(0, on_start2.wait()); + ASSERT_EQ(0, on_start3.wait()); + ASSERT_EQ(0, on_start4.wait()); + + throttler.set_max_concurrent_ops(2); + + throttler.finish_op("ns", "id1"); + throttler.finish_op("ns", "id2"); + throttler.finish_op("ns", "id3"); + + ASSERT_EQ(0, on_start5.wait()); + + throttler.finish_op("ns", "id4"); + throttler.finish_op("ns", "id5"); +} + +TEST_F(TestMockThrottler, Drain) { + MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs"); + throttler.set_max_concurrent_ops(1); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + throttler.drain("ns", -ESTALE); + ASSERT_EQ(-ESTALE, on_start2.wait()); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 3c0088fdf10..0bf91bd5ec8 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -9,7 +9,6 @@ set(rbd_mirror_internal ImageMap.cc ImageReplayer.cc ImageSync.cc - ImageSyncThrottler.cc InstanceReplayer.cc InstanceWatcher.cc Instances.cc @@ -21,6 +20,7 @@ set(rbd_mirror_internal PoolWatcher.cc ServiceDaemon.cc Threads.cc + Throttler.cc Types.cc image_deleter/RemoveRequest.cc image_deleter/SnapshotPurgeRequest.cc diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc deleted file mode 100644 index f2d0be97df0..00000000000 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ /dev/null @@ -1,253 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 SUSE LINUX GmbH - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "ImageSyncThrottler.h" -#include "common/Formatter.h" -#include "common/debug.h" -#include "common/errno.h" -#include "librbd/Utils.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ - << " " << __func__ << ": " - -namespace rbd { -namespace mirror { - -template -ImageSyncThrottler::ImageSyncThrottler(CephContext *cct) - : m_cct(cct), - m_lock(ceph::make_mutex( - librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", - this))), - m_max_concurrent_syncs(cct->_conf.get_val( - "rbd_mirror_concurrent_image_syncs")) { - dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; - m_cct->_conf.add_observer(this); -} - -template -ImageSyncThrottler::~ImageSyncThrottler() { - m_cct->_conf.remove_observer(this); - - std::lock_guard locker{m_lock}; - ceph_assert(m_inflight_ops.empty()); - ceph_assert(m_queue.empty()); -} - -template -void ImageSyncThrottler::start_op(const std::string &ns, - const std::string &id_, - Context *on_start) { - Id id{ns, id_}; - - dout(20) << "id=" << id << dendl; - - int r = 0; - { - std::lock_guard locker{m_lock}; - - if (m_inflight_ops.count(id) > 0) { - dout(20) << "duplicate for already started op " << id << dendl; - } else if (m_queued_ops.count(id) > 0) { - dout(20) << "duplicate for already queued op " << id << dendl; - std::swap(m_queued_ops[id], on_start); - r = -ENOENT; - } else if (m_max_concurrent_syncs == 0 || - m_inflight_ops.size() < m_max_concurrent_syncs) { - ceph_assert(m_queue.empty()); - m_inflight_ops.insert(id); - dout(20) << "ready to start sync for " << id << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - } else { - m_queue.push_back(id); - std::swap(m_queued_ops[id], on_start); - dout(20) << "image sync for " << id << " has been queued" << dendl; - } - } - - if (on_start != nullptr) { - on_start->complete(r); - } -} - -template -bool ImageSyncThrottler::cancel_op(const std::string &ns, - const std::string &id_) { - Id id{ns, id_}; - - dout(20) << "id=" << id << dendl; - - Context *on_start = nullptr; - { - std::lock_guard locker{m_lock}; - auto it = m_queued_ops.find(id); - if (it != m_queued_ops.end()) { - dout(20) << "canceled queued sync for " << id << dendl; - m_queue.remove(id); - on_start = it->second; - m_queued_ops.erase(it); - } - } - - if (on_start == nullptr) { - return false; - } - - on_start->complete(-ECANCELED); - return true; -} - -template -void ImageSyncThrottler::finish_op(const std::string &ns, - const std::string &id_) { - Id id{ns, id_}; - - dout(20) << "id=" << id << dendl; - - if (cancel_op(ns, id_)) { - return; - } - - Context *on_start = nullptr; - { - std::lock_guard locker{m_lock}; - - m_inflight_ops.erase(id); - - if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { - auto id = m_queue.front(); - auto it = m_queued_ops.find(id); - ceph_assert(it != m_queued_ops.end()); - m_inflight_ops.insert(id); - dout(20) << "ready to start sync for " << id << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - on_start = it->second; - m_queued_ops.erase(it); - m_queue.pop_front(); - } - } - - if (on_start != nullptr) { - on_start->complete(0); - } -} - -template -void ImageSyncThrottler::drain(const std::string &ns, int r) { - dout(20) << "ns=" << ns << dendl; - - std::map queued_ops; - { - std::lock_guard locker{m_lock}; - for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) { - if (it->first.first == ns) { - queued_ops[it->first] = it->second; - m_queue.remove(it->first); - it = m_queued_ops.erase(it); - } else { - it++; - } - } - for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) { - if (it->first == ns) { - dout(20) << "inflight_op " << *it << dendl; - it = m_inflight_ops.erase(it); - } else { - it++; - } - } - } - - for (auto &it : queued_ops) { - dout(20) << "queued_op " << it.first << dendl; - it.second->complete(r); - } -} - -template -void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { - dout(20) << "max=" << max << dendl; - - std::list ops; - { - std::lock_guard locker{m_lock}; - m_max_concurrent_syncs = max; - - // Start waiting ops in the case of available free slots - while ((m_max_concurrent_syncs == 0 || - m_inflight_ops.size() < m_max_concurrent_syncs) && - !m_queue.empty()) { - auto id = m_queue.front(); - m_inflight_ops.insert(id); - dout(20) << "ready to start sync for " << id << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - auto it = m_queued_ops.find(id); - ceph_assert(it != m_queued_ops.end()); - ops.push_back(it->second); - m_queued_ops.erase(it); - m_queue.pop_front(); - } - } - - for (const auto& ctx : ops) { - ctx->complete(0); - } -} - -template -void ImageSyncThrottler::print_status(ceph::Formatter *f, std::stringstream *ss) { - dout(20) << dendl; - - std::lock_guard locker{m_lock}; - - if (f) { - f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); - f->dump_int("running_syncs", m_inflight_ops.size()); - f->dump_int("waiting_syncs", m_queue.size()); - f->flush(*ss); - } else { - *ss << "[ "; - *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; - *ss << "running_syncs=" << m_inflight_ops.size() << ", "; - *ss << "waiting_syncs=" << m_queue.size() << " ]"; - } -} - -template -const char** ImageSyncThrottler::get_tracked_conf_keys() const { - static const char* KEYS[] = { - "rbd_mirror_concurrent_image_syncs", - NULL - }; - return KEYS; -} - -template -void ImageSyncThrottler::handle_conf_change(const ConfigProxy& conf, - const set &changed) { - if (changed.count("rbd_mirror_concurrent_image_syncs")) { - set_max_concurrent_syncs(conf.get_val("rbd_mirror_concurrent_image_syncs")); - } -} - -} // namespace mirror -} // namespace rbd - -template class rbd::mirror::ImageSyncThrottler; diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h deleted file mode 100644 index 278b7c3061f..00000000000 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ /dev/null @@ -1,68 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H -#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H - -#include -#include -#include -#include -#include -#include - -#include "common/ceph_mutex.h" -#include "common/config_obs.h" - -class CephContext; -class Context; - -namespace ceph { class Formatter; } -namespace librbd { class ImageCtx; } - -namespace rbd { -namespace mirror { - -template -class ImageSyncThrottler : public md_config_obs_t { -public: - static ImageSyncThrottler *create(CephContext *cct) { - return new ImageSyncThrottler(cct); - } - void destroy() { - delete this; - } - - ImageSyncThrottler(CephContext *cct); - ~ImageSyncThrottler() override; - - void set_max_concurrent_syncs(uint32_t max); - void start_op(const std::string &ns, const std::string &id, - Context *on_start); - bool cancel_op(const std::string &ns, const std::string &id); - void finish_op(const std::string &ns, const std::string &id); - void drain(const std::string &ns, int r); - - void print_status(ceph::Formatter *f, std::stringstream *ss); - -private: - typedef std::pair Id; - - CephContext *m_cct; - ceph::mutex m_lock; - uint32_t m_max_concurrent_syncs; - std::list m_queue; - std::map m_queued_ops; - std::set m_inflight_ops; - - const char **get_tracked_conf_keys() const override; - void handle_conf_change(const ConfigProxy& conf, - const std::set &changed) override; -}; - -} // namespace mirror -} // namespace rbd - -extern template class rbd::mirror::ImageSyncThrottler; - -#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index f29a8d48abc..122e35326b6 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -9,7 +9,7 @@ #include "librbd/ManagedLock.h" #include "librbd/Utils.h" #include "InstanceReplayer.h" -#include "ImageSyncThrottler.h" +#include "Throttler.h" #include "common/Cond.h" #define dout_context g_ceph_context @@ -313,7 +313,7 @@ template InstanceWatcher *InstanceWatcher::create( librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, - ImageSyncThrottler *image_sync_throttler) { + Throttler *image_sync_throttler) { return new InstanceWatcher(io_ctx, work_queue, instance_replayer, image_sync_throttler, stringify(io_ctx.get_instance_id())); @@ -323,7 +323,7 @@ template InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, - ImageSyncThrottler *image_sync_throttler, + Throttler *image_sync_throttler, const std::string &instance_id) : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), m_instance_replayer(instance_replayer), diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index 5e077921fa2..bc459f3e4c0 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -25,8 +25,8 @@ template class ManagedLock; namespace rbd { namespace mirror { -template class ImageSyncThrottler; template class InstanceReplayer; +template class Throttler; template struct Threads; template @@ -44,14 +44,14 @@ public: static InstanceWatcher *create( librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, - ImageSyncThrottler *image_sync_throttler); + Throttler *image_sync_throttler); void destroy() { delete this; } InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, - ImageSyncThrottler *image_sync_throttler, + Throttler *image_sync_throttler, const std::string &instance_id); ~InstanceWatcher() override; @@ -157,7 +157,7 @@ private: Threads *m_threads; InstanceReplayer *m_instance_replayer; - ImageSyncThrottler *m_image_sync_throttler; + Throttler *m_image_sync_throttler; std::string m_instance_id; mutable ceph::mutex m_lock; diff --git a/src/tools/rbd_mirror/NamespaceReplayer.cc b/src/tools/rbd_mirror/NamespaceReplayer.cc index 6428bfe11e8..35703ab8bf4 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.cc +++ b/src/tools/rbd_mirror/NamespaceReplayer.cc @@ -40,7 +40,7 @@ NamespaceReplayer::NamespaceReplayer( const std::string &name, librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid, - Threads *threads, ImageSyncThrottler *image_sync_throttler, + Threads *threads, Throttler *image_sync_throttler, ServiceDaemon *service_daemon, journal::CacheManagerHandler *cache_manager_handler) : m_local_mirror_uuid(local_mirror_uuid), diff --git a/src/tools/rbd_mirror/NamespaceReplayer.h b/src/tools/rbd_mirror/NamespaceReplayer.h index 810f8b44507..c6f519f5590 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.h +++ b/src/tools/rbd_mirror/NamespaceReplayer.h @@ -32,8 +32,8 @@ namespace librbd { class ImageCtx; } namespace rbd { namespace mirror { -template class ImageSyncThrottler; template class ServiceDaemon; +template class Throttler; template struct Threads; /** @@ -49,7 +49,7 @@ public: const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid, Threads *threads, - ImageSyncThrottler *image_sync_throttler, + Throttler *image_sync_throttler, ServiceDaemon *service_daemon, journal::CacheManagerHandler *cache_manager_handler) { return new NamespaceReplayer(name, local_ioctx, remote_ioctx, @@ -64,7 +64,7 @@ public: const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid, Threads *threads, - ImageSyncThrottler *image_sync_throttler, + Throttler *image_sync_throttler, ServiceDaemon *service_daemon, journal::CacheManagerHandler *cache_manager_handler); NamespaceReplayer(const NamespaceReplayer&) = delete; @@ -251,7 +251,7 @@ private: std::string m_local_mirror_uuid; std::string m_remote_mirror_uuid; Threads *m_threads; - ImageSyncThrottler *m_image_sync_throttler; + Throttler *m_image_sync_throttler; ServiceDaemon *m_service_daemon; journal::CacheManagerHandler *m_cache_manager_handler; diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index c61951b6ffc..588ea5e0749 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -312,7 +312,8 @@ void PoolReplayer::init() { dout(10) << "connected to " << m_peer << dendl; - m_image_sync_throttler.reset(ImageSyncThrottler::create(cct)); + m_image_sync_throttler.reset( + Throttler::create(cct, "rbd_mirror_concurrent_image_syncs")); m_default_namespace_replayer.reset(NamespaceReplayer::create( "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid, diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index 039a420c675..33a8eebd3bf 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -10,9 +10,9 @@ #include "include/rados/librados.hpp" #include "librbd/Utils.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/LeaderWatcher.h" #include "tools/rbd_mirror/NamespaceReplayer.h" +#include "tools/rbd_mirror/Throttler.h" #include "tools/rbd_mirror/Types.h" #include "tools/rbd_mirror/leader_watcher/Types.h" #include "tools/rbd_mirror/service_daemon/Types.h" @@ -259,7 +259,7 @@ private: } m_leader_listener; std::unique_ptr> m_leader_watcher; - std::unique_ptr> m_image_sync_throttler; + std::unique_ptr> m_image_sync_throttler; }; } // namespace mirror diff --git a/src/tools/rbd_mirror/Throttler.cc b/src/tools/rbd_mirror/Throttler.cc new file mode 100644 index 00000000000..ed377f1b061 --- /dev/null +++ b/src/tools/rbd_mirror/Throttler.cc @@ -0,0 +1,248 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Throttler.h" +#include "common/Formatter.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/Utils.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \ + << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +template +Throttler::Throttler(CephContext *cct, const std::string &config_key) + : m_cct(cct), m_config_key(config_key), + m_config_keys{m_config_key.c_str(), nullptr}, + m_lock(ceph::make_mutex( + librbd::util::unique_lock_name("rbd::mirror::Throttler", this))), + m_max_concurrent_ops(cct->_conf.get_val(m_config_key)) { + dout(20) << m_config_key << "=" << m_max_concurrent_ops << dendl; + m_cct->_conf.add_observer(this); +} + +template +Throttler::~Throttler() { + m_cct->_conf.remove_observer(this); + + std::lock_guard locker{m_lock}; + ceph_assert(m_inflight_ops.empty()); + ceph_assert(m_queue.empty()); +} + +template +void Throttler::start_op(const std::string &ns, + const std::string &id_, + Context *on_start) { + Id id{ns, id_}; + + dout(20) << "id=" << id << dendl; + + int r = 0; + { + std::lock_guard locker{m_lock}; + + if (m_inflight_ops.count(id) > 0) { + dout(20) << "duplicate for already started op " << id << dendl; + } else if (m_queued_ops.count(id) > 0) { + dout(20) << "duplicate for already queued op " << id << dendl; + std::swap(m_queued_ops[id], on_start); + r = -ENOENT; + } else if (m_max_concurrent_ops == 0 || + m_inflight_ops.size() < m_max_concurrent_ops) { + ceph_assert(m_queue.empty()); + m_inflight_ops.insert(id); + dout(20) << "ready to start op for " << id << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" + << dendl; + } else { + m_queue.push_back(id); + std::swap(m_queued_ops[id], on_start); + dout(20) << "op for " << id << " has been queued" << dendl; + } + } + + if (on_start != nullptr) { + on_start->complete(r); + } +} + +template +bool Throttler::cancel_op(const std::string &ns, + const std::string &id_) { + Id id{ns, id_}; + + dout(20) << "id=" << id << dendl; + + Context *on_start = nullptr; + { + std::lock_guard locker{m_lock}; + auto it = m_queued_ops.find(id); + if (it != m_queued_ops.end()) { + dout(20) << "canceled queued op for " << id << dendl; + m_queue.remove(id); + on_start = it->second; + m_queued_ops.erase(it); + } + } + + if (on_start == nullptr) { + return false; + } + + on_start->complete(-ECANCELED); + return true; +} + +template +void Throttler::finish_op(const std::string &ns, + const std::string &id_) { + Id id{ns, id_}; + + dout(20) << "id=" << id << dendl; + + if (cancel_op(ns, id_)) { + return; + } + + Context *on_start = nullptr; + { + std::lock_guard locker{m_lock}; + + m_inflight_ops.erase(id); + + if (m_inflight_ops.size() < m_max_concurrent_ops && !m_queue.empty()) { + auto id = m_queue.front(); + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + m_inflight_ops.insert(id); + dout(20) << "ready to start op for " << id << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" + << dendl; + on_start = it->second; + m_queued_ops.erase(it); + m_queue.pop_front(); + } + } + + if (on_start != nullptr) { + on_start->complete(0); + } +} + +template +void Throttler::drain(const std::string &ns, int r) { + dout(20) << "ns=" << ns << dendl; + + std::map queued_ops; + { + std::lock_guard locker{m_lock}; + for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) { + if (it->first.first == ns) { + queued_ops[it->first] = it->second; + m_queue.remove(it->first); + it = m_queued_ops.erase(it); + } else { + it++; + } + } + for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) { + if (it->first == ns) { + dout(20) << "inflight_op " << *it << dendl; + it = m_inflight_ops.erase(it); + } else { + it++; + } + } + } + + for (auto &it : queued_ops) { + dout(20) << "queued_op " << it.first << dendl; + it.second->complete(r); + } +} + +template +void Throttler::set_max_concurrent_ops(uint32_t max) { + dout(20) << "max=" << max << dendl; + + std::list ops; + { + std::lock_guard locker{m_lock}; + m_max_concurrent_ops = max; + + // Start waiting ops in the case of available free slots + while ((m_max_concurrent_ops == 0 || + m_inflight_ops.size() < m_max_concurrent_ops) && + !m_queue.empty()) { + auto id = m_queue.front(); + m_inflight_ops.insert(id); + dout(20) << "ready to start op for " << id << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" + << dendl; + auto it = m_queued_ops.find(id); + ceph_assert(it != m_queued_ops.end()); + ops.push_back(it->second); + m_queued_ops.erase(it); + m_queue.pop_front(); + } + } + + for (const auto& ctx : ops) { + ctx->complete(0); + } +} + +template +void Throttler::print_status(ceph::Formatter *f, std::stringstream *ss) { + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + if (f) { + f->dump_int("max_parallel_requests", m_max_concurrent_ops); + f->dump_int("running_requests", m_inflight_ops.size()); + f->dump_int("waiting_requests", m_queue.size()); + f->flush(*ss); + } else { + *ss << "[ "; + *ss << "max_parallel_requests=" << m_max_concurrent_ops << ", "; + *ss << "running_requests=" << m_inflight_ops.size() << ", "; + *ss << "waiting_requests=" << m_queue.size() << " ]"; + } +} + +template +const char** Throttler::get_tracked_conf_keys() const { + return m_config_keys; +} + +template +void Throttler::handle_conf_change(const ConfigProxy& conf, + const set &changed) { + if (changed.count(m_config_key)) { + set_max_concurrent_ops(conf.get_val(m_config_key)); + } +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::Throttler; diff --git a/src/tools/rbd_mirror/Throttler.h b/src/tools/rbd_mirror/Throttler.h new file mode 100644 index 00000000000..f92c032729b --- /dev/null +++ b/src/tools/rbd_mirror/Throttler.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_THROTTLER_H +#define RBD_MIRROR_THROTTLER_H + +#include +#include +#include +#include +#include +#include + +#include "common/ceph_mutex.h" +#include "common/config_obs.h" + +class CephContext; +class Context; + +namespace ceph { class Formatter; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template +class Throttler : public md_config_obs_t { +public: + static Throttler *create( + CephContext *cct, + const std::string &config_key) { + return new Throttler(cct, config_key); + } + void destroy() { + delete this; + } + + Throttler(CephContext *cct, + const std::string &config_key); + ~Throttler() override; + + void set_max_concurrent_ops(uint32_t max); + void start_op(const std::string &ns, const std::string &id, + Context *on_start); + bool cancel_op(const std::string &ns, const std::string &id); + void finish_op(const std::string &ns, const std::string &id); + void drain(const std::string &ns, int r); + + void print_status(ceph::Formatter *f, std::stringstream *ss); + +private: + typedef std::pair Id; + + CephContext *m_cct; + const std::string m_config_key; + mutable const char* m_config_keys[2]; + + ceph::mutex m_lock; + uint32_t m_max_concurrent_ops; + std::list m_queue; + std::map m_queued_ops; + std::set m_inflight_ops; + + const char **get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override; +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::Throttler; + +#endif // RBD_MIRROR_THROTTLER_H