test_mock_ImageMap.cc
test_mock_ImageReplayer.cc
test_mock_ImageSync.cc
- test_mock_ImageSyncThrottler.cc
test_mock_InstanceReplayer.cc
test_mock_InstanceWatcher.cc
test_mock_LeaderWatcher.cc
test_mock_NamespaceReplayer.cc
test_mock_PoolReplayer.cc
test_mock_PoolWatcher.cc
+ test_mock_Throttler.cc
image_deleter/test_mock_RemoveRequest.cc
image_deleter/test_mock_SnapshotPurgeRequest.cc
image_deleter/test_mock_TrashMoveRequest.cc
#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ReadResult.h"
#include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/Types.h"
#include "test/librados/test_cxx.h"
auto cct = reinterpret_cast<CephContext*>(m_local_ioctx.cct());
m_threads.reset(new rbd::mirror::Threads<>(cct));
- m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>(cct));
+ m_image_sync_throttler.reset(new rbd::mirror::Throttler<>(
+ cct, "rbd_mirror_concurrent_image_syncs"));
m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
m_local_ioctx, m_threads->work_queue, nullptr,
std::shared_ptr<librados::Rados> m_local_cluster;
std::unique_ptr<rbd::mirror::Threads<>> m_threads;
- std::unique_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
+ std::unique_ptr<rbd::mirror::Throttler<>> m_image_sync_throttler;
librados::Rados m_remote_cluster;
rbd::mirror::InstanceWatcher<> *m_instance_watcher;
std::string m_local_mirror_uuid = "local mirror uuid";
#include "librbd/io/ReadResult.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageSync.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
void register_test_image_sync() {
}
create_and_open(m_remote_io_ctx, &m_remote_image_ctx);
auto cct = reinterpret_cast<CephContext*>(m_local_io_ctx.cct());
- m_image_sync_throttler = rbd::mirror::ImageSyncThrottler<>::create(cct);
+ m_image_sync_throttler = rbd::mirror::Throttler<>::create(
+ cct, "rbd_mirror_concurrent_image_syncs");
m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
m_local_io_ctx, m_threads->work_queue, nullptr, m_image_sync_throttler);
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx;
- rbd::mirror::ImageSyncThrottler<> *m_image_sync_throttler;
+ rbd::mirror::Throttler<> *m_image_sync_throttler;
rbd::mirror::InstanceWatcher<> *m_instance_watcher;
::journal::Journaler *m_remote_journaler;
librbd::journal::MirrorPeerClientMeta m_client_meta;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "test/rbd_mirror/test_mock_fixture.h"
-#include "test/librbd/mock/MockImageCtx.h"
-
-namespace librbd {
-
-namespace {
-
-struct MockTestImageCtx : public librbd::MockImageCtx {
- MockTestImageCtx(librbd::ImageCtx &image_ctx)
- : librbd::MockImageCtx(image_ctx) {
- }
-};
-
-} // anonymous namespace
-
-} // namespace librbd
-
-// template definitions
-#include "tools/rbd_mirror/ImageSyncThrottler.cc"
-
-namespace rbd {
-namespace mirror {
-
-class TestMockImageSyncThrottler : public TestMockFixture {
-public:
- typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
-
-};
-
-TEST_F(TestMockImageSyncThrottler, Single_Sync) {
- MockImageSyncThrottler throttler(g_ceph_context);
- C_SaferCond on_start;
- throttler.start_op("ns", "id", &on_start);
- ASSERT_EQ(0, on_start.wait());
- throttler.finish_op("ns", "id");
-}
-
-TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(2);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
- C_SaferCond on_start3;
- throttler.start_op("ns", "id3", &on_start3);
- C_SaferCond on_start4;
- throttler.start_op("ns", "id4", &on_start4);
-
- ASSERT_EQ(0, on_start2.wait());
- throttler.finish_op("ns", "id2");
- ASSERT_EQ(0, on_start3.wait());
- throttler.finish_op("ns", "id3");
- ASSERT_EQ(0, on_start1.wait());
- throttler.finish_op("ns", "id1");
- ASSERT_EQ(0, on_start4.wait());
- throttler.finish_op("ns", "id4");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
- MockImageSyncThrottler throttler(g_ceph_context);
- C_SaferCond on_start;
- throttler.start_op("ns", "id", &on_start);
- ASSERT_EQ(0, on_start.wait());
- ASSERT_FALSE(throttler.cancel_op("ns", "id"));
- throttler.finish_op("ns", "id");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(1);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
-
- ASSERT_EQ(0, on_start1.wait());
- ASSERT_TRUE(throttler.cancel_op("ns", "id2"));
- ASSERT_EQ(-ECANCELED, on_start2.wait());
- throttler.finish_op("ns", "id1");
-}
-
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(1);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
-
- ASSERT_EQ(0, on_start1.wait());
- ASSERT_FALSE(throttler.cancel_op("ns", "id1"));
- throttler.finish_op("ns", "id1");
- ASSERT_EQ(0, on_start2.wait());
- throttler.finish_op("ns", "id2");
-}
-
-TEST_F(TestMockImageSyncThrottler, Duplicate) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(1);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- ASSERT_EQ(0, on_start1.wait());
-
- C_SaferCond on_start2;
- throttler.start_op("ns", "id1", &on_start2);
- ASSERT_EQ(0, on_start2.wait());
-
- C_SaferCond on_start3;
- throttler.start_op("ns", "id2", &on_start3);
- C_SaferCond on_start4;
- throttler.start_op("ns", "id2", &on_start4);
- ASSERT_EQ(-ENOENT, on_start3.wait());
-
- throttler.finish_op("ns", "id1");
- ASSERT_EQ(0, on_start4.wait());
- throttler.finish_op("ns", "id2");
-}
-
-TEST_F(TestMockImageSyncThrottler, Duplicate2) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(2);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- ASSERT_EQ(0, on_start1.wait());
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
- ASSERT_EQ(0, on_start2.wait());
-
- C_SaferCond on_start3;
- throttler.start_op("ns", "id3", &on_start3);
- C_SaferCond on_start4;
- throttler.start_op("ns", "id3", &on_start4); // dup
- ASSERT_EQ(-ENOENT, on_start3.wait());
-
- C_SaferCond on_start5;
- throttler.start_op("ns", "id4", &on_start5);
-
- throttler.finish_op("ns", "id1");
- ASSERT_EQ(0, on_start4.wait());
-
- throttler.finish_op("ns", "id2");
- ASSERT_EQ(0, on_start5.wait());
-
- C_SaferCond on_start6;
- throttler.start_op("ns", "id5", &on_start6);
-
- throttler.finish_op("ns", "id3");
- ASSERT_EQ(0, on_start6.wait());
-
- throttler.finish_op("ns", "id4");
- throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(2);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
- C_SaferCond on_start3;
- throttler.start_op("ns", "id3", &on_start3);
- C_SaferCond on_start4;
- throttler.start_op("ns", "id4", &on_start4);
- C_SaferCond on_start5;
- throttler.start_op("ns", "id5", &on_start5);
-
- ASSERT_EQ(0, on_start1.wait());
- ASSERT_EQ(0, on_start2.wait());
-
- throttler.set_max_concurrent_syncs(4);
-
- ASSERT_EQ(0, on_start3.wait());
- ASSERT_EQ(0, on_start4.wait());
-
- throttler.finish_op("ns", "id4");
- ASSERT_EQ(0, on_start5.wait());
-
- throttler.finish_op("ns", "id1");
- throttler.finish_op("ns", "id2");
- throttler.finish_op("ns", "id3");
- throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(4);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
- C_SaferCond on_start3;
- throttler.start_op("ns", "id3", &on_start3);
- C_SaferCond on_start4;
- throttler.start_op("ns", "id4", &on_start4);
- C_SaferCond on_start5;
- throttler.start_op("ns", "id5", &on_start5);
-
- ASSERT_EQ(0, on_start1.wait());
- ASSERT_EQ(0, on_start2.wait());
- ASSERT_EQ(0, on_start3.wait());
- ASSERT_EQ(0, on_start4.wait());
-
- throttler.set_max_concurrent_syncs(2);
-
- throttler.finish_op("ns", "id1");
- throttler.finish_op("ns", "id2");
- throttler.finish_op("ns", "id3");
-
- ASSERT_EQ(0, on_start5.wait());
-
- throttler.finish_op("ns", "id4");
- throttler.finish_op("ns", "id5");
-}
-
-TEST_F(TestMockImageSyncThrottler, Drain) {
- MockImageSyncThrottler throttler(g_ceph_context);
- throttler.set_max_concurrent_syncs(1);
-
- C_SaferCond on_start1;
- throttler.start_op("ns", "id1", &on_start1);
- C_SaferCond on_start2;
- throttler.start_op("ns", "id2", &on_start2);
-
- ASSERT_EQ(0, on_start1.wait());
- throttler.drain("ns", -ESTALE);
- ASSERT_EQ(-ESTALE, on_start2.wait());
-}
-
-} // namespace mirror
-} // namespace rbd
};
template <>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
- static ImageSyncThrottler* s_instance;
+struct Throttler<librbd::MockTestImageCtx> {
+ static Throttler* s_instance;
- ImageSyncThrottler() {
+ Throttler() {
ceph_assert(s_instance == nullptr);
s_instance = this;
}
- virtual ~ImageSyncThrottler() {
+ virtual ~Throttler() {
ceph_assert(s_instance == this);
s_instance = nullptr;
}
MOCK_METHOD2(drain, void(const std::string &, int));
};
-ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
} // namespace rbd
class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
public:
- typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+ typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
MockManagedLock mock_managed_lock;
- MockImageSyncThrottler mock_image_sync_throttler;
+ MockThrottler mock_image_sync_throttler;
std::string instance_id1;
std::string instance_id2;
void expect_throttler_drain() {
EXPECT_CALL(mock_image_sync_throttler, drain("", -ESTALE));
- }
+ }
};
TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
static InstanceWatcher* create(
librados::IoCtx &ioctx, ContextWQ* work_queue,
InstanceReplayer<librbd::MockTestImageCtx>* instance_replayer,
- ImageSyncThrottler<librbd::MockTestImageCtx> *image_sync_throttler) {
+ Throttler<librbd::MockTestImageCtx> *image_sync_throttler) {
ceph_assert(s_instance != nullptr);
return s_instance;
}
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/rbd_mirror/mock/MockContextWQ.h"
#include "test/rbd_mirror/mock/MockSafeTimer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/NamespaceReplayer.h"
#include "tools/rbd_mirror/PoolReplayer.h"
namespace mirror {
template <>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
- static ImageSyncThrottler* s_instance;
+struct Throttler<librbd::MockTestImageCtx> {
+ static Throttler* s_instance;
- static ImageSyncThrottler *create(CephContext *cct) {
+ static Throttler *create(
+ CephContext *cct,
+ const std::string &max_concurrent_ops_config_param_name) {
return s_instance;
}
- ImageSyncThrottler() {
+ Throttler() {
ceph_assert(s_instance == nullptr);
s_instance = this;
}
- virtual ~ImageSyncThrottler() {
+ virtual ~Throttler() {
ceph_assert(s_instance == this);
s_instance = nullptr;
}
MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*));
};
-ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
template <>
struct NamespaceReplayer<librbd::MockTestImageCtx> {
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
Threads<librbd::MockTestImageCtx> *threads,
- ImageSyncThrottler<librbd::MockTestImageCtx> *image_sync_throttler,
+ Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
ceph_assert(s_instances.count(name));
public:
typedef librbd::api::Namespace<librbd::MockTestImageCtx> MockNamespace;
typedef PoolReplayer<librbd::MockTestImageCtx> MockPoolReplayer;
- typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+ typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
typedef NamespaceReplayer<librbd::MockTestImageCtx> MockNamespaceReplayer;
typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
typedef ServiceDaemon<librbd::MockTestImageCtx> MockServiceDaemon;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "test/librbd/mock/MockImageCtx.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+ MockTestImageCtx(librbd::ImageCtx &image_ctx)
+ : librbd::MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/Throttler.cc"
+
+namespace rbd {
+namespace mirror {
+
+class TestMockThrottler : public TestMockFixture {
+public:
+ typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
+
+};
+
+TEST_F(TestMockThrottler, Single_Sync) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ C_SaferCond on_start;
+ throttler.start_op("ns", "id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ throttler.finish_op("ns", "id");
+}
+
+TEST_F(TestMockThrottler, Multiple_Syncs) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("ns", "id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("ns", "id4", &on_start4);
+
+ ASSERT_EQ(0, on_start2.wait());
+ throttler.finish_op("ns", "id2");
+ ASSERT_EQ(0, on_start3.wait());
+ throttler.finish_op("ns", "id3");
+ ASSERT_EQ(0, on_start1.wait());
+ throttler.finish_op("ns", "id1");
+ ASSERT_EQ(0, on_start4.wait());
+ throttler.finish_op("ns", "id4");
+}
+
+TEST_F(TestMockThrottler, Cancel_Running_Sync) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ C_SaferCond on_start;
+ throttler.start_op("ns", "id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ ASSERT_FALSE(throttler.cancel_op("ns", "id"));
+ throttler.finish_op("ns", "id");
+}
+
+TEST_F(TestMockThrottler, Cancel_Waiting_Sync) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_TRUE(throttler.cancel_op("ns", "id2"));
+ ASSERT_EQ(-ECANCELED, on_start2.wait());
+ throttler.finish_op("ns", "id1");
+}
+
+TEST_F(TestMockThrottler, Cancel_Running_Sync_Start_Waiting) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_FALSE(throttler.cancel_op("ns", "id1"));
+ throttler.finish_op("ns", "id1");
+ ASSERT_EQ(0, on_start2.wait());
+ throttler.finish_op("ns", "id2");
+}
+
+TEST_F(TestMockThrottler, Duplicate) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id1", &on_start2);
+ ASSERT_EQ(0, on_start2.wait());
+
+ C_SaferCond on_start3;
+ throttler.start_op("ns", "id2", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("ns", "id2", &on_start4);
+ ASSERT_EQ(-ENOENT, on_start3.wait());
+
+ throttler.finish_op("ns", "id1");
+ ASSERT_EQ(0, on_start4.wait());
+ throttler.finish_op("ns", "id2");
+}
+
+TEST_F(TestMockThrottler, Duplicate2) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+ ASSERT_EQ(0, on_start2.wait());
+
+ C_SaferCond on_start3;
+ throttler.start_op("ns", "id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("ns", "id3", &on_start4); // dup
+ ASSERT_EQ(-ENOENT, on_start3.wait());
+
+ C_SaferCond on_start5;
+ throttler.start_op("ns", "id4", &on_start5);
+
+ throttler.finish_op("ns", "id1");
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.finish_op("ns", "id2");
+ ASSERT_EQ(0, on_start5.wait());
+
+ C_SaferCond on_start6;
+ throttler.start_op("ns", "id5", &on_start6);
+
+ throttler.finish_op("ns", "id3");
+ ASSERT_EQ(0, on_start6.wait());
+
+ throttler.finish_op("ns", "id4");
+ throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Increase_Max_Concurrent_Syncs) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("ns", "id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("ns", "id4", &on_start4);
+ C_SaferCond on_start5;
+ throttler.start_op("ns", "id5", &on_start5);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_EQ(0, on_start2.wait());
+
+ throttler.set_max_concurrent_ops(4);
+
+ ASSERT_EQ(0, on_start3.wait());
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.finish_op("ns", "id4");
+ ASSERT_EQ(0, on_start5.wait());
+
+ throttler.finish_op("ns", "id1");
+ throttler.finish_op("ns", "id2");
+ throttler.finish_op("ns", "id3");
+ throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Decrease_Max_Concurrent_Syncs) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(4);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("ns", "id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("ns", "id4", &on_start4);
+ C_SaferCond on_start5;
+ throttler.start_op("ns", "id5", &on_start5);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_EQ(0, on_start2.wait());
+ ASSERT_EQ(0, on_start3.wait());
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.set_max_concurrent_ops(2);
+
+ throttler.finish_op("ns", "id1");
+ throttler.finish_op("ns", "id2");
+ throttler.finish_op("ns", "id3");
+
+ ASSERT_EQ(0, on_start5.wait());
+
+ throttler.finish_op("ns", "id4");
+ throttler.finish_op("ns", "id5");
+}
+
+TEST_F(TestMockThrottler, Drain) {
+ MockThrottler throttler(g_ceph_context, "rbd_mirror_concurrent_image_syncs");
+ throttler.set_max_concurrent_ops(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("ns", "id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("ns", "id2", &on_start2);
+
+ ASSERT_EQ(0, on_start1.wait());
+ throttler.drain("ns", -ESTALE);
+ ASSERT_EQ(-ESTALE, on_start2.wait());
+}
+
+} // namespace mirror
+} // namespace rbd
ImageMap.cc
ImageReplayer.cc
ImageSync.cc
- ImageSyncThrottler.cc
InstanceReplayer.cc
InstanceWatcher.cc
Instances.cc
PoolWatcher.cc
ServiceDaemon.cc
Threads.cc
+ Throttler.cc
Types.cc
image_deleter/RemoveRequest.cc
image_deleter/SnapshotPurgeRequest.cc
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "ImageSyncThrottler.h"
-#include "common/Formatter.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "librbd/Utils.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
- << " " << __func__ << ": "
-
-namespace rbd {
-namespace mirror {
-
-template <typename I>
-ImageSyncThrottler<I>::ImageSyncThrottler(CephContext *cct)
- : m_cct(cct),
- m_lock(ceph::make_mutex(
- librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
- this))),
- m_max_concurrent_syncs(cct->_conf.get_val<uint64_t>(
- "rbd_mirror_concurrent_image_syncs")) {
- dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
- m_cct->_conf.add_observer(this);
-}
-
-template <typename I>
-ImageSyncThrottler<I>::~ImageSyncThrottler() {
- m_cct->_conf.remove_observer(this);
-
- std::lock_guard locker{m_lock};
- ceph_assert(m_inflight_ops.empty());
- ceph_assert(m_queue.empty());
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::start_op(const std::string &ns,
- const std::string &id_,
- Context *on_start) {
- Id id{ns, id_};
-
- dout(20) << "id=" << id << dendl;
-
- int r = 0;
- {
- std::lock_guard locker{m_lock};
-
- if (m_inflight_ops.count(id) > 0) {
- dout(20) << "duplicate for already started op " << id << dendl;
- } else if (m_queued_ops.count(id) > 0) {
- dout(20) << "duplicate for already queued op " << id << dendl;
- std::swap(m_queued_ops[id], on_start);
- r = -ENOENT;
- } else if (m_max_concurrent_syncs == 0 ||
- m_inflight_ops.size() < m_max_concurrent_syncs) {
- ceph_assert(m_queue.empty());
- m_inflight_ops.insert(id);
- dout(20) << "ready to start sync for " << id << " ["
- << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
- << dendl;
- } else {
- m_queue.push_back(id);
- std::swap(m_queued_ops[id], on_start);
- dout(20) << "image sync for " << id << " has been queued" << dendl;
- }
- }
-
- if (on_start != nullptr) {
- on_start->complete(r);
- }
-}
-
-template <typename I>
-bool ImageSyncThrottler<I>::cancel_op(const std::string &ns,
- const std::string &id_) {
- Id id{ns, id_};
-
- dout(20) << "id=" << id << dendl;
-
- Context *on_start = nullptr;
- {
- std::lock_guard locker{m_lock};
- auto it = m_queued_ops.find(id);
- if (it != m_queued_ops.end()) {
- dout(20) << "canceled queued sync for " << id << dendl;
- m_queue.remove(id);
- on_start = it->second;
- m_queued_ops.erase(it);
- }
- }
-
- if (on_start == nullptr) {
- return false;
- }
-
- on_start->complete(-ECANCELED);
- return true;
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::finish_op(const std::string &ns,
- const std::string &id_) {
- Id id{ns, id_};
-
- dout(20) << "id=" << id << dendl;
-
- if (cancel_op(ns, id_)) {
- return;
- }
-
- Context *on_start = nullptr;
- {
- std::lock_guard locker{m_lock};
-
- m_inflight_ops.erase(id);
-
- if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
- auto id = m_queue.front();
- auto it = m_queued_ops.find(id);
- ceph_assert(it != m_queued_ops.end());
- m_inflight_ops.insert(id);
- dout(20) << "ready to start sync for " << id << " ["
- << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
- << dendl;
- on_start = it->second;
- m_queued_ops.erase(it);
- m_queue.pop_front();
- }
- }
-
- if (on_start != nullptr) {
- on_start->complete(0);
- }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::drain(const std::string &ns, int r) {
- dout(20) << "ns=" << ns << dendl;
-
- std::map<Id, Context *> queued_ops;
- {
- std::lock_guard locker{m_lock};
- for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) {
- if (it->first.first == ns) {
- queued_ops[it->first] = it->second;
- m_queue.remove(it->first);
- it = m_queued_ops.erase(it);
- } else {
- it++;
- }
- }
- for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) {
- if (it->first == ns) {
- dout(20) << "inflight_op " << *it << dendl;
- it = m_inflight_ops.erase(it);
- } else {
- it++;
- }
- }
- }
-
- for (auto &it : queued_ops) {
- dout(20) << "queued_op " << it.first << dendl;
- it.second->complete(r);
- }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
- dout(20) << "max=" << max << dendl;
-
- std::list<Context *> ops;
- {
- std::lock_guard locker{m_lock};
- m_max_concurrent_syncs = max;
-
- // Start waiting ops in the case of available free slots
- while ((m_max_concurrent_syncs == 0 ||
- m_inflight_ops.size() < m_max_concurrent_syncs) &&
- !m_queue.empty()) {
- auto id = m_queue.front();
- m_inflight_ops.insert(id);
- dout(20) << "ready to start sync for " << id << " ["
- << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
- << dendl;
- auto it = m_queued_ops.find(id);
- ceph_assert(it != m_queued_ops.end());
- ops.push_back(it->second);
- m_queued_ops.erase(it);
- m_queue.pop_front();
- }
- }
-
- for (const auto& ctx : ops) {
- ctx->complete(0);
- }
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::print_status(ceph::Formatter *f, std::stringstream *ss) {
- dout(20) << dendl;
-
- std::lock_guard locker{m_lock};
-
- if (f) {
- f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
- f->dump_int("running_syncs", m_inflight_ops.size());
- f->dump_int("waiting_syncs", m_queue.size());
- f->flush(*ss);
- } else {
- *ss << "[ ";
- *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
- *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
- *ss << "waiting_syncs=" << m_queue.size() << " ]";
- }
-}
-
-template <typename I>
-const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
- static const char* KEYS[] = {
- "rbd_mirror_concurrent_image_syncs",
- NULL
- };
- return KEYS;
-}
-
-template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(const ConfigProxy& conf,
- const set<string> &changed) {
- if (changed.count("rbd_mirror_concurrent_image_syncs")) {
- set_max_concurrent_syncs(conf.get_val<uint64_t>("rbd_mirror_concurrent_image_syncs"));
- }
-}
-
-} // namespace mirror
-} // namespace rbd
-
-template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-
-#include <list>
-#include <map>
-#include <set>
-#include <sstream>
-#include <string>
-#include <utility>
-
-#include "common/ceph_mutex.h"
-#include "common/config_obs.h"
-
-class CephContext;
-class Context;
-
-namespace ceph { class Formatter; }
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ImageSyncThrottler : public md_config_obs_t {
-public:
- static ImageSyncThrottler *create(CephContext *cct) {
- return new ImageSyncThrottler(cct);
- }
- void destroy() {
- delete this;
- }
-
- ImageSyncThrottler(CephContext *cct);
- ~ImageSyncThrottler() override;
-
- void set_max_concurrent_syncs(uint32_t max);
- void start_op(const std::string &ns, const std::string &id,
- Context *on_start);
- bool cancel_op(const std::string &ns, const std::string &id);
- void finish_op(const std::string &ns, const std::string &id);
- void drain(const std::string &ns, int r);
-
- void print_status(ceph::Formatter *f, std::stringstream *ss);
-
-private:
- typedef std::pair<std::string, std::string> Id;
-
- CephContext *m_cct;
- ceph::mutex m_lock;
- uint32_t m_max_concurrent_syncs;
- std::list<Id> m_queue;
- std::map<Id, Context *> m_queued_ops;
- std::set<Id> m_inflight_ops;
-
- const char **get_tracked_conf_keys() const override;
- void handle_conf_change(const ConfigProxy& conf,
- const std::set<std::string> &changed) override;
-};
-
-} // namespace mirror
-} // namespace rbd
-
-extern template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
-
-#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#include "librbd/ManagedLock.h"
#include "librbd/Utils.h"
#include "InstanceReplayer.h"
-#include "ImageSyncThrottler.h"
+#include "Throttler.h"
#include "common/Cond.h"
#define dout_context g_ceph_context
InstanceWatcher<I> *InstanceWatcher<I>::create(
librados::IoCtx &io_ctx, ContextWQ *work_queue,
InstanceReplayer<I> *instance_replayer,
- ImageSyncThrottler<I> *image_sync_throttler) {
+ Throttler<I> *image_sync_throttler) {
return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
image_sync_throttler,
stringify(io_ctx.get_instance_id()));
InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
ContextWQ *work_queue,
InstanceReplayer<I> *instance_replayer,
- ImageSyncThrottler<I> *image_sync_throttler,
+ Throttler<I> *image_sync_throttler,
const std::string &instance_id)
: Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
m_instance_replayer(instance_replayer),
namespace rbd {
namespace mirror {
-template <typename> class ImageSyncThrottler;
template <typename> class InstanceReplayer;
+template <typename> class Throttler;
template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
static InstanceWatcher *create(
librados::IoCtx &io_ctx, ContextWQ *work_queue,
InstanceReplayer<ImageCtxT> *instance_replayer,
- ImageSyncThrottler<ImageCtxT> *image_sync_throttler);
+ Throttler<ImageCtxT> *image_sync_throttler);
void destroy() {
delete this;
}
InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
InstanceReplayer<ImageCtxT> *instance_replayer,
- ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+ Throttler<ImageCtxT> *image_sync_throttler,
const std::string &instance_id);
~InstanceWatcher() override;
Threads<ImageCtxT> *m_threads;
InstanceReplayer<ImageCtxT> *m_instance_replayer;
- ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler;
+ Throttler<ImageCtxT> *m_image_sync_throttler;
std::string m_instance_id;
mutable ceph::mutex m_lock;
const std::string &name,
librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid,
- Threads<I> *threads, ImageSyncThrottler<I> *image_sync_throttler,
+ Threads<I> *threads, Throttler<I> *image_sync_throttler,
ServiceDaemon<I> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) :
m_local_mirror_uuid(local_mirror_uuid),
namespace rbd {
namespace mirror {
-template <typename> class ImageSyncThrottler;
template <typename> class ServiceDaemon;
+template <typename> class Throttler;
template <typename> struct Threads;
/**
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
Threads<ImageCtxT> *threads,
- ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+ Throttler<ImageCtxT> *image_sync_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
return new NamespaceReplayer(name, local_ioctx, remote_ioctx,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
Threads<ImageCtxT> *threads,
- ImageSyncThrottler<ImageCtxT> *image_sync_throttler,
+ Throttler<ImageCtxT> *image_sync_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler);
NamespaceReplayer(const NamespaceReplayer&) = delete;
std::string m_local_mirror_uuid;
std::string m_remote_mirror_uuid;
Threads<ImageCtxT> *m_threads;
- ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler;
+ Throttler<ImageCtxT> *m_image_sync_throttler;
ServiceDaemon<ImageCtxT> *m_service_daemon;
journal::CacheManagerHandler *m_cache_manager_handler;
dout(10) << "connected to " << m_peer << dendl;
- m_image_sync_throttler.reset(ImageSyncThrottler<I>::create(cct));
+ m_image_sync_throttler.reset(
+ Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));
m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
"", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
#include "include/rados/librados.hpp"
#include "librbd/Utils.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/NamespaceReplayer.h"
+#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/leader_watcher/Types.h"
#include "tools/rbd_mirror/service_daemon/Types.h"
} m_leader_listener;
std::unique_ptr<LeaderWatcher<ImageCtxT>> m_leader_watcher;
- std::unique_ptr<ImageSyncThrottler<ImageCtxT>> m_image_sync_throttler;
+ std::unique_ptr<Throttler<ImageCtxT>> m_image_sync_throttler;
};
} // namespace mirror
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Throttler.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \
+ << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+template <typename I>
+Throttler<I>::Throttler(CephContext *cct, const std::string &config_key)
+ : m_cct(cct), m_config_key(config_key),
+ m_config_keys{m_config_key.c_str(), nullptr},
+ m_lock(ceph::make_mutex(
+ librbd::util::unique_lock_name("rbd::mirror::Throttler", this))),
+ m_max_concurrent_ops(cct->_conf.get_val<uint64_t>(m_config_key)) {
+ dout(20) << m_config_key << "=" << m_max_concurrent_ops << dendl;
+ m_cct->_conf.add_observer(this);
+}
+
+template <typename I>
+Throttler<I>::~Throttler() {
+ m_cct->_conf.remove_observer(this);
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_inflight_ops.empty());
+ ceph_assert(m_queue.empty());
+}
+
+template <typename I>
+void Throttler<I>::start_op(const std::string &ns,
+ const std::string &id_,
+ Context *on_start) {
+ Id id{ns, id_};
+
+ dout(20) << "id=" << id << dendl;
+
+ int r = 0;
+ {
+ std::lock_guard locker{m_lock};
+
+ if (m_inflight_ops.count(id) > 0) {
+ dout(20) << "duplicate for already started op " << id << dendl;
+ } else if (m_queued_ops.count(id) > 0) {
+ dout(20) << "duplicate for already queued op " << id << dendl;
+ std::swap(m_queued_ops[id], on_start);
+ r = -ENOENT;
+ } else if (m_max_concurrent_ops == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_ops) {
+ ceph_assert(m_queue.empty());
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start op for " << id << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+ << dendl;
+ } else {
+ m_queue.push_back(id);
+ std::swap(m_queued_ops[id], on_start);
+ dout(20) << "op for " << id << " has been queued" << dendl;
+ }
+ }
+
+ if (on_start != nullptr) {
+ on_start->complete(r);
+ }
+}
+
+template <typename I>
+bool Throttler<I>::cancel_op(const std::string &ns,
+ const std::string &id_) {
+ Id id{ns, id_};
+
+ dout(20) << "id=" << id << dendl;
+
+ Context *on_start = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+ auto it = m_queued_ops.find(id);
+ if (it != m_queued_ops.end()) {
+ dout(20) << "canceled queued op for " << id << dendl;
+ m_queue.remove(id);
+ on_start = it->second;
+ m_queued_ops.erase(it);
+ }
+ }
+
+ if (on_start == nullptr) {
+ return false;
+ }
+
+ on_start->complete(-ECANCELED);
+ return true;
+}
+
+template <typename I>
+void Throttler<I>::finish_op(const std::string &ns,
+ const std::string &id_) {
+ Id id{ns, id_};
+
+ dout(20) << "id=" << id << dendl;
+
+ if (cancel_op(ns, id_)) {
+ return;
+ }
+
+ Context *on_start = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+
+ m_inflight_ops.erase(id);
+
+ if (m_inflight_ops.size() < m_max_concurrent_ops && !m_queue.empty()) {
+ auto id = m_queue.front();
+ auto it = m_queued_ops.find(id);
+ ceph_assert(it != m_queued_ops.end());
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start op for " << id << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+ << dendl;
+ on_start = it->second;
+ m_queued_ops.erase(it);
+ m_queue.pop_front();
+ }
+ }
+
+ if (on_start != nullptr) {
+ on_start->complete(0);
+ }
+}
+
+template <typename I>
+void Throttler<I>::drain(const std::string &ns, int r) {
+ dout(20) << "ns=" << ns << dendl;
+
+ std::map<Id, Context *> queued_ops;
+ {
+ std::lock_guard locker{m_lock};
+ for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) {
+ if (it->first.first == ns) {
+ queued_ops[it->first] = it->second;
+ m_queue.remove(it->first);
+ it = m_queued_ops.erase(it);
+ } else {
+ it++;
+ }
+ }
+ for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) {
+ if (it->first == ns) {
+ dout(20) << "inflight_op " << *it << dendl;
+ it = m_inflight_ops.erase(it);
+ } else {
+ it++;
+ }
+ }
+ }
+
+ for (auto &it : queued_ops) {
+ dout(20) << "queued_op " << it.first << dendl;
+ it.second->complete(r);
+ }
+}
+
+template <typename I>
+void Throttler<I>::set_max_concurrent_ops(uint32_t max) {
+ dout(20) << "max=" << max << dendl;
+
+ std::list<Context *> ops;
+ {
+ std::lock_guard locker{m_lock};
+ m_max_concurrent_ops = max;
+
+ // Start waiting ops in the case of available free slots
+ while ((m_max_concurrent_ops == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_ops) &&
+ !m_queue.empty()) {
+ auto id = m_queue.front();
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start op for " << id << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
+ << dendl;
+ auto it = m_queued_ops.find(id);
+ ceph_assert(it != m_queued_ops.end());
+ ops.push_back(it->second);
+ m_queued_ops.erase(it);
+ m_queue.pop_front();
+ }
+ }
+
+ for (const auto& ctx : ops) {
+ ctx->complete(0);
+ }
+}
+
+template <typename I>
+void Throttler<I>::print_status(ceph::Formatter *f, std::stringstream *ss) {
+ dout(20) << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ if (f) {
+ f->dump_int("max_parallel_requests", m_max_concurrent_ops);
+ f->dump_int("running_requests", m_inflight_ops.size());
+ f->dump_int("waiting_requests", m_queue.size());
+ f->flush(*ss);
+ } else {
+ *ss << "[ ";
+ *ss << "max_parallel_requests=" << m_max_concurrent_ops << ", ";
+ *ss << "running_requests=" << m_inflight_ops.size() << ", ";
+ *ss << "waiting_requests=" << m_queue.size() << " ]";
+ }
+}
+
+template <typename I>
+const char** Throttler<I>::get_tracked_conf_keys() const {
+ return m_config_keys;
+}
+
+template <typename I>
+void Throttler<I>::handle_conf_change(const ConfigProxy& conf,
+ const set<string> &changed) {
+ if (changed.count(m_config_key)) {
+ set_max_concurrent_ops(conf.get_val<uint64_t>(m_config_key));
+ }
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::Throttler<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_THROTTLER_H
+#define RBD_MIRROR_THROTTLER_H
+
+#include <list>
+#include <map>
+#include <set>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "common/ceph_mutex.h"
+#include "common/config_obs.h"
+
+class CephContext;
+class Context;
+
+namespace ceph { class Formatter; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class Throttler : public md_config_obs_t {
+public:
+ static Throttler *create(
+ CephContext *cct,
+ const std::string &config_key) {
+ return new Throttler(cct, config_key);
+ }
+ void destroy() {
+ delete this;
+ }
+
+ Throttler(CephContext *cct,
+ const std::string &config_key);
+ ~Throttler() override;
+
+ void set_max_concurrent_ops(uint32_t max);
+ void start_op(const std::string &ns, const std::string &id,
+ Context *on_start);
+ bool cancel_op(const std::string &ns, const std::string &id);
+ void finish_op(const std::string &ns, const std::string &id);
+ void drain(const std::string &ns, int r);
+
+ void print_status(ceph::Formatter *f, std::stringstream *ss);
+
+private:
+ typedef std::pair<std::string, std::string> Id;
+
+ CephContext *m_cct;
+ const std::string m_config_key;
+ mutable const char* m_config_keys[2];
+
+ ceph::mutex m_lock;
+ uint32_t m_max_concurrent_ops;
+ std::list<Id> m_queue;
+ std::map<Id, Context *> m_queued_ops;
+ std::set<Id> m_inflight_ops;
+
+ const char **get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) override;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::Throttler<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_THROTTLER_H