]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: generalize ImageSyncThrottler into Throttler
authorMykola Golub <mgolub@suse.com>
Tue, 27 Aug 2019 19:42:46 +0000 (20:42 +0100)
committerMykola Golub <mgolub@suse.com>
Thu, 5 Sep 2019 12:57:47 +0000 (13:57 +0100)
Signed-off-by: Mykola Golub <mgolub@suse.com>
19 files changed:
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/test_ImageReplayer.cc
src/test/rbd_mirror/test_ImageSync.cc
src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc [deleted file]
src/test/rbd_mirror/test_mock_InstanceWatcher.cc
src/test/rbd_mirror/test_mock_NamespaceReplayer.cc
src/test/rbd_mirror/test_mock_PoolReplayer.cc
src/test/rbd_mirror/test_mock_Throttler.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/ImageSyncThrottler.cc [deleted file]
src/tools/rbd_mirror/ImageSyncThrottler.h [deleted file]
src/tools/rbd_mirror/InstanceWatcher.cc
src/tools/rbd_mirror/InstanceWatcher.h
src/tools/rbd_mirror/NamespaceReplayer.cc
src/tools/rbd_mirror/NamespaceReplayer.h
src/tools/rbd_mirror/PoolReplayer.cc
src/tools/rbd_mirror/PoolReplayer.h
src/tools/rbd_mirror/Throttler.cc [new file with mode: 0644]
src/tools/rbd_mirror/Throttler.h [new file with mode: 0644]

index 4ea85e4d249e3200350fb0add7f7ee2b86858daa..e481139928606e95c019ecf117963acf770a5734 100644 (file)
@@ -21,13 +21,13 @@ add_executable(unittest_rbd_mirror
   test_mock_ImageMap.cc
   test_mock_ImageReplayer.cc
   test_mock_ImageSync.cc
-  test_mock_ImageSyncThrottler.cc
   test_mock_InstanceReplayer.cc
   test_mock_InstanceWatcher.cc
   test_mock_LeaderWatcher.cc
   test_mock_NamespaceReplayer.cc
   test_mock_PoolReplayer.cc
   test_mock_PoolWatcher.cc
+  test_mock_Throttler.cc
   image_deleter/test_mock_RemoveRequest.cc
   image_deleter/test_mock_SnapshotPurgeRequest.cc
   image_deleter/test_mock_TrashMoveRequest.cc
index 7d85d3c568bb8a538933dceb84b1b14d63523c06..1491e8bcd774fc7862ebc076ec2feaba441dc65e 100644 (file)
@@ -35,9 +35,9 @@
 #include "librbd/io/ImageRequestWQ.h"
 #include "librbd/io/ReadResult.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
 #include "tools/rbd_mirror/Types.h"
 
 #include "test/librados/test_cxx.h"
@@ -120,7 +120,8 @@ public:
     auto cct = reinterpret_cast<CephContext*>(m_local_ioctx.cct());
     m_threads.reset(new rbd::mirror::Threads<>(cct));
 
-    m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>(cct));
+    m_image_sync_throttler.reset(new rbd::mirror::Throttler<>(
+        cct, "rbd_mirror_concurrent_image_syncs"));
 
     m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
         m_local_ioctx, m_threads->work_queue, nullptr,
@@ -377,7 +378,7 @@ public:
 
   std::shared_ptr<librados::Rados> m_local_cluster;
   std::unique_ptr<rbd::mirror::Threads<>> m_threads;
-  std::unique_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
+  std::unique_ptr<rbd::mirror::Throttler<>> m_image_sync_throttler;
   librados::Rados m_remote_cluster;
   rbd::mirror::InstanceWatcher<> *m_instance_watcher;
   std::string m_local_mirror_uuid = "local mirror uuid";
index 39feffe89f5a0a96a4bbf39d5d6c76ac5deeed0e..b585ea571fbcf575c4619959464d5d510c808665 100644 (file)
@@ -18,9 +18,9 @@
 #include "librbd/io/ReadResult.h"
 #include "librbd/journal/Types.h"
 #include "tools/rbd_mirror/ImageSync.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
 
 void register_test_image_sync() {
 }
@@ -74,7 +74,8 @@ public:
     create_and_open(m_remote_io_ctx, &m_remote_image_ctx);
 
     auto cct = reinterpret_cast<CephContext*>(m_local_io_ctx.cct());
-    m_image_sync_throttler = rbd::mirror::ImageSyncThrottler<>::create(cct);
+    m_image_sync_throttler = rbd::mirror::Throttler<>::create(
+        cct, "rbd_mirror_concurrent_image_syncs");
 
     m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
         m_local_io_ctx, m_threads->work_queue, nullptr, m_image_sync_throttler);
@@ -126,7 +127,7 @@ public:
 
   librbd::ImageCtx *m_remote_image_ctx;
   librbd::ImageCtx *m_local_image_ctx;
rbd::mirror::ImageSyncThrottler<> *m_image_sync_throttler;
 rbd::mirror::Throttler<> *m_image_sync_throttler;
   rbd::mirror::InstanceWatcher<> *m_instance_watcher;
   ::journal::Journaler *m_remote_journaler;
   librbd::journal::MirrorPeerClientMeta m_client_meta;
diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc
deleted file mode 100644 (file)
index 1189e03..0000000
+++ /dev/null
@@ -1,253 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "test/rbd_mirror/test_mock_fixture.h"
-#include "test/librbd/mock/MockImageCtx.h"
-
-namespace librbd {
-
-namespace {
-
-struct MockTestImageCtx : public librbd::MockImageCtx {
-  MockTestImageCtx(librbd::ImageCtx &image_ctx)
-    : librbd::MockImageCtx(image_ctx) {
-  }
-};
-
-} // anonymous namespace
-
-} // namespace librbd
-
-// template definitions
-#include "tools/rbd_mirror/ImageSyncThrottler.cc"
-
-namespace rbd {
-namespace mirror {
-
-class TestMockImageSyncThrottler : public TestMockFixture {
-public:
-  typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
-
-};
-
-TEST_F(TestMockImageSyncThrottler, Single_Sync) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  C_SaferCond on_start;
-  throttler.start_op("ns", "id", &on_start);
-  ASSERT_EQ(0, on_start.wait());
-  throttler.finish_op("ns", "id");
-}
-
-TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(2);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-  C_SaferCond on_start3;
-  throttler.start_op("ns", "id3", &on_start3);
-  C_SaferCond on_start4;
-  throttler.start_op("ns", "id4", &on_start4);
-
-  ASSERT_EQ(0, on_start2.wait());
-  throttler.finish_op("ns", "id2");
-  ASSERT_EQ(0, on_start3.wait());
-  throttler.finish_op("ns", "id3");
-  ASSERT_EQ(0, on_start1.wait());
-  throttler.finish_op("ns", "id1");
-  ASSERT_EQ(0, on_start4.wait());
-  throttler.finish_op("ns", "id4");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  C_SaferCond on_start;
-  throttler.start_op("ns", "id", &on_start);
-  ASSERT_EQ(0, on_start.wait());
-  ASSERT_FALSE(throttler.cancel_op("ns", "id"));
-  throttler.finish_op("ns", "id");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(1);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-
-  ASSERT_EQ(0, on_start1.wait());
-  ASSERT_TRUE(throttler.cancel_op("ns", "id2"));
-  ASSERT_EQ(-ECANCELED, on_start2.wait());
-  throttler.finish_op("ns", "id1");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(1);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-
-  ASSERT_EQ(0, on_start1.wait());
-  ASSERT_FALSE(throttler.cancel_op("ns", "id1"));
-  throttler.finish_op("ns", "id1");
-  ASSERT_EQ(0, on_start2.wait());
-  throttler.finish_op("ns", "id2");
-}
-
-TEST_F(TestMockImageSyncThrottler, Duplicate) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(1);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  ASSERT_EQ(0, on_start1.wait());
-
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id1", &on_start2);
-  ASSERT_EQ(0, on_start2.wait());
-
-  C_SaferCond on_start3;
-  throttler.start_op("ns", "id2", &on_start3);
-  C_SaferCond on_start4;
-  throttler.start_op("ns", "id2", &on_start4);
-  ASSERT_EQ(-ENOENT, on_start3.wait());
-
-  throttler.finish_op("ns", "id1");
-  ASSERT_EQ(0, on_start4.wait());
-  throttler.finish_op("ns", "id2");
-}
-
-TEST_F(TestMockImageSyncThrottler, Duplicate2) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(2);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  ASSERT_EQ(0, on_start1.wait());
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-  ASSERT_EQ(0, on_start2.wait());
-
-  C_SaferCond on_start3;
-  throttler.start_op("ns", "id3", &on_start3);
-  C_SaferCond on_start4;
-  throttler.start_op("ns", "id3", &on_start4); // dup
-  ASSERT_EQ(-ENOENT, on_start3.wait());
-
-  C_SaferCond on_start5;
-  throttler.start_op("ns", "id4", &on_start5);
-
-  throttler.finish_op("ns", "id1");
-  ASSERT_EQ(0, on_start4.wait());
-
-  throttler.finish_op("ns", "id2");
-  ASSERT_EQ(0, on_start5.wait());
-
-  C_SaferCond on_start6;
-  throttler.start_op("ns", "id5", &on_start6);
-
-  throttler.finish_op("ns", "id3");
-  ASSERT_EQ(0, on_start6.wait());
-
-  throttler.finish_op("ns", "id4");
-  throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(2);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-  C_SaferCond on_start3;
-  throttler.start_op("ns", "id3", &on_start3);
-  C_SaferCond on_start4;
-  throttler.start_op("ns", "id4", &on_start4);
-  C_SaferCond on_start5;
-  throttler.start_op("ns", "id5", &on_start5);
-
-  ASSERT_EQ(0, on_start1.wait());
-  ASSERT_EQ(0, on_start2.wait());
-
-  throttler.set_max_concurrent_syncs(4);
-
-  ASSERT_EQ(0, on_start3.wait());
-  ASSERT_EQ(0, on_start4.wait());
-
-  throttler.finish_op("ns", "id4");
-  ASSERT_EQ(0, on_start5.wait());
-
-  throttler.finish_op("ns", "id1");
-  throttler.finish_op("ns", "id2");
-  throttler.finish_op("ns", "id3");
-  throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(4);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-  C_SaferCond on_start3;
-  throttler.start_op("ns", "id3", &on_start3);
-  C_SaferCond on_start4;
-  throttler.start_op("ns", "id4", &on_start4);
-  C_SaferCond on_start5;
-  throttler.start_op("ns", "id5", &on_start5);
-
-  ASSERT_EQ(0, on_start1.wait());
-  ASSERT_EQ(0, on_start2.wait());
-  ASSERT_EQ(0, on_start3.wait());
-  ASSERT_EQ(0, on_start4.wait());
-
-  throttler.set_max_concurrent_syncs(2);
-
-  throttler.finish_op("ns", "id1");
-  throttler.finish_op("ns", "id2");
-  throttler.finish_op("ns", "id3");
-
-  ASSERT_EQ(0, on_start5.wait());
-
-  throttler.finish_op("ns", "id4");
-  throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Drain) {
-  MockImageSyncThrottler throttler(g_ceph_context);
-  throttler.set_max_concurrent_syncs(1);
-
-  C_SaferCond on_start1;
-  throttler.start_op("ns", "id1", &on_start1);
-  C_SaferCond on_start2;
-  throttler.start_op("ns", "id2", &on_start2);
-
-  ASSERT_EQ(0, on_start1.wait());
-  throttler.drain("ns", -ESTALE);
-  ASSERT_EQ(-ESTALE, on_start2.wait());
-}
-
-} // namespace mirror
-} // namespace rbd
index 039b97e8284c5463721caaef4144efda503fc71f..78bb972b5556599ffd9b3b95c5a34f1eec4e06fa 100644 (file)
@@ -83,15 +83,15 @@ struct InstanceReplayer<librbd::MockTestImageCtx> {
 };
 
 template <>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
-  static ImageSyncThrottler* s_instance;
+struct Throttler<librbd::MockTestImageCtx> {
+  static Throttler* s_instance;
 
-  ImageSyncThrottler() {
+  Throttler() {
     ceph_assert(s_instance == nullptr);
     s_instance = this;
   }
 
-  virtual ~ImageSyncThrottler() {
+  virtual ~Throttler() {
     ceph_assert(s_instance == this);
     s_instance = nullptr;
   }
@@ -102,7 +102,7 @@ struct ImageSyncThrottler<librbd::MockTestImageCtx> {
   MOCK_METHOD2(drain, void(const std::string &, int));
 };
 
-ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 } // namespace mirror
 } // namespace rbd
@@ -658,10 +658,10 @@ TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) {
 
 class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
 public:
-  typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+  typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
 
   MockManagedLock mock_managed_lock;
-  MockImageSyncThrottler mock_image_sync_throttler;
+  MockThrottler mock_image_sync_throttler;
   std::string instance_id1;
   std::string instance_id2;
 
@@ -765,7 +765,7 @@ public:
 
   void expect_throttler_drain() {
     EXPECT_CALL(mock_image_sync_throttler, drain("", -ESTALE));
-  }  
+  }
 };
 
 TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
index b787a388921b3c516a5fb1fefacf9fea3fd08358..b52e932ed5c49219447781358a21021765ab2323 100644 (file)
@@ -130,7 +130,7 @@ struct InstanceWatcher<librbd::MockTestImageCtx> {
   static InstanceWatcher* create(
       librados::IoCtx &ioctx, ContextWQ* work_queue,
       InstanceReplayer<librbd::MockTestImageCtx>* instance_replayer,
-      ImageSyncThrottler<librbd::MockTestImageCtx> *image_sync_throttler) {
+      Throttler<librbd::MockTestImageCtx> *image_sync_throttler) {
     ceph_assert(s_instance != nullptr);
     return s_instance;
   }
index 369624c0d35e07c2829dc98b8b2beb5cf6330dd5..e1f050f8f53365cd96d5a1f18b966e1418bf5d58 100644 (file)
@@ -10,7 +10,7 @@
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/rbd_mirror/mock/MockContextWQ.h"
 #include "test/rbd_mirror/mock/MockSafeTimer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/Throttler.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
 #include "tools/rbd_mirror/NamespaceReplayer.h"
 #include "tools/rbd_mirror/PoolReplayer.h"
@@ -97,19 +97,21 @@ namespace rbd {
 namespace mirror {
 
 template <>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
-  static ImageSyncThrottler* s_instance;
+struct Throttler<librbd::MockTestImageCtx> {
+  static Throttler* s_instance;
 
-  static ImageSyncThrottler *create(CephContext *cct) {
+  static Throttler *create(
+      CephContext *cct,
+      const std::string &max_concurrent_ops_config_param_name) {
     return s_instance;
   }
 
-  ImageSyncThrottler() {
+  Throttler() {
     ceph_assert(s_instance == nullptr);
     s_instance = this;
   }
 
-  virtual ~ImageSyncThrottler() {
+  virtual ~Throttler() {
     ceph_assert(s_instance == this);
     s_instance = nullptr;
   }
@@ -117,7 +119,7 @@ struct ImageSyncThrottler<librbd::MockTestImageCtx> {
   MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*));
 };
 
-ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 template <>
 struct NamespaceReplayer<librbd::MockTestImageCtx> {
@@ -130,7 +132,7 @@ struct NamespaceReplayer<librbd::MockTestImageCtx> {
       const std::string &local_mirror_uuid,
       const std::string &remote_mirror_uuid,
       Threads<librbd::MockTestImageCtx> *threads,
-      ImageSyncThrottler<librbd::MockTestImageCtx> *image_sync_throttler,
+      Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
       ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
       journal::CacheManagerHandler *cache_manager_handler) {
     ceph_assert(s_instances.count(name));
@@ -250,7 +252,7 @@ class TestMockPoolReplayer : public TestMockFixture {
 public:
   typedef librbd::api::Namespace<librbd::MockTestImageCtx> MockNamespace;
   typedef PoolReplayer<librbd::MockTestImageCtx> MockPoolReplayer;
-  typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+  typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
   typedef NamespaceReplayer<librbd::MockTestImageCtx> MockNamespaceReplayer;
   typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
   typedef ServiceDaemon<librbd::MockTestImageCtx> MockServiceDaemon;
diff --git a/src/test/rbd_mirror/test_mock_Throttler.cc b/src/test/rbd_mirror/test_mock_Throttler.cc
new file mode 100644 (file)
index 0000000..ab562c1
--- /dev/null
@@ -0,0 +1,253 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "test/librbd/mock/MockImageCtx.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+  MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/Throttler.cc"
+
+namespace rbd {
+namespace mirror {
+
+class TestMockThrottler : public TestMockFixture {
+public:
+  typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
+
+};
+
+TEST_F(TestMockThrottler, Single_Sync) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  C_SaferCond on_start;
+  throttler.start_op("ns", "id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+  throttler.finish_op("ns", "id");
+}
+
+TEST_F(TestMockThrottler, Multiple_Syncs) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("ns", "id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("ns", "id4", &on_start4);
+
+  ASSERT_EQ(0, on_start2.wait());
+  throttler.finish_op("ns", "id2");
+  ASSERT_EQ(0, on_start3.wait());
+  throttler.finish_op("ns", "id3");
+  ASSERT_EQ(0, on_start1.wait());
+  throttler.finish_op("ns", "id1");
+  ASSERT_EQ(0, on_start4.wait());
+  throttler.finish_op("ns", "id4");
+}
+
+TEST_F(TestMockThrottler, Cancel_Running_Sync) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  C_SaferCond on_start;
+  throttler.start_op("ns", "id", &on_start);
+  ASSERT_EQ(0, on_start.wait());
+  ASSERT_FALSE(throttler.cancel_op("ns", "id"));
+  throttler.finish_op("ns", "id");
+}
+
+TEST_F(TestMockThrottler, Cancel_Waiting_Sync) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_TRUE(throttler.cancel_op("ns", "id2"));
+  ASSERT_EQ(-ECANCELED, on_start2.wait());
+  throttler.finish_op("ns", "id1");
+}
+
+TEST_F(TestMockThrottler, Cancel_Running_Sync_Start_Waiting) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_FALSE(throttler.cancel_op("ns", "id1"));
+  throttler.finish_op("ns", "id1");
+  ASSERT_EQ(0, on_start2.wait());
+  throttler.finish_op("ns", "id2");
+}
+
+TEST_F(TestMockThrottler, Duplicate) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id1", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("ns", "id2", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("ns", "id2", &on_start4);
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  throttler.finish_op("ns", "id1");
+  ASSERT_EQ(0, on_start4.wait());
+  throttler.finish_op("ns", "id2");
+}
+
+TEST_F(TestMockThrottler, Duplicate2) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  ASSERT_EQ(0, on_start1.wait());
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+  ASSERT_EQ(0, on_start2.wait());
+
+  C_SaferCond on_start3;
+  throttler.start_op("ns", "id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("ns", "id3", &on_start4); // dup
+  ASSERT_EQ(-ENOENT, on_start3.wait());
+
+  C_SaferCond on_start5;
+  throttler.start_op("ns", "id4", &on_start5);
+
+  throttler.finish_op("ns", "id1");
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.finish_op("ns", "id2");
+  ASSERT_EQ(0, on_start5.wait());
+
+  C_SaferCond on_start6;
+  throttler.start_op("ns", "id5", &on_start6);
+
+  throttler.finish_op("ns", "id3");
+  ASSERT_EQ(0, on_start6.wait());
+
+  throttler.finish_op("ns", "id4");
+  throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Increase_Max_Concurrent_Syncs) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(2);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("ns", "id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("ns", "id4", &on_start4);
+  C_SaferCond on_start5;
+  throttler.start_op("ns", "id5", &on_start5);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_EQ(0, on_start2.wait());
+
+  throttler.set_max_concurrent_ops(4);
+
+  ASSERT_EQ(0, on_start3.wait());
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.finish_op("ns", "id4");
+  ASSERT_EQ(0, on_start5.wait());
+
+  throttler.finish_op("ns", "id1");
+  throttler.finish_op("ns", "id2");
+  throttler.finish_op("ns", "id3");
+  throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Decrease_Max_Concurrent_Syncs) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(4);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+  C_SaferCond on_start3;
+  throttler.start_op("ns", "id3", &on_start3);
+  C_SaferCond on_start4;
+  throttler.start_op("ns", "id4", &on_start4);
+  C_SaferCond on_start5;
+  throttler.start_op("ns", "id5", &on_start5);
+
+  ASSERT_EQ(0, on_start1.wait());
+  ASSERT_EQ(0, on_start2.wait());
+  ASSERT_EQ(0, on_start3.wait());
+  ASSERT_EQ(0, on_start4.wait());
+
+  throttler.set_max_concurrent_ops(2);
+
+  throttler.finish_op("ns", "id1");
+  throttler.finish_op("ns", "id2");
+  throttler.finish_op("ns", "id3");
+
+  ASSERT_EQ(0, on_start5.wait());
+
+  throttler.finish_op("ns", "id4");
+  throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Drain) {
+  MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+  throttler.set_max_concurrent_ops(1);
+
+  C_SaferCond on_start1;
+  throttler.start_op("ns", "id1", &on_start1);
+  C_SaferCond on_start2;
+  throttler.start_op("ns", "id2", &on_start2);
+
+  ASSERT_EQ(0, on_start1.wait());
+  throttler.drain("ns", -ESTALE);
+  ASSERT_EQ(-ESTALE, on_start2.wait());
+}
+
+} // namespace mirror
+} // namespace rbd
index 3c0088fdf103dd1f27bb217b900d766aa7a15589..0bf91bd5ec864387d02ef296c6dd38aa24837d4a 100644 (file)
@@ -9,7 +9,6 @@ set(rbd_mirror_internal
   ImageMap.cc
   ImageReplayer.cc
   ImageSync.cc
-  ImageSyncThrottler.cc
   InstanceReplayer.cc
   InstanceWatcher.cc
   Instances.cc
@@ -21,6 +20,7 @@ set(rbd_mirror_internal
   PoolWatcher.cc
   ServiceDaemon.cc
   Threads.cc
+  Throttler.cc
   Types.cc
   image_deleter/RemoveRequest.cc
   image_deleter/SnapshotPurgeRequest.cc
diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc
deleted file mode 100644 (file)
index f2d0be9..0000000
+++ /dev/null
@@ -1,253 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "ImageSyncThrottler.h"
-#include "common/Formatter.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "librbd/Utils.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
-                           << " " << __func__ << ": "
-
-namespace rbd {
-namespace mirror {
-
-template <typename I>
-ImageSyncThrottler<I>::ImageSyncThrottler(CephContext *cct)
-  : m_cct(cct),
-    m_lock(ceph::make_mutex(
-      librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
-                                    this))),
-    m_max_concurrent_syncs(cct->_conf.get_val<uint64_t>(
-      "rbd_mirror_concurrent_image_syncs")) {
-  dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
-  m_cct->_conf.add_observer(this);
-}
-
-template <typename I>
-ImageSyncThrottler<I>::~ImageSyncThrottler() {
-  m_cct->_conf.remove_observer(this);
-
-  std::lock_guard locker{m_lock};
-  ceph_assert(m_inflight_ops.empty());
-  ceph_assert(m_queue.empty());
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::start_op(const std::string &ns,
-                                     const std::string &id_,
-                                     Context *on_start) {
-  Id id{ns, id_};
-
-  dout(20) << "id=" << id << dendl;
-
-  int r = 0;
-  {
-    std::lock_guard locker{m_lock};
-
-    if (m_inflight_ops.count(id) > 0) {
-      dout(20) << "duplicate for already started op " << id << dendl;
-    } else if (m_queued_ops.count(id) > 0) {
-      dout(20) << "duplicate for already queued op " << id << dendl;
-      std::swap(m_queued_ops[id], on_start);
-      r = -ENOENT;
-    } else if (m_max_concurrent_syncs == 0 ||
-               m_inflight_ops.size() < m_max_concurrent_syncs) {
-      ceph_assert(m_queue.empty());
-      m_inflight_ops.insert(id);
-      dout(20) << "ready to start sync for " << id << " ["
-               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
-               << dendl;
-    } else {
-      m_queue.push_back(id);
-      std::swap(m_queued_ops[id], on_start);
-      dout(20) << "image sync for " << id << " has been queued" << dendl;
-    }
-  }
-
-  if (on_start != nullptr) {
-    on_start->complete(r);
-  }
-}
-
-template <typename I>
-bool ImageSyncThrottler<I>::cancel_op(const std::string &ns,
-                                      const std::string &id_) {
-  Id id{ns, id_};
-
-  dout(20) << "id=" << id << dendl;
-
-  Context *on_start = nullptr;
-  {
-    std::lock_guard locker{m_lock};
-    auto it = m_queued_ops.find(id);
-    if (it != m_queued_ops.end()) {
-      dout(20) << "canceled queued sync for " << id << dendl;
-      m_queue.remove(id);
-      on_start = it->second;
-      m_queued_ops.erase(it);
-    }
-  }
-
-  if (on_start == nullptr) {
-    return false;
-  }
-
-  on_start->complete(-ECANCELED);
-  return true;
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::finish_op(const std::string &ns,
-                                      const std::string &id_) {
-  Id id{ns, id_};
-
-  dout(20) << "id=" << id << dendl;
-
-  if (cancel_op(ns, id_)) {
-    return;
-  }
-
-  Context *on_start = nullptr;
-  {
-    std::lock_guard locker{m_lock};
-
-    m_inflight_ops.erase(id);
-
-    if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
-      auto id = m_queue.front();
-      auto it = m_queued_ops.find(id);
-      ceph_assert(it != m_queued_ops.end());
-      m_inflight_ops.insert(id);
-      dout(20) << "ready to start sync for " << id << " ["
-               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
-               << dendl;
-      on_start = it->second;
-      m_queued_ops.erase(it);
-      m_queue.pop_front();
-    }
-  }
-
-  if (on_start != nullptr) {
-    on_start->complete(0);
-  }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::drain(const std::string &ns, int r) {
-  dout(20) << "ns=" << ns << dendl;
-
-  std::map<Id, Context *> queued_ops;
-  {
-    std::lock_guard locker{m_lock};
-    for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) {
-      if (it->first.first == ns) {
-        queued_ops[it->first] = it->second;
-        m_queue.remove(it->first);
-        it = m_queued_ops.erase(it);
-      } else {
-        it++;
-      }
-    }
-    for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) {
-      if (it->first == ns) {
-        dout(20) << "inflight_op " << *it << dendl;
-        it = m_inflight_ops.erase(it);
-      } else {
-        it++;
-      }
-    }
-  }
-
-  for (auto &it : queued_ops) {
-    dout(20) << "queued_op " << it.first << dendl;
-    it.second->complete(r);
-  }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
-  dout(20) << "max=" << max << dendl;
-
-  std::list<Context *> ops;
-  {
-    std::lock_guard locker{m_lock};
-    m_max_concurrent_syncs = max;
-
-    // Start waiting ops in the case of available free slots
-    while ((m_max_concurrent_syncs == 0 ||
-            m_inflight_ops.size() < m_max_concurrent_syncs) &&
-           !m_queue.empty()) {
-      auto id = m_queue.front();
-      m_inflight_ops.insert(id);
-      dout(20) << "ready to start sync for " << id << " ["
-               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
-               << dendl;
-      auto it = m_queued_ops.find(id);
-      ceph_assert(it != m_queued_ops.end());
-      ops.push_back(it->second);
-      m_queued_ops.erase(it);
-      m_queue.pop_front();
-    }
-  }
-
-  for (const auto& ctx : ops) {
-    ctx->complete(0);
-  }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::print_status(ceph::Formatter *f, std::stringstream *ss) {
-  dout(20) << dendl;
-
-  std::lock_guard locker{m_lock};
-
-  if (f) {
-    f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
-    f->dump_int("running_syncs", m_inflight_ops.size());
-    f->dump_int("waiting_syncs", m_queue.size());
-    f->flush(*ss);
-  } else {
-    *ss << "[ ";
-    *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
-    *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
-    *ss << "waiting_syncs=" << m_queue.size() << " ]";
-  }
-}
-
-template <typename I>
-const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
-  static const char* KEYS[] = {
-    "rbd_mirror_concurrent_image_syncs",
-    NULL
-  };
-  return KEYS;
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(const ConfigProxy& conf,
-                                      const set<string> &changed) {
-  if (changed.count("rbd_mirror_concurrent_image_syncs")) {
-    set_max_concurrent_syncs(conf.get_val<uint64_t>("rbd_mirror_concurrent_image_syncs"));
-  }
-}
-
-} // namespace mirror
-} // namespace rbd
-
-template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h
deleted file mode 100644 (file)
index 278b7c3..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-
-#include <list>
-#include <map>
-#include <set>
-#include <sstream>
-#include <string>
-#include <utility>
-
-#include "common/ceph_mutex.h"
-#include "common/config_obs.h"
-
-class CephContext;
-class Context;
-
-namespace ceph { class Formatter; }
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ImageSyncThrottler : public md_config_obs_t {
-public:
-  static ImageSyncThrottler *create(CephContext *cct) {
-    return new ImageSyncThrottler(cct);
-  }
-  void destroy() {
-    delete this;
-  }
-
-  ImageSyncThrottler(CephContext *cct);
-  ~ImageSyncThrottler() override;
-
-  void set_max_concurrent_syncs(uint32_t max);
-  void start_op(const std::string &ns, const std::string &id,
-                Context *on_start);
-  bool cancel_op(const std::string &ns, const std::string &id);
-  void finish_op(const std::string &ns, const std::string &id);
-  void drain(const std::string &ns, int r);
-
-  void print_status(ceph::Formatter *f, std::stringstream *ss);
-
-private:
-  typedef std::pair<std::string, std::string> Id;
-  
-  CephContext *m_cct;
-  ceph::mutex m_lock;
-  uint32_t m_max_concurrent_syncs;
-  std::list<Id> m_queue;
-  std::map<Id, Context *> m_queued_ops;
-  std::set<Id> m_inflight_ops;
-
-  const char **get_tracked_conf_keys() const override;
-  void handle_conf_change(const ConfigProxy& conf,
-                          const std::set<std::string> &changed) override;
-};
-
-} // namespace mirror
-} // namespace rbd
-
-extern template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
-
-#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
index f29a8d48abc1bdecd33190afbe10e59b2ae014a8..122e35326b60b7670913f3c619ae46676c27e4ae 100644 (file)
@@ -9,7 +9,7 @@
 #include "librbd/ManagedLock.h"
 #include "librbd/Utils.h"
 #include "InstanceReplayer.h"
-#include "ImageSyncThrottler.h"
+#include "Throttler.h"
 #include "common/Cond.h"
 
 #define dout_context g_ceph_context
@@ -313,7 +313,7 @@ template <typename I>
 InstanceWatcher<I> *InstanceWatcher<I>::create(
     librados::IoCtx &io_ctx, ContextWQ *work_queue,
     InstanceReplayer<I> *instance_replayer,
-    ImageSyncThrottler<I> *image_sync_throttler) {
+    Throttler<I> *image_sync_throttler) {
   return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
                                 image_sync_throttler,
                                 stringify(io_ctx.get_instance_id()));
@@ -323,7 +323,7 @@ template <typename I>
 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
                                     ContextWQ *work_queue,
                                     InstanceReplayer<I> *instance_replayer,
-                                    ImageSyncThrottler<I> *image_sync_throttler,
+                                    Throttler<I> *image_sync_throttler,
                                     const std::string &instance_id)
   : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
     m_instance_replayer(instance_replayer),
index 5e077921fa2bcef6500cb46b1022e9f9407d1dbb..bc459f3e4c0682c4823d1358b794d15222e6be31 100644 (file)
@@ -25,8 +25,8 @@ template <typename> class ManagedLock;
 namespace rbd {
 namespace mirror {
 
-template <typename> class ImageSyncThrottler;
 template <typename> class InstanceReplayer;
+template <typename> class Throttler;
 template <typename> struct Threads;
 
 template <typename ImageCtxT = librbd::ImageCtx>
@@ -44,14 +44,14 @@ public:
   static InstanceWatcher *create(
     librados::IoCtx &io_ctx, ContextWQ *work_queue,
     InstanceReplayer<ImageCtxT> *instance_replayer,
-    ImageSyncThrottler<ImageCtxT> *image_sync_throttler);
+    Throttler<ImageCtxT> *image_sync_throttler);
   void destroy() {
     delete this;
   }
 
   InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
                   InstanceReplayer<ImageCtxT> *instance_replayer,
-                  ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+                  Throttler<ImageCtxT> *image_sync_throttler,
                   const std::string &instance_id);
   ~InstanceWatcher() override;
 
@@ -157,7 +157,7 @@ private:
 
   Threads<ImageCtxT> *m_threads;
   InstanceReplayer<ImageCtxT> *m_instance_replayer;
-  ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler;
+  Throttler<ImageCtxT> *m_image_sync_throttler;
   std::string m_instance_id;
 
   mutable ceph::mutex m_lock;
index 6428bfe11e8f203e1e538f9ad67b89df7e2c066b..35703ab8bf4c193c1a15d44555bd6030568567e7 100644 (file)
@@ -40,7 +40,7 @@ NamespaceReplayer<I>::NamespaceReplayer(
     const std::string &name,
     librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
     const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid,
-    Threads<I> *threads, ImageSyncThrottler<I> *image_sync_throttler,
+    Threads<I> *threads, Throttler<I> *image_sync_throttler,
     ServiceDaemon<I> *service_daemon,
     journal::CacheManagerHandler *cache_manager_handler) :
   m_local_mirror_uuid(local_mirror_uuid),
index 810f8b445070c224b9017ba385665bda503f795f..c6f519f5590ab9bee931465ffd64726d6350a71d 100644 (file)
@@ -32,8 +32,8 @@ namespace librbd { class ImageCtx; }
 namespace rbd {
 namespace mirror {
 
-template <typename> class ImageSyncThrottler;
 template <typename> class ServiceDaemon;
+template <typename> class Throttler;
 template <typename> struct Threads;
 
 /**
@@ -49,7 +49,7 @@ public:
       const std::string &local_mirror_uuid,
       const std::string &remote_mirror_uuid,
       Threads<ImageCtxT> *threads,
-      ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+      Throttler<ImageCtxT> *image_sync_throttler,
       ServiceDaemon<ImageCtxT> *service_daemon,
       journal::CacheManagerHandler *cache_manager_handler) {
     return new NamespaceReplayer(name, local_ioctx, remote_ioctx,
@@ -64,7 +64,7 @@ public:
                     const std::string &local_mirror_uuid,
                     const std::string &remote_mirror_uuid,
                     Threads<ImageCtxT> *threads,
-                    ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+                    Throttler<ImageCtxT> *image_sync_throttler,
                     ServiceDaemon<ImageCtxT> *service_daemon,
                     journal::CacheManagerHandler *cache_manager_handler);
   NamespaceReplayer(const NamespaceReplayer&) = delete;
@@ -251,7 +251,7 @@ private:
   std::string m_local_mirror_uuid;
   std::string m_remote_mirror_uuid;
   Threads<ImageCtxT> *m_threads;
-  ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler;
+  Throttler<ImageCtxT> *m_image_sync_throttler;
   ServiceDaemon<ImageCtxT> *m_service_daemon;
   journal::CacheManagerHandler *m_cache_manager_handler;
 
index c61951b6ffc8a85b5e07e5db933de92232f2e4df..588ea5e0749398d56fca529c3c974a45d083748b 100644 (file)
@@ -312,7 +312,8 @@ void PoolReplayer<I>::init() {
 
   dout(10) << "connected to " << m_peer << dendl;
 
-  m_image_sync_throttler.reset(ImageSyncThrottler<I>::create(cct));
+  m_image_sync_throttler.reset(
+      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));
 
   m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
       "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
index 039a420c67501474581ce7f5f1e723bf264e4346..33a8eebd3bf749ee6d3978aa8f47534187572080 100644 (file)
@@ -10,9 +10,9 @@
 #include "include/rados/librados.hpp"
 #include "librbd/Utils.h"
 
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
 #include "tools/rbd_mirror/NamespaceReplayer.h"
+#include "tools/rbd_mirror/Throttler.h"
 #include "tools/rbd_mirror/Types.h"
 #include "tools/rbd_mirror/leader_watcher/Types.h"
 #include "tools/rbd_mirror/service_daemon/Types.h"
@@ -259,7 +259,7 @@ private:
   } m_leader_listener;
 
   std::unique_ptr<LeaderWatcher<ImageCtxT>> m_leader_watcher;
-  std::unique_ptr<ImageSyncThrottler<ImageCtxT>> m_image_sync_throttler;
+  std::unique_ptr<Throttler<ImageCtxT>> m_image_sync_throttler;
 };
 
 } // namespace mirror
diff --git a/src/tools/rbd_mirror/Throttler.cc b/src/tools/rbd_mirror/Throttler.cc
new file mode 100644 (file)
index 0000000..ed377f1
--- /dev/null
@@ -0,0 +1,248 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Throttler.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \
+                           << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+template <typename I>
+Throttler<I>::Throttler(CephContext *cct, const std::string &config_key)
+  : m_cct(cct), m_config_key(config_key),
+    m_config_keys{m_config_key.c_str(), nullptr},
+    m_lock(ceph::make_mutex(
+      librbd::util::unique_lock_name("rbd::mirror::Throttler", this))),
+    m_max_concurrent_ops(cct->_conf.get_val<uint64_t>(m_config_key)) {
+  dout(20) << m_config_key << "=" << m_max_concurrent_ops << dendl;
+  m_cct->_conf.add_observer(this);
+}
+
+template <typename I>
+Throttler<I>::~Throttler() {
+  m_cct->_conf.remove_observer(this);
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_inflight_ops.empty());
+  ceph_assert(m_queue.empty());
+}
+
+template <typename I>
+void Throttler<I>::start_op(const std::string &ns,
+                                     const std::string &id_,
+                                     Context *on_start) {
+  Id id{ns, id_};
+
+  dout(20) << "id=" << id << dendl;
+
+  int r = 0;
+  {
+    std::lock_guard locker{m_lock};
+
+    if (m_inflight_ops.count(id) > 0) {
+      dout(20) << "duplicate for already started op " << id << dendl;
+    } else if (m_queued_ops.count(id) > 0) {
+      dout(20) << "duplicate for already queued op " << id << dendl;
+      std::swap(m_queued_ops[id], on_start);
+      r = -ENOENT;
+    } else if (m_max_concurrent_ops == 0 ||
+               m_inflight_ops.size() < m_max_concurrent_ops) {
+      ceph_assert(m_queue.empty());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start op for " << id << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+               << dendl;
+    } else {
+      m_queue.push_back(id);
+      std::swap(m_queued_ops[id], on_start);
+      dout(20) << "op for " << id << " has been queued" << dendl;
+    }
+  }
+
+  if (on_start != nullptr) {
+    on_start->complete(r);
+  }
+}
+
+template <typename I>
+bool Throttler<I>::cancel_op(const std::string &ns,
+                                      const std::string &id_) {
+  Id id{ns, id_};
+
+  dout(20) << "id=" << id << dendl;
+
+  Context *on_start = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    auto it = m_queued_ops.find(id);
+    if (it != m_queued_ops.end()) {
+      dout(20) << "canceled queued op for " << id << dendl;
+      m_queue.remove(id);
+      on_start = it->second;
+      m_queued_ops.erase(it);
+    }
+  }
+
+  if (on_start == nullptr) {
+    return false;
+  }
+
+  on_start->complete(-ECANCELED);
+  return true;
+}
+
+template <typename I>
+void Throttler<I>::finish_op(const std::string &ns,
+                                      const std::string &id_) {
+  Id id{ns, id_};
+
+  dout(20) << "id=" << id << dendl;
+
+  if (cancel_op(ns, id_)) {
+    return;
+  }
+
+  Context *on_start = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+
+    m_inflight_ops.erase(id);
+
+    if (m_inflight_ops.size() < m_max_concurrent_ops && !m_queue.empty()) {
+      auto id = m_queue.front();
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start op for " << id << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+               << dendl;
+      on_start = it->second;
+      m_queued_ops.erase(it);
+      m_queue.pop_front();
+    }
+  }
+
+  if (on_start != nullptr) {
+    on_start->complete(0);
+  }
+}
+
+template <typename I>
+void Throttler<I>::drain(const std::string &ns, int r) {
+  dout(20) << "ns=" << ns << dendl;
+
+  std::map<Id, Context *> queued_ops;
+  {
+    std::lock_guard locker{m_lock};
+    for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) {
+      if (it->first.first == ns) {
+        queued_ops[it->first] = it->second;
+        m_queue.remove(it->first);
+        it = m_queued_ops.erase(it);
+      } else {
+        it++;
+      }
+    }
+    for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) {
+      if (it->first == ns) {
+        dout(20) << "inflight_op " << *it << dendl;
+        it = m_inflight_ops.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+
+  for (auto &it : queued_ops) {
+    dout(20) << "queued_op " << it.first << dendl;
+    it.second->complete(r);
+  }
+}
+
+template <typename I>
+void Throttler<I>::set_max_concurrent_ops(uint32_t max) {
+  dout(20) << "max=" << max << dendl;
+
+  std::list<Context *> ops;
+  {
+    std::lock_guard locker{m_lock};
+    m_max_concurrent_ops = max;
+
+    // Start waiting ops in the case of available free slots
+    while ((m_max_concurrent_ops == 0 ||
+            m_inflight_ops.size() < m_max_concurrent_ops) &&
+           !m_queue.empty()) {
+      auto id = m_queue.front();
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start op for " << id << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+               << dendl;
+      auto it = m_queued_ops.find(id);
+      ceph_assert(it != m_queued_ops.end());
+      ops.push_back(it->second);
+      m_queued_ops.erase(it);
+      m_queue.pop_front();
+    }
+  }
+
+  for (const auto& ctx : ops) {
+    ctx->complete(0);
+  }
+}
+
+template <typename I>
+void Throttler<I>::print_status(ceph::Formatter *f, std::stringstream *ss) {
+  dout(20) << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  if (f) {
+    f->dump_int("max_parallel_requests", m_max_concurrent_ops);
+    f->dump_int("running_requests", m_inflight_ops.size());
+    f->dump_int("waiting_requests", m_queue.size());
+    f->flush(*ss);
+  } else {
+    *ss << "[ ";
+    *ss << "max_parallel_requests=" << m_max_concurrent_ops << ", ";
+    *ss << "running_requests=" << m_inflight_ops.size() << ", ";
+    *ss << "waiting_requests=" << m_queue.size() << " ]";
+  }
+}
+
+template <typename I>
+const char** Throttler<I>::get_tracked_conf_keys() const {  
+  return m_config_keys;
+}
+
+template <typename I>
+void Throttler<I>::handle_conf_change(const ConfigProxy& conf,
+                                      const set<string> &changed) {
+  if (changed.count(m_config_key)) {
+    set_max_concurrent_ops(conf.get_val<uint64_t>(m_config_key));
+  }
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::Throttler<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/Throttler.h b/src/tools/rbd_mirror/Throttler.h
new file mode 100644 (file)
index 0000000..f92c032
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_THROTTLER_H
+#define RBD_MIRROR_THROTTLER_H
+
+#include <list>
+#include <map>
+#include <set>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "common/ceph_mutex.h"
+#include "common/config_obs.h"
+
+class CephContext;
+class Context;
+
+namespace ceph { class Formatter; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class Throttler : public md_config_obs_t {
+public:
+  static Throttler *create(
+      CephContext *cct,
+      const std::string &config_key) {
+    return new Throttler(cct, config_key);
+  }
+  void destroy() {
+    delete this;
+  }
+
+  Throttler(CephContext *cct,
+            const std::string &config_key);
+  ~Throttler() override;
+
+  void set_max_concurrent_ops(uint32_t max);
+  void start_op(const std::string &ns, const std::string &id,
+                Context *on_start);
+  bool cancel_op(const std::string &ns, const std::string &id);
+  void finish_op(const std::string &ns, const std::string &id);
+  void drain(const std::string &ns, int r);
+
+  void print_status(ceph::Formatter *f, std::stringstream *ss);
+
+private:
+  typedef std::pair<std::string, std::string> Id;
+
+  CephContext *m_cct;
+  const std::string m_config_key;
+  mutable const char* m_config_keys[2];
+
+  ceph::mutex m_lock;
+  uint32_t m_max_concurrent_ops;
+  std::list<Id> m_queue;
+  std::map<Id, Context *> m_queued_ops;
+  std::set<Id> m_inflight_ops;
+
+  const char **get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set<std::string> &changed) override;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::Throttler<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_THROTTLER_H