From 478a0cac10705a62ac562209a08374d17cb1d92c Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Thu, 1 Aug 2019 07:49:34 +0100 Subject: [PATCH] rbd-mirror: add namespace support http://tracker.ceph.com/issues/37529 Signed-off-by: Mykola Golub --- src/test/rbd_mirror/CMakeLists.txt | 1 + src/test/rbd_mirror/test_ImageReplayer.cc | 22 +- src/test/rbd_mirror/test_ImageSync.cc | 12 +- src/test/rbd_mirror/test_InstanceWatcher.cc | 4 +- .../rbd_mirror/test_mock_ImageReplayer.cc | 5 +- .../test_mock_ImageSyncThrottler.cc | 134 +-- .../rbd_mirror/test_mock_InstanceReplayer.cc | 25 +- .../rbd_mirror/test_mock_InstanceWatcher.cc | 111 ++- .../rbd_mirror/test_mock_LeaderWatcher.cc | 82 -- .../rbd_mirror/test_mock_NamespaceReplayer.cc | 575 +++++++++++++ src/test/rbd_mirror/test_mock_PoolReplayer.cc | 711 +++++++++++----- src/tools/rbd_mirror/CMakeLists.txt | 1 + src/tools/rbd_mirror/ImageReplayer.cc | 78 +- src/tools/rbd_mirror/ImageReplayer.h | 34 +- src/tools/rbd_mirror/ImageSyncThrottler.cc | 45 +- src/tools/rbd_mirror/ImageSyncThrottler.h | 17 +- src/tools/rbd_mirror/InstanceReplayer.cc | 56 +- src/tools/rbd_mirror/InstanceReplayer.h | 27 +- src/tools/rbd_mirror/InstanceWatcher.cc | 38 +- src/tools/rbd_mirror/InstanceWatcher.h | 12 +- src/tools/rbd_mirror/LeaderWatcher.cc | 87 +- src/tools/rbd_mirror/LeaderWatcher.h | 15 +- src/tools/rbd_mirror/NamespaceReplayer.cc | 781 ++++++++++++++++++ src/tools/rbd_mirror/NamespaceReplayer.h | 285 +++++++ src/tools/rbd_mirror/PoolReplayer.cc | 776 ++++++++--------- src/tools/rbd_mirror/PoolReplayer.h | 237 +++--- 26 files changed, 2942 insertions(+), 1229 deletions(-) create mode 100644 src/test/rbd_mirror/test_mock_NamespaceReplayer.cc create mode 100644 src/tools/rbd_mirror/NamespaceReplayer.cc create mode 100644 src/tools/rbd_mirror/NamespaceReplayer.h diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index 2af8680b2f2d2..4ea85e4d249e3 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable(unittest_rbd_mirror test_mock_InstanceReplayer.cc test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc + test_mock_NamespaceReplayer.cc test_mock_PoolReplayer.cc test_mock_PoolWatcher.cc image_deleter/test_mock_RemoveRequest.cc diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index f48ac29436fda..7d85d3c568bb8 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -35,8 +35,8 @@ #include "librbd/io/ImageRequestWQ.h" #include "librbd/io/ReadResult.h" #include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" -#include "tools/rbd_mirror/ServiceDaemon.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/Types.h" @@ -117,15 +117,14 @@ public: m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name); m_global_image_id = get_global_image_id(m_remote_ioctx, m_remote_image_id); - m_threads.reset(new rbd::mirror::Threads<>(reinterpret_cast( - m_local_ioctx.cct()))); + auto cct = reinterpret_cast(m_local_ioctx.cct()); + m_threads.reset(new rbd::mirror::Threads<>(cct)); - m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context, - m_local_cluster, - m_threads.get())); + m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>(cct)); m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( - m_local_ioctx, m_threads->work_queue, nullptr); + m_local_ioctx, m_threads->work_queue, nullptr, + m_image_sync_throttler.get()); m_instance_watcher->handle_acquire_leader(); } @@ -144,10 +143,9 @@ public: template > void create_replayer() { - m_replayer = new ImageReplayerT( - m_threads.get(), m_instance_watcher, nullptr, - rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), - m_local_mirror_uuid, m_local_ioctx.get_id(), m_global_image_id); + m_replayer = new ImageReplayerT(m_local_ioctx, m_local_mirror_uuid, + m_global_image_id, m_threads.get(), + m_instance_watcher, nullptr); m_replayer->add_peer("peer uuid", m_remote_ioctx); } @@ -379,7 +377,7 @@ public: std::shared_ptr m_local_cluster; std::unique_ptr> m_threads; - std::unique_ptr> m_service_daemon; + std::unique_ptr> m_image_sync_throttler; librados::Rados m_remote_cluster; rbd::mirror::InstanceWatcher<> *m_instance_watcher; std::string m_local_mirror_uuid = "local mirror uuid"; diff --git a/src/test/rbd_mirror/test_ImageSync.cc b/src/test/rbd_mirror/test_ImageSync.cc index e46005c3a776f..39feffe89f5a0 100644 --- a/src/test/rbd_mirror/test_ImageSync.cc +++ b/src/test/rbd_mirror/test_ImageSync.cc @@ -18,6 +18,7 @@ #include "librbd/io/ReadResult.h" #include "librbd/journal/Types.h" #include "tools/rbd_mirror/ImageSync.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" @@ -72,8 +73,11 @@ public: create_and_open(m_local_io_ctx, &m_local_image_ctx); create_and_open(m_remote_io_ctx, &m_remote_image_ctx); + auto cct = reinterpret_cast(m_local_io_ctx.cct()); + m_image_sync_throttler = rbd::mirror::ImageSyncThrottler<>::create(cct); + m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( - m_local_io_ctx, m_threads->work_queue, nullptr); + m_local_io_ctx, m_threads->work_queue, nullptr, m_image_sync_throttler); m_instance_watcher->handle_acquire_leader(); m_remote_journaler = new ::journal::Journaler( @@ -90,12 +94,13 @@ public: } void TearDown() override { - TestFixture::TearDown(); - m_instance_watcher->handle_release_leader(); delete m_remote_journaler; delete m_instance_watcher; + delete m_image_sync_throttler; + + TestFixture::TearDown(); } void create_and_open(librados::IoCtx &io_ctx, librbd::ImageCtx **image_ctx) { @@ -121,6 +126,7 @@ public: librbd::ImageCtx *m_remote_image_ctx; librbd::ImageCtx *m_local_image_ctx; + rbd::mirror::ImageSyncThrottler<> *m_image_sync_throttler; rbd::mirror::InstanceWatcher<> *m_instance_watcher; ::journal::Journaler *m_remote_journaler; librbd::journal::MirrorPeerClientMeta m_client_meta; diff --git a/src/test/rbd_mirror/test_InstanceWatcher.cc b/src/test/rbd_mirror/test_InstanceWatcher.cc index 92a8f94360b3b..ba19dc1f41e81 100644 --- a/src/test/rbd_mirror/test_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_InstanceWatcher.cc @@ -45,7 +45,7 @@ public: TEST_F(TestInstanceWatcher, InitShutdown) { InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue, - nullptr, m_instance_id); + nullptr, nullptr, m_instance_id); std::vector instance_ids; get_instances(&instance_ids); ASSERT_EQ(0U, instance_ids.size()); @@ -94,7 +94,7 @@ TEST_F(TestInstanceWatcher, Remove) ASSERT_EQ("", connect_cluster_pp(cluster)); ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx)); InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue, - nullptr, "instance_id"); + nullptr, nullptr, "instance_id"); // Init ASSERT_EQ(0, instance_watcher.init()); diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index 4712774302635..dfb1fdbcdf80e 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -603,9 +603,8 @@ public: void create_image_replayer(MockThreads &mock_threads) { m_image_replayer = new MockImageReplayer( - &mock_threads, &m_instance_watcher, nullptr, - rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), - "local_mirror_uuid", m_local_io_ctx.get_id(), "global image id"); + m_local_io_ctx, "local_mirror_uuid", "global image id", + &mock_threads, &m_instance_watcher, nullptr); m_image_replayer->add_peer("peer_uuid", m_remote_io_ctx); } diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc index af88edcba5a6e..1189e03b48f2e 100644 --- a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc +++ b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc @@ -44,9 +44,9 @@ public: TEST_F(TestMockImageSyncThrottler, Single_Sync) { MockImageSyncThrottler throttler(g_ceph_context); C_SaferCond on_start; - throttler.start_op("id", &on_start); + throttler.start_op("ns", "id", &on_start); ASSERT_EQ(0, on_start.wait()); - throttler.finish_op("id"); + throttler.finish_op("ns", "id"); } TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) { @@ -54,31 +54,31 @@ TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) { throttler.set_max_concurrent_syncs(2); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); C_SaferCond on_start3; - throttler.start_op("id3", &on_start3); + throttler.start_op("ns", "id3", &on_start3); C_SaferCond on_start4; - throttler.start_op("id4", &on_start4); + throttler.start_op("ns", "id4", &on_start4); ASSERT_EQ(0, on_start2.wait()); - throttler.finish_op("id2"); + throttler.finish_op("ns", "id2"); ASSERT_EQ(0, on_start3.wait()); - throttler.finish_op("id3"); + throttler.finish_op("ns", "id3"); ASSERT_EQ(0, on_start1.wait()); - throttler.finish_op("id1"); + throttler.finish_op("ns", "id1"); ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("id4"); + throttler.finish_op("ns", "id4"); } TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) { MockImageSyncThrottler throttler(g_ceph_context); C_SaferCond on_start; - throttler.start_op("id", &on_start); + throttler.start_op("ns", "id", &on_start); ASSERT_EQ(0, on_start.wait()); - ASSERT_FALSE(throttler.cancel_op("id")); - throttler.finish_op("id"); + ASSERT_FALSE(throttler.cancel_op("ns", "id")); + throttler.finish_op("ns", "id"); } TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) { @@ -86,31 +86,30 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) { throttler.set_max_concurrent_syncs(1); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); ASSERT_EQ(0, on_start1.wait()); - ASSERT_TRUE(throttler.cancel_op("id2")); + ASSERT_TRUE(throttler.cancel_op("ns", "id2")); ASSERT_EQ(-ECANCELED, on_start2.wait()); - throttler.finish_op("id1"); + throttler.finish_op("ns", "id1"); } - TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { MockImageSyncThrottler throttler(g_ceph_context); throttler.set_max_concurrent_syncs(1); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); ASSERT_EQ(0, on_start1.wait()); - ASSERT_FALSE(throttler.cancel_op("id1")); - throttler.finish_op("id1"); + ASSERT_FALSE(throttler.cancel_op("ns", "id1")); + throttler.finish_op("ns", "id1"); ASSERT_EQ(0, on_start2.wait()); - throttler.finish_op("id2"); + throttler.finish_op("ns", "id2"); } TEST_F(TestMockImageSyncThrottler, Duplicate) { @@ -118,22 +117,22 @@ TEST_F(TestMockImageSyncThrottler, Duplicate) { throttler.set_max_concurrent_syncs(1); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); ASSERT_EQ(0, on_start1.wait()); C_SaferCond on_start2; - throttler.start_op("id1", &on_start2); + throttler.start_op("ns", "id1", &on_start2); ASSERT_EQ(0, on_start2.wait()); C_SaferCond on_start3; - throttler.start_op("id2", &on_start3); + throttler.start_op("ns", "id2", &on_start3); C_SaferCond on_start4; - throttler.start_op("id2", &on_start4); + throttler.start_op("ns", "id2", &on_start4); ASSERT_EQ(-ENOENT, on_start3.wait()); - throttler.finish_op("id1"); + throttler.finish_op("ns", "id1"); ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("id2"); + throttler.finish_op("ns", "id2"); } TEST_F(TestMockImageSyncThrottler, Duplicate2) { @@ -141,35 +140,35 @@ TEST_F(TestMockImageSyncThrottler, Duplicate2) { throttler.set_max_concurrent_syncs(2); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); ASSERT_EQ(0, on_start1.wait()); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); ASSERT_EQ(0, on_start2.wait()); C_SaferCond on_start3; - throttler.start_op("id3", &on_start3); + throttler.start_op("ns", "id3", &on_start3); C_SaferCond on_start4; - throttler.start_op("id3", &on_start4); // dup + throttler.start_op("ns", "id3", &on_start4); // dup ASSERT_EQ(-ENOENT, on_start3.wait()); C_SaferCond on_start5; - throttler.start_op("id4", &on_start5); + throttler.start_op("ns", "id4", &on_start5); - throttler.finish_op("id1"); + throttler.finish_op("ns", "id1"); ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("id2"); + throttler.finish_op("ns", "id2"); ASSERT_EQ(0, on_start5.wait()); C_SaferCond on_start6; - throttler.start_op("id5", &on_start6); + throttler.start_op("ns", "id5", &on_start6); - throttler.finish_op("id3"); + throttler.finish_op("ns", "id3"); ASSERT_EQ(0, on_start6.wait()); - throttler.finish_op("id4"); - throttler.finish_op("id5"); + throttler.finish_op("ns", "id4"); + throttler.finish_op("ns", "id5"); } TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { @@ -177,15 +176,15 @@ TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { throttler.set_max_concurrent_syncs(2); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); C_SaferCond on_start3; - throttler.start_op("id3", &on_start3); + throttler.start_op("ns", "id3", &on_start3); C_SaferCond on_start4; - throttler.start_op("id4", &on_start4); + throttler.start_op("ns", "id4", &on_start4); C_SaferCond on_start5; - throttler.start_op("id5", &on_start5); + throttler.start_op("ns", "id5", &on_start5); ASSERT_EQ(0, on_start1.wait()); ASSERT_EQ(0, on_start2.wait()); @@ -195,13 +194,13 @@ TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { ASSERT_EQ(0, on_start3.wait()); ASSERT_EQ(0, on_start4.wait()); - throttler.finish_op("id4"); + throttler.finish_op("ns", "id4"); ASSERT_EQ(0, on_start5.wait()); - throttler.finish_op("id1"); - throttler.finish_op("id2"); - throttler.finish_op("id3"); - throttler.finish_op("id5"); + throttler.finish_op("ns", "id1"); + throttler.finish_op("ns", "id2"); + throttler.finish_op("ns", "id3"); + throttler.finish_op("ns", "id5"); } TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) { @@ -209,15 +208,15 @@ TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) { throttler.set_max_concurrent_syncs(4); C_SaferCond on_start1; - throttler.start_op("id1", &on_start1); + throttler.start_op("ns", "id1", &on_start1); C_SaferCond on_start2; - throttler.start_op("id2", &on_start2); + throttler.start_op("ns", "id2", &on_start2); C_SaferCond on_start3; - throttler.start_op("id3", &on_start3); + throttler.start_op("ns", "id3", &on_start3); C_SaferCond on_start4; - throttler.start_op("id4", &on_start4); + throttler.start_op("ns", "id4", &on_start4); C_SaferCond on_start5; - throttler.start_op("id5", &on_start5); + throttler.start_op("ns", "id5", &on_start5); ASSERT_EQ(0, on_start1.wait()); ASSERT_EQ(0, on_start2.wait()); @@ -226,16 +225,29 @@ TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) { throttler.set_max_concurrent_syncs(2); - throttler.finish_op("id1"); - throttler.finish_op("id2"); - throttler.finish_op("id3"); + throttler.finish_op("ns", "id1"); + throttler.finish_op("ns", "id2"); + throttler.finish_op("ns", "id3"); ASSERT_EQ(0, on_start5.wait()); - throttler.finish_op("id4"); - throttler.finish_op("id5"); + throttler.finish_op("ns", "id4"); + throttler.finish_op("ns", "id5"); +} + +TEST_F(TestMockImageSyncThrottler, Drain) { + MockImageSyncThrottler throttler(g_ceph_context); + throttler.set_max_concurrent_syncs(1); + + C_SaferCond on_start1; + throttler.start_op("ns", "id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("ns", "id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + throttler.drain("ns", -ESTALE); + ASSERT_EQ(-ESTALE, on_start2.wait()); } } // namespace mirror } // namespace rbd - diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc index 149facbcdbcd7..c3eca03bb7337 100644 --- a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc @@ -65,11 +65,11 @@ struct ImageReplayer { std::string global_image_id; static ImageReplayer *create( - Threads *threads, - InstanceWatcher *instance_watcher, - journal::CacheManagerHandler *cache_manager_handler, - RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, - const std::string &global_image_id) { + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + const std::string &global_image_id, + Threads *threads, + InstanceWatcher *instance_watcher, + journal::CacheManagerHandler *cache_manager_handler) { ceph_assert(s_instance != nullptr); s_instance->global_image_id = global_image_id; return s_instance; @@ -171,9 +171,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) { MockInstanceWatcher mock_instance_watcher; MockImageReplayer mock_image_replayer; MockInstanceReplayer instance_replayer( - &mock_threads, &mock_service_daemon, nullptr, - rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), - "local_mirror_uuid", m_local_io_ctx.get_id()); + m_local_io_ctx, "local_mirror_uuid", + &mock_threads, &mock_service_daemon, nullptr); std::string global_image_id("global_image_id"); EXPECT_CALL(mock_image_replayer, get_global_image_id()) @@ -240,9 +239,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) { MockInstanceWatcher mock_instance_watcher; MockImageReplayer mock_image_replayer; MockInstanceReplayer instance_replayer( - &mock_threads, &mock_service_daemon, nullptr, - rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), - "local_mirror_uuid", m_local_io_ctx.get_id()); + m_local_io_ctx, "local_mirror_uuid", + &mock_threads, &mock_service_daemon, nullptr); std::string global_image_id("global_image_id"); EXPECT_CALL(mock_image_replayer, get_global_image_id()) @@ -312,9 +310,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) { MockInstanceWatcher mock_instance_watcher; MockImageReplayer mock_image_replayer; MockInstanceReplayer instance_replayer( - &mock_threads, &mock_service_daemon, nullptr, - rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), - "local_mirror_uuid", m_local_io_ctx.get_id()); + m_local_io_ctx, "local_mirror_uuid", + &mock_threads, &mock_service_daemon, nullptr); std::string global_image_id("global_image_id"); EXPECT_CALL(mock_image_replayer, get_global_image_id()) diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc index c2e0d41f3f767..039b97e8284c5 100644 --- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -9,7 +9,6 @@ #include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "tools/rbd_mirror/InstanceReplayer.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" @@ -87,11 +86,6 @@ template <> struct ImageSyncThrottler { static ImageSyncThrottler* s_instance; - static ImageSyncThrottler *create(CephContext *cct) { - ceph_assert(s_instance != nullptr); - return s_instance; - } - ImageSyncThrottler() { ceph_assert(s_instance == nullptr); s_instance = this; @@ -102,10 +96,10 @@ struct ImageSyncThrottler { s_instance = nullptr; } - MOCK_METHOD0(destroy, void()); - MOCK_METHOD1(drain, void(int)); - MOCK_METHOD2(start_op, void(const std::string &, Context *)); - MOCK_METHOD1(finish_op, void(const std::string &)); + MOCK_METHOD3(start_op, void(const std::string &, const std::string &, + Context *)); + MOCK_METHOD2(finish_op, void(const std::string &, const std::string &)); + MOCK_METHOD2(drain, void(const std::string &, int)); }; ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; @@ -224,7 +218,8 @@ TEST_F(TestMockInstanceWatcher, InitShutdown) { librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr, + m_instance_id); InSequence seq; // Init @@ -248,7 +243,8 @@ TEST_F(TestMockInstanceWatcher, InitError) { librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr, + m_instance_id); InSequence seq; expect_register_instance(mock_io_ctx, 0); @@ -268,7 +264,8 @@ TEST_F(TestMockInstanceWatcher, ShutdownError) { librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr, + m_instance_id); InSequence seq; // Init @@ -337,7 +334,7 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) { librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); MockInstanceReplayer mock_instance_replayer1; auto instance_watcher1 = MockInstanceWatcher::create( - io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1); + io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1, nullptr); librados::Rados cluster; librados::IoCtx io_ctx2; @@ -347,7 +344,7 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) { librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); MockInstanceReplayer mock_instance_replayer2; auto instance_watcher2 = MockInstanceWatcher::create( - io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2); + io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2, nullptr); InSequence seq; @@ -420,7 +417,7 @@ TEST_F(TestMockInstanceWatcher, PeerImageRemoved) { librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); MockInstanceReplayer mock_instance_replayer1; auto instance_watcher1 = MockInstanceWatcher::create( - io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1); + io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1, nullptr); librados::Rados cluster; librados::IoCtx io_ctx2; @@ -430,7 +427,7 @@ TEST_F(TestMockInstanceWatcher, PeerImageRemoved) { librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); MockInstanceReplayer mock_instance_replayer2; auto instance_watcher2 = MockInstanceWatcher::create( - io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2); + io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2, nullptr); InSequence seq; @@ -486,7 +483,8 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) { librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr, + m_instance_id); InSequence seq; // Init @@ -553,8 +551,8 @@ TEST_F(TestMockInstanceWatcher, PeerImageAcquireWatchDNE) { MockInstanceReplayer mock_instance_replayer; auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer, - m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer, + nullptr, m_instance_id); InSequence seq; // Init @@ -585,8 +583,8 @@ TEST_F(TestMockInstanceWatcher, PeerImageReleaseWatchDNE) { MockInstanceReplayer mock_instance_replayer; auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer, - m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer, + nullptr, m_instance_id); InSequence seq; // Init @@ -616,7 +614,8 @@ TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) { librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( - m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr, + m_instance_id); InSequence seq; // Init @@ -680,14 +679,16 @@ public: librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); instance_watcher1 = MockInstanceWatcher::create(io_ctx1, m_mock_threads->work_queue, - nullptr); + nullptr, + &mock_image_sync_throttler); EXPECT_EQ("", connect_cluster_pp(cluster)); EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); instance_id2 = stringify(io_ctx2.get_instance_id()); librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); instance_watcher2 = MockInstanceWatcher::create(io_ctx2, m_mock_threads->work_queue, - nullptr); + nullptr, + &mock_image_sync_throttler); InSequence seq; // Init instance watcher 1 (leader) @@ -712,7 +713,7 @@ public: InSequence seq; - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); // Shutdown instance watcher 1 @@ -736,24 +737,12 @@ public: TestMockInstanceWatcher::TearDown(); } - void expect_throttler_destroy( - std::vector *throttler_queue = nullptr) { - EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE)) - .WillOnce(Invoke([throttler_queue] (int r) { - if (throttler_queue != nullptr) { - for (auto ctx : *throttler_queue) { - ctx->complete(r); - } - } - })); - EXPECT_CALL(mock_image_sync_throttler, destroy()); - } - void expect_throttler_start_op(const std::string &sync_id, Context *on_call = nullptr, Context **on_start_ctx = nullptr) { - EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _)) + EXPECT_CALL(mock_image_sync_throttler, start_op("", sync_id, _)) .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &, + const std::string &, Context *ctx) { if (on_start_ctx != nullptr) { *on_start_ctx = ctx; @@ -768,11 +757,15 @@ public: void expect_throttler_finish_op(const std::string &sync_id, Context *on_finish) { - EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) - .WillOnce(Invoke([on_finish](const std::string &) { + EXPECT_CALL(mock_image_sync_throttler, finish_op("", "sync_id")) + .WillOnce(Invoke([on_finish](const std::string &, const std::string &) { on_finish->complete(0); })); } + + void expect_throttler_drain() { + EXPECT_CALL(mock_image_sync_throttler, drain("", -ESTALE)); + } }; TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) { @@ -863,8 +856,9 @@ TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) { ASSERT_EQ(0, on_start1.wait()); C_SaferCond on_start2; - EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) - .WillOnce(Invoke([this, &on_start2](const std::string &) { + EXPECT_CALL(mock_image_sync_throttler, finish_op("", "sync_id")) + .WillOnce(Invoke([this, &on_start2](const std::string &, + const std::string &) { instance_watcher2->notify_sync_request("sync_id", &on_start2); })); expect_throttler_start_op("sync_id"); @@ -880,7 +874,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) { TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) { InSequence seq; - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } @@ -888,7 +882,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) { TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) { InSequence seq; - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); instance_watcher2->handle_acquire_leader(); @@ -896,7 +890,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) { C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher2->handle_release_leader(); instance_watcher2->notify_sync_complete("sync_id"); @@ -913,9 +907,11 @@ TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) { instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start_op_called.wait()); - std::vector throttler_queue = {on_start_ctx}; - expect_throttler_destroy(&throttler_queue); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); + // emulate throttler queue drain on leader release + on_start_ctx->complete(-ESTALE); + expect_throttler_start_op("sync_id"); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); @@ -926,7 +922,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) { instance_watcher1->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } @@ -934,7 +930,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) { TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) { InSequence seq; - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); @@ -944,7 +940,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) { instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); instance_watcher2->handle_update_leader(instance_id1); @@ -963,12 +959,13 @@ TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) { instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start_op_called.wait()); - std::vector throttler_queue = {on_start_ctx}; - expect_throttler_destroy(&throttler_queue); + expect_throttler_drain(); instance_watcher1->handle_release_leader(); + // emulate throttler queue drain on leader release + on_start_ctx->complete(-ESTALE); - EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _)) - .WillOnce(WithArg<1>(CompleteContext(0))); + EXPECT_CALL(mock_image_sync_throttler, start_op("", "sync_id", _)) + .WillOnce(WithArg<2>(CompleteContext(0))); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); @@ -979,7 +976,7 @@ TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) { instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); - expect_throttler_destroy(); + expect_throttler_drain(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index 13ad8b4c96bbb..06ccf40a8881d 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -192,33 +192,6 @@ struct Threads { } }; -template <> -struct MirrorStatusWatcher { - static MirrorStatusWatcher* s_instance; - - static MirrorStatusWatcher *create(librados::IoCtx &io_ctx, - ContextWQ *work_queue) { - ceph_assert(s_instance != nullptr); - return s_instance; - } - - MirrorStatusWatcher() { - ceph_assert(s_instance == nullptr); - s_instance = this; - } - - ~MirrorStatusWatcher() { - ceph_assert(s_instance == this); - s_instance = nullptr; - } - - MOCK_METHOD0(destroy, void()); - MOCK_METHOD1(init, void(Context *)); - MOCK_METHOD1(shut_down, void(Context *)); -}; - -MirrorStatusWatcher *MirrorStatusWatcher::s_instance = nullptr; - template <> struct Instances { static Instances* s_instance; @@ -294,7 +267,6 @@ MockListener *MockListener::s_instance = nullptr; class TestMockLeaderWatcher : public TestMockFixture { public: - typedef MirrorStatusWatcher MockMirrorStatusWatcher; typedef Instances MockInstances; typedef LeaderWatcher MockLeaderWatcher; typedef Threads MockThreads; @@ -423,21 +395,6 @@ public: Return(true))); } - void expect_destroy(MockMirrorStatusWatcher &mock_mirror_status_watcher) { - EXPECT_CALL(mock_mirror_status_watcher, destroy()); - } - - void expect_init(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) { - EXPECT_CALL(mock_mirror_status_watcher, init(_)) - .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); - } - - void expect_shut_down(MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) { - EXPECT_CALL(mock_mirror_status_watcher, shut_down(_)) - .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); - expect_destroy(mock_mirror_status_watcher); - } - void expect_destroy(MockInstances &mock_instances) { EXPECT_CALL(mock_instances, destroy()); } @@ -482,7 +439,6 @@ public: TEST_F(TestMockLeaderWatcher, InitShutdown) { MockManagedLock mock_managed_lock; - MockMirrorStatusWatcher mock_mirror_status_watcher; MockInstances mock_instances; MockListener listener; @@ -495,7 +451,6 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener); // Init - expect_init(mock_mirror_status_watcher, 0); C_SaferCond on_heartbeat_finish; expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, 0); @@ -513,7 +468,6 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { expect_shut_down(mock_instances, 0); expect_release_lock(mock_managed_lock, 0); expect_shut_down(mock_managed_lock, true, 0); - expect_shut_down(mock_mirror_status_watcher, 0); expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); @@ -521,7 +475,6 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { MockManagedLock mock_managed_lock; - MockMirrorStatusWatcher mock_mirror_status_watcher; MockInstances mock_instances; MockListener listener; @@ -534,7 +487,6 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener); // Init - expect_init(mock_mirror_status_watcher, 0); C_SaferCond on_heartbeat_finish; expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, 0); @@ -559,34 +511,6 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { // Shutdown expect_shut_down(mock_managed_lock, false, 0); - expect_shut_down(mock_mirror_status_watcher, 0); - expect_is_leader(mock_managed_lock, false, false); - - leader_watcher.shut_down(); -} - -TEST_F(TestMockLeaderWatcher, InitStatusWatcherError) { - MockManagedLock mock_managed_lock; - MockMirrorStatusWatcher mock_mirror_status_watcher; - MockInstances mock_instances; - MockListener listener; - - expect_is_shutdown(mock_managed_lock); - expect_is_leader(mock_managed_lock); - expect_destroy(mock_managed_lock); - - InSequence seq; - - expect_construct(mock_managed_lock); - MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener); - - // Init - expect_init(mock_mirror_status_watcher, -EINVAL); - ASSERT_EQ(-EINVAL, leader_watcher.init()); - - // Shutdown - expect_shut_down(mock_managed_lock, false, 0); - expect_shut_down(mock_mirror_status_watcher, 0); expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); @@ -594,7 +518,6 @@ TEST_F(TestMockLeaderWatcher, InitStatusWatcherError) { TEST_F(TestMockLeaderWatcher, AcquireError) { MockManagedLock mock_managed_lock; - MockMirrorStatusWatcher mock_mirror_status_watcher; MockInstances mock_instances; MockListener listener; @@ -608,7 +531,6 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener); // Init - expect_init(mock_mirror_status_watcher, 0); C_SaferCond on_heartbeat_finish; expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, -EAGAIN); @@ -628,7 +550,6 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { expect_shut_down(mock_instances, 0); expect_release_lock(mock_managed_lock, 0); expect_shut_down(mock_managed_lock, true, 0); - expect_shut_down(mock_mirror_status_watcher, 0); expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); @@ -643,7 +564,6 @@ TEST_F(TestMockLeaderWatcher, Break) { "rbd_mirror_leader_max_acquire_attempts_before_break"); MockManagedLock mock_managed_lock; - MockMirrorStatusWatcher mock_mirror_status_watcher; MockInstances mock_instances; MockListener listener; librbd::managed_lock::Locker @@ -660,7 +580,6 @@ TEST_F(TestMockLeaderWatcher, Break) { MockLeaderWatcher leader_watcher(m_mock_threads, m_local_io_ctx, &listener); // Init - expect_init(mock_mirror_status_watcher, 0); expect_is_leader(mock_managed_lock, false, false); for (int i = 0; i < max_acquire_attempts; i++) { expect_try_acquire_lock(mock_managed_lock, -EAGAIN); @@ -684,7 +603,6 @@ TEST_F(TestMockLeaderWatcher, Break) { expect_shut_down(mock_instances, 0); expect_release_lock(mock_managed_lock, 0); expect_shut_down(mock_managed_lock, true, 0); - expect_shut_down(mock_mirror_status_watcher, 0); expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); diff --git a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc new file mode 100644 index 0000000000000..b787a388921b3 --- /dev/null +++ b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc @@ -0,0 +1,575 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/api/Config.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/test_mock_fixture.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" +#include "tools/rbd_mirror/NamespaceReplayer.h" +#include "tools/rbd_mirror/ImageDeleter.h" +#include "tools/rbd_mirror/ImageMap.h" +#include "tools/rbd_mirror/InstanceWatcher.h" +#include "tools/rbd_mirror/InstanceReplayer.h" +#include "tools/rbd_mirror/PoolWatcher.h" +#include "tools/rbd_mirror/ServiceDaemon.h" +#include "tools/rbd_mirror/Threads.h" + +namespace librbd { + +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <> +struct ImageDeleter { + static ImageDeleter* s_instance; + + static ImageDeleter* create( + librados::IoCtx &ioctx, Threads *threads, + ServiceDaemon *service_daemon) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); + MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); + + ImageDeleter() { + s_instance = this; + } +}; + +ImageDeleter* ImageDeleter::s_instance = nullptr; + +template<> +struct ImageMap { + static ImageMap* s_instance; + + static ImageMap *create(librados::IoCtx &ioctx, + Threads *threads, + const std::string& instance_id, + image_map::Listener &listener) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); + + MOCK_METHOD1(update_instances_added, void(const std::vector&)); + MOCK_METHOD1(update_instances_removed, void(const std::vector&)); + + MOCK_METHOD3(update_images_mock, void(const std::string&, + const std::set&, + const std::set&)); + void update_images(const std::string& mirror_uuid, + std::set&& added, + std::set&& removed) { + update_images_mock(mirror_uuid, added, removed); + } + + ImageMap() { + s_instance = this; + } +}; + +ImageMap* ImageMap::s_instance = nullptr; + +template<> +struct InstanceReplayer { + static InstanceReplayer* s_instance; + + static InstanceReplayer* create( + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + Threads *threads, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MOCK_METHOD0(start, void()); + MOCK_METHOD0(stop, void()); + MOCK_METHOD0(restart, void()); + MOCK_METHOD0(flush, void()); + + MOCK_METHOD1(stop, void(Context *)); + + MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); + + MOCK_METHOD2(add_peer, void(const std::string&, librados::IoCtx&)); + + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); + MOCK_METHOD1(release_all, void(Context*)); + + InstanceReplayer() { + s_instance = this; + } +}; + +InstanceReplayer* InstanceReplayer::s_instance = nullptr; + +template<> +struct InstanceWatcher { + static InstanceWatcher* s_instance; + + static InstanceWatcher* create( + librados::IoCtx &ioctx, ContextWQ* work_queue, + InstanceReplayer* instance_replayer, + ImageSyncThrottler *image_sync_throttler) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MOCK_METHOD0(handle_acquire_leader, void()); + MOCK_METHOD0(handle_release_leader, void()); + + MOCK_METHOD0(get_instance_id, std::string()); + + MOCK_METHOD2(print_sync_status, void(Formatter*, std::stringstream*)); + + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD1(shut_down, void(Context *)); + + MOCK_METHOD3(notify_image_acquire, void(const std::string&, + const std::string&, + Context*)); + MOCK_METHOD3(notify_image_release, void(const std::string&, + const std::string&, + Context*)); + MOCK_METHOD4(notify_peer_image_removed, void(const std::string&, + const std::string&, + const std::string&, + Context*)); + + MOCK_METHOD1(handle_update_leader, void(const std::string&)); + + InstanceWatcher() { + s_instance = this; + } + +}; + +InstanceWatcher* InstanceWatcher::s_instance = nullptr; + +template <> +struct MirrorStatusWatcher { + static MirrorStatusWatcher* s_instance; + + static MirrorStatusWatcher *create(librados::IoCtx &io_ctx, + ContextWQ *work_queue) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + MirrorStatusWatcher() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~MirrorStatusWatcher() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD1(shut_down, void(Context *)); +}; + +MirrorStatusWatcher *MirrorStatusWatcher::s_instance = nullptr; + +template<> +struct PoolWatcher { + static std::map s_instances; + + static PoolWatcher *create(Threads *threads, + librados::IoCtx &ioctx, + pool_watcher::Listener& listener) { + auto pool_id = ioctx.get_id(); + ceph_assert(s_instances.count(pool_id)); + return s_instances[pool_id]; + } + + MOCK_METHOD0(is_blacklisted, bool()); + + MOCK_METHOD0(get_image_count, uint64_t()); + + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); + + PoolWatcher(int64_t pool_id) { + ceph_assert(!s_instances.count(pool_id)); + s_instances[pool_id] = this; + } +}; + +std::map *> PoolWatcher::s_instances; + +template<> +struct ServiceDaemon { + MOCK_METHOD3(add_or_update_attribute, + void(int64_t, const std::string&, + const service_daemon::AttributeValue&)); + MOCK_METHOD2(remove_attribute, + void(int64_t, const std::string&)); + + MOCK_METHOD4(add_or_update_callout, uint64_t(int64_t, uint64_t, + service_daemon::CalloutLevel, + const std::string&)); + MOCK_METHOD2(remove_callout, void(int64_t, uint64_t)); +}; + +template <> +struct Threads { + ceph::mutex &timer_lock; + SafeTimer *timer; + ContextWQ *work_queue; + + Threads(Threads *threads) + : timer_lock(threads->timer_lock), timer(threads->timer), + work_queue(threads->work_queue) { + } +}; + +} // namespace mirror +} // namespace rbd + +// template definitions +#include "tools/rbd_mirror/NamespaceReplayer.cc" + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::StrEq; +using ::testing::WithArg; + +class TestMockNamespaceReplayer : public TestMockFixture { +public: + typedef NamespaceReplayer MockNamespaceReplayer; + typedef ImageDeleter MockImageDeleter; + typedef ImageMap MockImageMap; + typedef InstanceReplayer MockInstanceReplayer; + typedef InstanceWatcher MockInstanceWatcher; + typedef MirrorStatusWatcher MockMirrorStatusWatcher; + typedef PoolWatcher MockPoolWatcher; + typedef ServiceDaemon MockServiceDaemon; + typedef Threads MockThreads; + + void SetUp() override { + TestMockFixture::SetUp(); + m_mock_threads = new MockThreads(m_threads); + } + + void TearDown() override { + delete m_mock_threads; + TestMockFixture::TearDown(); + } + + void expect_mirror_status_watcher_init( + MockMirrorStatusWatcher &mock_mirror_status_watcher, int r) { + EXPECT_CALL(mock_mirror_status_watcher, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_mirror_status_watcher_shut_down( + MockMirrorStatusWatcher &mock_mirror_status_watcher) { + EXPECT_CALL(mock_mirror_status_watcher, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_instance_replayer_init( + MockInstanceReplayer& mock_instance_replayer, int r) { + EXPECT_CALL(mock_instance_replayer, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_instance_replayer_shut_down( + MockInstanceReplayer& mock_instance_replayer) { + EXPECT_CALL(mock_instance_replayer, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_instance_replayer_stop( + MockInstanceReplayer& mock_instance_replayer) { + EXPECT_CALL(mock_instance_replayer, stop(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_instance_replayer_add_peer( + MockInstanceReplayer& mock_instance_replayer, const std::string& uuid) { + EXPECT_CALL(mock_instance_replayer, add_peer(uuid, _)); + } + + void expect_instance_replayer_release_all( + MockInstanceReplayer& mock_instance_replayer) { + EXPECT_CALL(mock_instance_replayer, release_all(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_instance_watcher_get_instance_id( + MockInstanceWatcher& mock_instance_watcher, + const std::string &instance_id) { + EXPECT_CALL(mock_instance_watcher, get_instance_id()) + .WillOnce(Return(instance_id)); + } + + void expect_instance_watcher_init( + MockInstanceWatcher& mock_instance_watcher, int r) { + EXPECT_CALL(mock_instance_watcher, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_instance_watcher_shut_down( + MockInstanceWatcher& mock_instance_watcher) { + EXPECT_CALL(mock_instance_watcher, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_instance_watcher_handle_acquire_leader( + MockInstanceWatcher& mock_instance_watcher) { + EXPECT_CALL(mock_instance_watcher, handle_acquire_leader()); + } + + void expect_instance_watcher_handle_release_leader( + MockInstanceWatcher& mock_instance_watcher) { + EXPECT_CALL(mock_instance_watcher, handle_release_leader()); + } + + void expect_image_map_init(MockInstanceWatcher &mock_instance_watcher, + MockImageMap& mock_image_map, int r) { + expect_instance_watcher_get_instance_id(mock_instance_watcher, "1234"); + EXPECT_CALL(mock_image_map, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_image_map_shut_down(MockImageMap& mock_image_map) { + EXPECT_CALL(mock_image_map, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_pool_watcher_init(MockPoolWatcher& mock_pool_watcher, int r) { + EXPECT_CALL(mock_pool_watcher, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_pool_watcher_shut_down(MockPoolWatcher& mock_pool_watcher) { + EXPECT_CALL(mock_pool_watcher, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_image_deleter_init(MockImageDeleter& mock_image_deleter, int r) { + EXPECT_CALL(mock_image_deleter, init(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); + } + + void expect_image_deleter_shut_down(MockImageDeleter& mock_image_deleter) { + EXPECT_CALL(mock_image_deleter, shut_down(_)) + .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); + } + + void expect_service_daemon_add_or_update_attribute( + MockServiceDaemon &mock_service_daemon, const std::string& key, + const service_daemon::AttributeValue& value) { + EXPECT_CALL(mock_service_daemon, add_or_update_attribute(_, key, value)); + } + + void expect_service_daemon_add_or_update_instance_id_attribute( + MockInstanceWatcher &mock_instance_watcher, + MockServiceDaemon &mock_service_daemon) { + expect_instance_watcher_get_instance_id(mock_instance_watcher, "1234"); + expect_service_daemon_add_or_update_attribute( + mock_service_daemon, "instance_id", {std::string("1234")}); + } + + MockThreads *m_mock_threads; +}; + +TEST_F(TestMockNamespaceReplayer, Init_MirrorStatusWatcherError) { + InSequence seq; + + auto mock_mirror_status_watcher = new MockMirrorStatusWatcher; + expect_mirror_status_watcher_init(*mock_mirror_status_watcher, -EINVAL); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(-EINVAL, on_init.wait()); +} + +TEST_F(TestMockNamespaceReplayer, Init_InstanceReplayerError) { + InSequence seq; + + auto mock_mirror_status_watcher = new MockMirrorStatusWatcher; + expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + + auto mock_instance_replayer = new MockInstanceReplayer(); + expect_instance_replayer_init(*mock_instance_replayer, -EINVAL); + + expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(-EINVAL, on_init.wait()); +} + +TEST_F(TestMockNamespaceReplayer, Init_InstanceWatcherError) { + InSequence seq; + + auto mock_mirror_status_watcher = new MockMirrorStatusWatcher; + expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + + auto mock_instance_replayer = new MockInstanceReplayer(); + expect_instance_replayer_init(*mock_instance_replayer, 0); + expect_instance_replayer_add_peer(*mock_instance_replayer, + "remote mirror uuid"); + + auto mock_instance_watcher = new MockInstanceWatcher(); + expect_instance_watcher_init(*mock_instance_watcher, -EINVAL); + + expect_instance_replayer_shut_down(*mock_instance_replayer); + expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(-EINVAL, on_init.wait()); +} + +TEST_F(TestMockNamespaceReplayer, Init) { + InSequence seq; + + auto mock_mirror_status_watcher = new MockMirrorStatusWatcher; + expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + + auto mock_instance_replayer = new MockInstanceReplayer(); + expect_instance_replayer_init(*mock_instance_replayer, 0); + expect_instance_replayer_add_peer(*mock_instance_replayer, + "remote mirror uuid"); + + auto mock_instance_watcher = new MockInstanceWatcher(); + expect_instance_watcher_init(*mock_instance_watcher, 0); + + MockServiceDaemon mock_service_daemon; + expect_service_daemon_add_or_update_instance_id_attribute( + *mock_instance_watcher, mock_service_daemon); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", m_mock_threads, nullptr, &mock_service_daemon, + nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(0, on_init.wait()); + + expect_instance_replayer_stop(*mock_instance_replayer); + expect_instance_watcher_shut_down(*mock_instance_watcher); + expect_instance_replayer_shut_down(*mock_instance_replayer); + expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + + C_SaferCond on_shut_down; + namespace_replayer.shut_down(&on_shut_down); + ASSERT_EQ(0, on_shut_down.wait()); +} + +TEST_F(TestMockNamespaceReplayer, AcuqireLeader) { + InSequence seq; + + // init + + auto mock_mirror_status_watcher = new MockMirrorStatusWatcher; + expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + + auto mock_instance_replayer = new MockInstanceReplayer(); + expect_instance_replayer_init(*mock_instance_replayer, 0); + expect_instance_replayer_add_peer(*mock_instance_replayer, + "remote mirror uuid"); + + auto mock_instance_watcher = new MockInstanceWatcher(); + expect_instance_watcher_init(*mock_instance_watcher, 0); + + MockServiceDaemon mock_service_daemon; + expect_service_daemon_add_or_update_instance_id_attribute( + *mock_instance_watcher, mock_service_daemon); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", m_mock_threads, nullptr, &mock_service_daemon, + nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(0, on_init.wait()); + + // acquire leader + + expect_instance_watcher_handle_acquire_leader(*mock_instance_watcher); + + auto mock_image_map = new MockImageMap(); + expect_image_map_init(*mock_instance_watcher, *mock_image_map, 0); + + auto mock_local_pool_watcher = new MockPoolWatcher(m_local_io_ctx.get_id()); + expect_pool_watcher_init(*mock_local_pool_watcher, 0); + + auto mock_remote_pool_watcher = new MockPoolWatcher(m_remote_io_ctx.get_id()); + expect_pool_watcher_init(*mock_remote_pool_watcher, 0); + + auto mock_image_deleter = new MockImageDeleter(); + expect_image_deleter_init(*mock_image_deleter, 0); + + C_SaferCond on_acquire; + namespace_replayer.handle_acquire_leader(&on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + + // release leader + + expect_instance_watcher_handle_release_leader(*mock_instance_watcher); + expect_image_deleter_shut_down(*mock_image_deleter); + expect_pool_watcher_shut_down(*mock_local_pool_watcher); + expect_pool_watcher_shut_down(*mock_remote_pool_watcher); + expect_image_map_shut_down(*mock_image_map); + expect_instance_replayer_release_all(*mock_instance_replayer); + + // shut down + + expect_instance_replayer_stop(*mock_instance_replayer); + expect_instance_watcher_shut_down(*mock_instance_watcher); + expect_instance_replayer_shut_down(*mock_instance_replayer); + expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + + C_SaferCond on_shut_down; + namespace_replayer.shut_down(&on_shut_down); + ASSERT_EQ(0, on_shut_down.wait()); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/test/rbd_mirror/test_mock_PoolReplayer.cc b/src/test/rbd_mirror/test_mock_PoolReplayer.cc index 4edf94cc8cb37..369624c0d35e0 100644 --- a/src/test/rbd_mirror/test_mock_PoolReplayer.cc +++ b/src/test/rbd_mirror/test_mock_PoolReplayer.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "librbd/api/Config.h" +#include "librbd/api/Namespace.h" #include "test/librbd/mock/MockImageCtx.h" #include "test/librados_test_stub/MockTestMemCluster.h" #include "test/librados_test_stub/MockTestMemIoCtxImpl.h" @@ -9,13 +10,10 @@ #include "test/rbd_mirror/test_mock_fixture.h" #include "test/rbd_mirror/mock/MockContextWQ.h" #include "test/rbd_mirror/mock/MockSafeTimer.h" -#include "tools/rbd_mirror/PoolReplayer.h" -#include "tools/rbd_mirror/ImageDeleter.h" -#include "tools/rbd_mirror/ImageMap.h" -#include "tools/rbd_mirror/InstanceWatcher.h" -#include "tools/rbd_mirror/InstanceReplayer.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/LeaderWatcher.h" -#include "tools/rbd_mirror/PoolWatcher.h" +#include "tools/rbd_mirror/NamespaceReplayer.h" +#include "tools/rbd_mirror/PoolReplayer.h" #include "tools/rbd_mirror/ServiceDaemon.h" #include "tools/rbd_mirror/Threads.h" @@ -41,159 +39,149 @@ public: } }; -} - -} // namespace librbd - -namespace rbd { -namespace mirror { - template <> -struct ImageDeleter { - static ImageDeleter* s_instance; +class Namespace { +public: + static Namespace* s_instance; - static ImageDeleter* create(librados::IoCtx &ioctx, - Threads *threads, - ServiceDaemon *service_daemon) { - ceph_assert(s_instance != nullptr); - return s_instance; - } + static int list(librados::IoCtx& io_ctx, std::vector *names) { + if (s_instance) { + return s_instance->list(names); + } - MOCK_METHOD1(init, void(Context*)); - MOCK_METHOD1(shut_down, void(Context*)); - MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); + return 0; + } - ImageDeleter() { + Namespace() { s_instance = this; } -}; -ImageDeleter* ImageDeleter::s_instance = nullptr; - -template<> -struct ImageMap { - static ImageMap* s_instance; + void add(const std::string &name) { + std::lock_guard locker{m_lock}; - static ImageMap *create(librados::IoCtx &ioctx, - Threads *threads, - const std::string& instance_id, - image_map::Listener &listener) { - ceph_assert(s_instance != nullptr); - return s_instance; + m_names.insert(name); } - MOCK_METHOD1(init, void(Context*)); - MOCK_METHOD1(shut_down, void(Context*)); + void remove(const std::string &name) { + std::lock_guard locker{m_lock}; - MOCK_METHOD1(update_instances_added, void(const std::vector&)); - MOCK_METHOD1(update_instances_removed, void(const std::vector&)); + m_names.erase(name); + } + + void clear() { + std::lock_guard locker{m_lock}; - MOCK_METHOD3(update_images_mock, void(const std::string&, - const std::set&, - const std::set&)); - void update_images(const std::string& mirror_uuid, - std::set&& added, - std::set&& removed) { - update_images_mock(mirror_uuid, added, removed); + m_names.clear(); } - ImageMap() { - s_instance = this; +private: + ceph::mutex m_lock = ceph::make_mutex("Namespace"); + std::set m_names; + + int list(std::vector *names) { + std::lock_guard locker{m_lock}; + + names->clear(); + names->insert(names->begin(), m_names.begin(), m_names.end()); + return 0; } }; -ImageMap* ImageMap::s_instance = nullptr; +Namespace* Namespace::s_instance = nullptr; -template<> -struct InstanceReplayer { - static InstanceReplayer* s_instance; - - static InstanceReplayer* create(Threads *threads, - ServiceDaemon *service_daemon, - journal::CacheManagerHandler *cache_manager_handler, - RadosRef rados, const std::string& uuid, - int64_t pool_id) { - ceph_assert(s_instance != nullptr); - return s_instance; - } +} // namespace api - MOCK_METHOD0(start, void()); - MOCK_METHOD0(stop, void()); - MOCK_METHOD0(restart, void()); - MOCK_METHOD0(flush, void()); +} // namespace librbd - MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); +namespace rbd { +namespace mirror { - MOCK_METHOD2(add_peer, void(const std::string&, librados::IoCtx&)); +template <> +struct ImageSyncThrottler { + static ImageSyncThrottler* s_instance; - MOCK_METHOD0(init, void()); - MOCK_METHOD0(shut_down, void()); - MOCK_METHOD1(release_all, void(Context*)); + static ImageSyncThrottler *create(CephContext *cct) { + return s_instance; + } - InstanceReplayer() { + ImageSyncThrottler() { + ceph_assert(s_instance == nullptr); s_instance = this; } -}; -InstanceReplayer* InstanceReplayer::s_instance = nullptr; + virtual ~ImageSyncThrottler() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); +}; -template<> -struct InstanceWatcher { - static InstanceWatcher* s_instance; +ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; - static InstanceWatcher* create(librados::IoCtx &ioctx, - MockContextWQ* work_queue, - InstanceReplayer* instance_replayer) { - ceph_assert(s_instance != nullptr); - return s_instance; +template <> +struct NamespaceReplayer { + static std::map s_instances; + + static NamespaceReplayer *create( + const std::string &name, + librados::IoCtx &local_ioctx, + librados::IoCtx &remote_ioctx, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, + Threads *threads, + ImageSyncThrottler *image_sync_throttler, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler) { + ceph_assert(s_instances.count(name)); + auto namespace_replayer = s_instances[name]; + s_instances.erase(name); + return namespace_replayer; } - MOCK_METHOD0(handle_acquire_leader, void()); - MOCK_METHOD0(handle_release_leader, void()); - + MOCK_METHOD0(is_blacklisted, bool()); MOCK_METHOD0(get_instance_id, std::string()); - MOCK_METHOD2(print_sync_status, void(Formatter*, std::stringstream*)); + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); - MOCK_METHOD0(init, int()); - MOCK_METHOD0(shut_down, void()); - - MOCK_METHOD3(notify_image_acquire, void(const std::string&, - const std::string&, - Context*)); - MOCK_METHOD3(notify_image_release, void(const std::string&, - const std::string&, - Context*)); - MOCK_METHOD4(notify_peer_image_removed, void(const std::string&, - const std::string&, - const std::string&, - Context*)); - - MOCK_METHOD1(handle_update_leader, void(const std::string&)); - - InstanceWatcher() { - s_instance = this; - } + MOCK_METHOD1(handle_acquire_leader, void(Context *)); + MOCK_METHOD1(handle_release_leader, void(Context *)); + MOCK_METHOD1(handle_update_leader, void(const std::string &)); + MOCK_METHOD1(handle_instances_added, void(const std::vector &)); + MOCK_METHOD1(handle_instances_removed, void(const std::vector &)); + MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*)); + MOCK_METHOD0(start, void()); + MOCK_METHOD0(stop, void()); + MOCK_METHOD0(restart, void()); + MOCK_METHOD0(flush, void()); + + NamespaceReplayer(const std::string &name = "") { + ceph_assert(!s_instances.count(name)); + s_instances[name] = this; + } }; -InstanceWatcher* InstanceWatcher::s_instance = nullptr; +std::map *> NamespaceReplayer::s_instances; template<> struct LeaderWatcher { static LeaderWatcher* s_instance; + leader_watcher::Listener* listener = nullptr; static LeaderWatcher *create(Threads *threads, librados::IoCtx &ioctx, leader_watcher::Listener* listener) { ceph_assert(s_instance != nullptr); + s_instance->listener = listener; return s_instance; } MOCK_METHOD0(is_leader, bool()); MOCK_METHOD0(release_leader, void()); - MOCK_METHOD1(get_leader_instance_id, void(std::string*)); + MOCK_METHOD1(get_leader_instance_id, bool(std::string*)); MOCK_METHOD1(list_instances, void(std::vector*)); MOCK_METHOD0(init, int()); @@ -207,32 +195,6 @@ struct LeaderWatcher { LeaderWatcher* LeaderWatcher::s_instance = nullptr; -template<> -struct PoolWatcher { - static PoolWatcher* s_instance; - - static PoolWatcher *create(Threads *threads, - librados::IoCtx &ioctx, - pool_watcher::Listener& listener) { - ceph_assert(s_instance != nullptr); - return s_instance; - } - - MOCK_METHOD0(is_blacklisted, bool()); - - MOCK_METHOD0(get_image_count, uint64_t()); - - MOCK_METHOD1(init, void(Context*)); - MOCK_METHOD1(shut_down, void(Context*)); - - PoolWatcher() { - s_instance = this; - } - -}; - -PoolWatcher* PoolWatcher::s_instance = nullptr; - template<> struct ServiceDaemon { MOCK_METHOD3(add_or_update_attribute, @@ -276,6 +238,7 @@ namespace rbd { namespace mirror { using ::testing::_; +using ::testing::AtLeast; using ::testing::DoAll; using ::testing::InSequence; using ::testing::Invoke; @@ -285,15 +248,21 @@ using ::testing::WithArg; class TestMockPoolReplayer : public TestMockFixture { public: + typedef librbd::api::Namespace MockNamespace; typedef PoolReplayer MockPoolReplayer; - typedef ImageMap MockImageMap; - typedef InstanceReplayer MockInstanceReplayer; - typedef InstanceWatcher MockInstanceWatcher; + typedef ImageSyncThrottler MockImageSyncThrottler; + typedef NamespaceReplayer MockNamespaceReplayer; typedef LeaderWatcher MockLeaderWatcher; - typedef PoolWatcher MockPoolWatcher; typedef ServiceDaemon MockServiceDaemon; typedef Threads MockThreads; + void expect_work_queue(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillRepeatedly(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + } + void expect_connect(librados::MockTestMemCluster& mock_cluster, librados::MockTestMemRadosClient* mock_rados_client, const std::string& cluster_name, CephContext** cct_ref) { @@ -304,7 +273,6 @@ public: cct->get(); *cct_ref = cct; } - return mock_rados_client; })); } @@ -331,62 +299,147 @@ public: Return(r))); } - void expect_instance_replayer_init(MockInstanceReplayer& mock_instance_replayer) { - EXPECT_CALL(mock_instance_replayer, init()); + void expect_mirror_mode_get(librados::MockTestMemIoCtxImpl *io_ctx_impl, + cls::rbd::MirrorMode mirror_mode, int r) { + bufferlist out_bl; + encode(mirror_mode, out_bl); + + EXPECT_CALL(*io_ctx_impl, + exec(RBD_MIRRORING, _, StrEq("rbd"), StrEq("mirror_mode_get"), + _, _, _)) + .WillOnce(DoAll(WithArg<5>(Invoke([out_bl](bufferlist *bl) { + *bl = out_bl; + })), + Return(r))); + } + + void expect_mirror_mode_get(librados::MockTestMemIoCtxImpl *io_ctx_impl) { + EXPECT_CALL(*io_ctx_impl, + exec(RBD_MIRRORING, _, StrEq("rbd"), StrEq("mirror_mode_get"), + _, _, _)) + .WillRepeatedly(DoAll(WithArg<5>(Invoke([](bufferlist *bl) { + encode(cls::rbd::MIRROR_MODE_POOL, *bl); + })), + Return(0))); + } + + void expect_leader_watcher_init(MockLeaderWatcher& mock_leader_watcher, + int r) { + EXPECT_CALL(mock_leader_watcher, init()) + .WillOnce(Return(r)); + } + + void expect_leader_watcher_shut_down(MockLeaderWatcher& mock_leader_watcher) { + EXPECT_CALL(mock_leader_watcher, shut_down()); } - void expect_instance_replayer_shut_down(MockInstanceReplayer& mock_instance_replayer) { - EXPECT_CALL(mock_instance_replayer, shut_down()); + void expect_leader_watcher_get_leader_instance_id( + MockLeaderWatcher& mock_leader_watcher) { + EXPECT_CALL(mock_leader_watcher, get_leader_instance_id(_)) + .WillRepeatedly(Return(true)); } - void expect_instance_replayer_stop(MockInstanceReplayer& mock_instance_replayer) { - EXPECT_CALL(mock_instance_replayer, stop()); + void expect_leader_watcher_list_instances( + MockLeaderWatcher& mock_leader_watcher) { + EXPECT_CALL(mock_leader_watcher, list_instances(_)) + .Times(AtLeast(0)); } - void expect_instance_replayer_add_peer(MockInstanceReplayer& mock_instance_replayer, - const std::string& uuid) { - EXPECT_CALL(mock_instance_replayer, add_peer(uuid, _)); + void expect_namespace_replayer_is_blacklisted( + MockNamespaceReplayer &mock_namespace_replayer, + bool blacklisted) { + EXPECT_CALL(mock_namespace_replayer, is_blacklisted()) + .WillRepeatedly(Return(blacklisted)); } - void expect_instance_watcher_get_instance_id( - MockInstanceWatcher& mock_instance_watcher, + void expect_namespace_replayer_get_instance_id( + MockNamespaceReplayer &mock_namespace_replayer, const std::string &instance_id) { - EXPECT_CALL(mock_instance_watcher, get_instance_id()) + EXPECT_CALL(mock_namespace_replayer, get_instance_id()) .WillOnce(Return(instance_id)); } - void expect_instance_watcher_init(MockInstanceWatcher& mock_instance_watcher, - int r) { - EXPECT_CALL(mock_instance_watcher, init()) - .WillOnce(Return(r)); + void expect_namespace_replayer_init( + MockNamespaceReplayer &mock_namespace_replayer, int r, + Context *on_init = nullptr) { + + EXPECT_CALL(mock_namespace_replayer, init(_)) + .WillOnce(Invoke([this, r, on_init](Context* ctx) { + m_threads->work_queue->queue(ctx, r); + if (on_init != nullptr) { + m_threads->work_queue->queue(on_init, r); + } + })); } - void expect_instance_watcher_shut_down(MockInstanceWatcher& mock_instance_watcher) { - EXPECT_CALL(mock_instance_watcher, shut_down()); + void expect_namespace_replayer_shut_down( + MockNamespaceReplayer &mock_namespace_replayer, + Context *on_shut_down = nullptr) { + EXPECT_CALL(mock_namespace_replayer, shut_down(_)) + .WillOnce(Invoke([this, on_shut_down](Context* ctx) { + m_threads->work_queue->queue(ctx); + if (on_shut_down != nullptr) { + m_threads->work_queue->queue(on_shut_down); + } + })); } - void expect_leader_watcher_init(MockLeaderWatcher& mock_leader_watcher, - int r) { - EXPECT_CALL(mock_leader_watcher, init()) - .WillOnce(Return(r)); + void expect_namespace_replayer_handle_acquire_leader( + MockNamespaceReplayer &mock_namespace_replayer, int r, + Context *on_acquire = nullptr) { + EXPECT_CALL(mock_namespace_replayer, handle_acquire_leader(_)) + .WillOnce(Invoke([this, r, on_acquire](Context* ctx) { + m_threads->work_queue->queue(ctx, r); + if (on_acquire != nullptr) { + m_threads->work_queue->queue(on_acquire, r); + } + })); } - void expect_leader_watcher_shut_down(MockLeaderWatcher& mock_leader_watcher) { - EXPECT_CALL(mock_leader_watcher, shut_down()); + void expect_namespace_replayer_handle_release_leader( + MockNamespaceReplayer &mock_namespace_replayer, int r, + Context *on_release = nullptr) { + EXPECT_CALL(mock_namespace_replayer, handle_release_leader(_)) + .WillOnce(Invoke([this, r, on_release](Context* ctx) { + m_threads->work_queue->queue(ctx, r); + if (on_release != nullptr) { + m_threads->work_queue->queue(on_release, r); + } + })); + } + + void expect_namespace_replayer_handle_update_leader( + MockNamespaceReplayer &mock_namespace_replayer, + const std::string &leader_instance_id, + Context *on_update = nullptr) { + EXPECT_CALL(mock_namespace_replayer, + handle_update_leader(leader_instance_id)) + .WillOnce(Invoke([on_update](const std::string &) { + if (on_update != nullptr) { + on_update->complete(0); + } + })); + } + + void expect_namespace_replayer_handle_instances_added( + MockNamespaceReplayer &mock_namespace_replayer) { + EXPECT_CALL(mock_namespace_replayer, handle_instances_added(_)); + } + + void expect_namespace_replayer_handle_instances_removed( + MockNamespaceReplayer &mock_namespace_replayer) { + EXPECT_CALL(mock_namespace_replayer, handle_instances_removed(_)); } void expect_service_daemon_add_or_update_attribute( MockServiceDaemon &mock_service_daemon, const std::string& key, const service_daemon::AttributeValue& value) { - EXPECT_CALL(mock_service_daemon, add_or_update_attribute(_, _, _)); + EXPECT_CALL(mock_service_daemon, add_or_update_attribute(_, key, value)); } - void expect_service_daemon_add_or_update_instance_id_attribute( - MockInstanceWatcher& mock_instance_watcher, - MockServiceDaemon &mock_service_daemon) { - expect_instance_watcher_get_instance_id(mock_instance_watcher, "1234"); - expect_service_daemon_add_or_update_attribute(mock_service_daemon, - "instance_id", "1234"); + void expect_service_daemon_remove_attribute( + MockServiceDaemon &mock_service_daemon, const std::string& key) { + EXPECT_CALL(mock_service_daemon, remove_attribute(_, key)); } }; @@ -395,6 +448,16 @@ TEST_F(TestMockPoolReplayer, ConfigKeyOverride) { peer_spec.mon_host = "123"; peer_spec.key = "234"; + auto mock_default_namespace_replayer = new MockNamespaceReplayer(); + expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer, + false); + + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + auto mock_leader_watcher = new MockLeaderWatcher(); + expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher); + InSequence seq; auto& mock_cluster = get_mock_cluster(); @@ -413,35 +476,315 @@ TEST_F(TestMockPoolReplayer, ConfigKeyOverride) { expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx); expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0); + expect_namespace_replayer_init(*mock_default_namespace_replayer, 0); + expect_leader_watcher_init(*mock_leader_watcher, 0); + + MockServiceDaemon mock_service_daemon; + MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr, + m_local_io_ctx.get_id(), peer_spec, {}); + pool_replayer.init(); + + ASSERT_TRUE(remote_cct != nullptr); + ASSERT_EQ("123", remote_cct->_conf.get_val("mon_host")); + ASSERT_EQ("234", remote_cct->_conf.get_val("key")); + remote_cct->put(); + + expect_leader_watcher_shut_down(*mock_leader_watcher); + expect_namespace_replayer_shut_down(*mock_default_namespace_replayer); + + pool_replayer.shut_down(); +} + +TEST_F(TestMockPoolReplayer, AcquireReleaseLeader) { + PeerSpec peer_spec{"uuid", "cluster name", "client.name"}; + peer_spec.mon_host = "123"; + peer_spec.key = "234"; + + auto mock_default_namespace_replayer = new MockNamespaceReplayer(); + expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer, + false); + + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + auto mock_leader_watcher = new MockLeaderWatcher(); + expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher); + expect_leader_watcher_list_instances(*mock_leader_watcher); + + InSequence seq; - auto mock_instance_replayer = new MockInstanceReplayer(); - expect_instance_replayer_init(*mock_instance_replayer); - expect_instance_replayer_add_peer(*mock_instance_replayer, "uuid"); + auto& mock_cluster = get_mock_cluster(); + auto mock_local_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr); - auto mock_instance_watcher = new MockInstanceWatcher(); - expect_instance_watcher_init(*mock_instance_watcher, 0); + auto mock_remote_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + expect_connect(mock_cluster, mock_remote_rados_client, "cluster name", + nullptr); + + auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx( + m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name()); + expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx); + + expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0); + expect_namespace_replayer_init(*mock_default_namespace_replayer, 0); + expect_leader_watcher_init(*mock_leader_watcher, 0); MockServiceDaemon mock_service_daemon; - expect_service_daemon_add_or_update_instance_id_attribute( - *mock_instance_watcher, mock_service_daemon); + MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr, + m_local_io_ctx.get_id(), peer_spec, {}); + pool_replayer.init(); + + expect_service_daemon_add_or_update_attribute( + mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true); + expect_namespace_replayer_handle_acquire_leader( + *mock_default_namespace_replayer, 0); + + C_SaferCond on_acquire; + mock_leader_watcher->listener->post_acquire_handler(&on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + + expect_service_daemon_remove_attribute(mock_service_daemon, + SERVICE_DAEMON_LEADER_KEY); + expect_namespace_replayer_handle_release_leader( + *mock_default_namespace_replayer, 0); + + C_SaferCond on_release; + mock_leader_watcher->listener->pre_release_handler(&on_release); + ASSERT_EQ(0, on_release.wait()); + + expect_leader_watcher_shut_down(*mock_leader_watcher); + expect_namespace_replayer_shut_down(*mock_default_namespace_replayer); + + pool_replayer.shut_down(); +} + +TEST_F(TestMockPoolReplayer, Namespaces) { + PeerSpec peer_spec{"uuid", "cluster name", "client.name"}; + peer_spec.mon_host = "123"; + peer_spec.key = "234"; + + g_ceph_context->_conf.set_val( + "rbd_mirror_pool_replayers_refresh_interval", "1"); + + MockNamespace mock_namespace; + + auto mock_default_namespace_replayer = new MockNamespaceReplayer(); + expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer, + false); + + auto mock_ns1_namespace_replayer = new MockNamespaceReplayer("ns1"); + expect_namespace_replayer_is_blacklisted(*mock_ns1_namespace_replayer, + false); + + auto mock_ns2_namespace_replayer = new MockNamespaceReplayer("ns2"); + expect_namespace_replayer_is_blacklisted(*mock_ns2_namespace_replayer, + false); + + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); auto mock_leader_watcher = new MockLeaderWatcher(); + expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher); + expect_leader_watcher_list_instances(*mock_leader_watcher); + + auto& mock_cluster = get_mock_cluster(); + auto mock_local_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx( + m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name()); + auto mock_remote_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + + expect_mirror_mode_get(mock_local_io_ctx); + + InSequence seq; + + expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr); + expect_connect(mock_cluster, mock_remote_rados_client, "cluster name", + nullptr); + expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx); + expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0); + expect_namespace_replayer_init(*mock_default_namespace_replayer, 0); expect_leader_watcher_init(*mock_leader_watcher, 0); + MockServiceDaemon mock_service_daemon; + MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr, + m_local_io_ctx.get_id(), peer_spec, {}); + pool_replayer.init(); + + C_SaferCond on_ns1_init; + expect_namespace_replayer_init(*mock_ns1_namespace_replayer, 0); + expect_namespace_replayer_handle_update_leader(*mock_ns1_namespace_replayer, + "", &on_ns1_init); + + mock_namespace.add("ns1"); + ASSERT_EQ(0, on_ns1_init.wait()); + + expect_service_daemon_add_or_update_attribute( + mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true); + expect_namespace_replayer_handle_acquire_leader( + *mock_default_namespace_replayer, 0); + expect_namespace_replayer_handle_acquire_leader( + *mock_ns1_namespace_replayer, 0); + + C_SaferCond on_acquire; + mock_leader_watcher->listener->post_acquire_handler(&on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + + expect_namespace_replayer_init(*mock_ns2_namespace_replayer, 0); + C_SaferCond on_ns2_acquire; + expect_namespace_replayer_handle_acquire_leader( + *mock_ns2_namespace_replayer, 0, &on_ns2_acquire); + expect_namespace_replayer_handle_instances_added( + *mock_ns2_namespace_replayer); + + mock_namespace.add("ns2"); + ASSERT_EQ(0, on_ns2_acquire.wait()); + + C_SaferCond on_ns2_shut_down; + expect_namespace_replayer_shut_down(*mock_ns2_namespace_replayer, + &on_ns2_shut_down); + mock_namespace.remove("ns2"); + ASSERT_EQ(0, on_ns2_shut_down.wait()); + + expect_service_daemon_remove_attribute(mock_service_daemon, + SERVICE_DAEMON_LEADER_KEY); + expect_namespace_replayer_handle_release_leader( + *mock_default_namespace_replayer, 0); + expect_namespace_replayer_handle_release_leader( + *mock_ns1_namespace_replayer, 0); + + C_SaferCond on_release; + mock_leader_watcher->listener->pre_release_handler(&on_release); + ASSERT_EQ(0, on_release.wait()); + + expect_namespace_replayer_shut_down(*mock_ns1_namespace_replayer); + expect_leader_watcher_shut_down(*mock_leader_watcher); + expect_namespace_replayer_shut_down(*mock_default_namespace_replayer); + + pool_replayer.shut_down(); +} + +TEST_F(TestMockPoolReplayer, NamespacesError) { + PeerSpec peer_spec{"uuid", "cluster name", "client.name"}; + peer_spec.mon_host = "123"; + peer_spec.key = "234"; + + g_ceph_context->_conf.set_val( + "rbd_mirror_pool_replayers_refresh_interval", "1"); + + MockNamespace mock_namespace; + + auto mock_default_namespace_replayer = new MockNamespaceReplayer(); + expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer, + false); + auto mock_ns1_namespace_replayer = new MockNamespaceReplayer("ns1"); + auto mock_ns2_namespace_replayer = new MockNamespaceReplayer("ns2"); + expect_namespace_replayer_is_blacklisted(*mock_ns2_namespace_replayer, + false); + auto mock_ns3_namespace_replayer = new MockNamespaceReplayer("ns3"); + MockThreads mock_threads(m_threads); + expect_work_queue(mock_threads); + + auto mock_leader_watcher = new MockLeaderWatcher(); + expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher); + expect_leader_watcher_list_instances(*mock_leader_watcher); + + auto& mock_cluster = get_mock_cluster(); + auto mock_local_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx( + m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name()); + auto mock_remote_rados_client = mock_cluster.do_create_rados_client( + g_ceph_context); + + expect_mirror_mode_get(mock_local_io_ctx); + + InSequence seq; + + expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr); + expect_connect(mock_cluster, mock_remote_rados_client, "cluster name", + nullptr); + expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx); + expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0); + expect_namespace_replayer_init(*mock_default_namespace_replayer, 0); + expect_leader_watcher_init(*mock_leader_watcher, 0); + + MockServiceDaemon mock_service_daemon; MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr, m_local_io_ctx.get_id(), peer_spec, {}); pool_replayer.init(); - ASSERT_TRUE(remote_cct != nullptr); - ASSERT_EQ("123", remote_cct->_conf.get_val("mon_host")); - ASSERT_EQ("234", remote_cct->_conf.get_val("key")); - remote_cct->put(); + // test namespace replayer init fails for non leader + + C_SaferCond on_ns1_init; + auto ctx = new FunctionContext( + [&mock_namespace, &on_ns1_init](int r) { + mock_namespace.remove("ns1"); + on_ns1_init.complete(r); + }); + expect_namespace_replayer_init(*mock_ns1_namespace_replayer, -EINVAL, ctx); + mock_namespace.add("ns1"); + ASSERT_EQ(-EINVAL, on_ns1_init.wait()); + + // test acquire leader fails when default namespace replayer fails + + expect_service_daemon_add_or_update_attribute( + mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true); + expect_namespace_replayer_handle_acquire_leader( + *mock_default_namespace_replayer, -EINVAL); + + C_SaferCond on_acquire1; + mock_leader_watcher->listener->post_acquire_handler(&on_acquire1); + ASSERT_EQ(-EINVAL, on_acquire1.wait()); + + // test acquire leader succeeds when non-default namespace replayer fails + + C_SaferCond on_ns2_init; + expect_namespace_replayer_init(*mock_ns2_namespace_replayer, 0); + expect_namespace_replayer_handle_update_leader(*mock_ns2_namespace_replayer, + "", &on_ns2_init); + mock_namespace.add("ns2"); + ASSERT_EQ(0, on_ns2_init.wait()); + + expect_service_daemon_add_or_update_attribute( + mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true); + expect_namespace_replayer_handle_acquire_leader( + *mock_default_namespace_replayer, 0); + + expect_namespace_replayer_handle_acquire_leader(*mock_ns2_namespace_replayer, + -EINVAL); + ctx = new FunctionContext( + [&mock_namespace](int) { + mock_namespace.remove("ns2"); + }); + expect_namespace_replayer_shut_down(*mock_ns2_namespace_replayer, ctx); + mock_namespace.add("ns2"); + + C_SaferCond on_acquire2; + mock_leader_watcher->listener->post_acquire_handler(&on_acquire2); + ASSERT_EQ(0, on_acquire2.wait()); + + // test namespace replayer init fails on acquire leader + + C_SaferCond on_ns3_shut_down; + ctx = new FunctionContext( + [&mock_namespace, &on_ns3_shut_down](int) { + mock_namespace.remove("ns3"); + on_ns3_shut_down.complete(0); + }); + expect_namespace_replayer_init(*mock_ns3_namespace_replayer, 0); + expect_namespace_replayer_handle_acquire_leader(*mock_ns3_namespace_replayer, + -EINVAL); + expect_namespace_replayer_shut_down(*mock_ns3_namespace_replayer, ctx); + mock_namespace.add("ns3"); + ASSERT_EQ(0, on_ns3_shut_down.wait()); - expect_instance_replayer_stop(*mock_instance_replayer); expect_leader_watcher_shut_down(*mock_leader_watcher); - expect_instance_watcher_shut_down(*mock_instance_watcher); - expect_instance_replayer_shut_down(*mock_instance_replayer); + expect_namespace_replayer_shut_down(*mock_default_namespace_replayer); pool_replayer.shut_down(); } diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 8c4156bcb713c..3c0088fdf103d 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -16,6 +16,7 @@ set(rbd_mirror_internal LeaderWatcher.cc Mirror.cc MirrorStatusWatcher.cc + NamespaceReplayer.cc PoolReplayer.cc PoolWatcher.cc ServiceDaemon.cc diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index f803834eb1987..16cc6208a4cbc 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -258,19 +258,17 @@ void ImageReplayer::RemoteJournalerListener::handle_update( template ImageReplayer::ImageReplayer( - Threads *threads, InstanceWatcher *instance_watcher, - journal::CacheManagerHandler *cache_manager_handler, RadosRef local, - const std::string &local_mirror_uuid, int64_t local_pool_id, - const std::string &global_image_id) : - m_threads(threads), + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + const std::string &global_image_id, Threads *threads, + InstanceWatcher *instance_watcher, + journal::CacheManagerHandler *cache_manager_handler) : + m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid), + m_global_image_id(global_image_id), m_threads(threads), m_instance_watcher(instance_watcher), m_cache_manager_handler(cache_manager_handler), - m_local(local), - m_local_mirror_uuid(local_mirror_uuid), - m_local_pool_id(local_pool_id), - m_global_image_id(global_image_id), m_local_image_name(global_image_id), - m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " + - global_image_id)), + m_local_image_name(global_image_id), + m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " + + stringify(local_io_ctx.get_id()) + " " + global_image_id)), m_progress_cxt(this), m_journal_listener(new JournalListener(this)), m_remote_listener(this) @@ -279,15 +277,7 @@ ImageReplayer::ImageReplayer( // name. When the image name becomes known on start the asok commands will be // re-registered using "remote_pool_name/remote_image_name" name. - std::string pool_name; - int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name); - if (r < 0) { - derr << "error resolving local pool " << m_local_pool_id - << ": " << cpp_strerror(r) << dendl; - pool_name = stringify(m_local_pool_id); - } - - m_name = pool_name + "/" + m_global_image_id; + m_name = admin_socket_hook_name(global_image_id); register_admin_socket_hook(); } @@ -380,17 +370,6 @@ void ImageReplayer::start(Context *on_finish, bool manual) return; } - m_local_ioctx.reset(new librados::IoCtx{}); - r = m_local->ioctx_create2(m_local_pool_id, *m_local_ioctx); - if (r < 0) { - m_local_ioctx.reset(); - - derr << "error opening ioctx for local pool " << m_local_pool_id - << ": " << cpp_strerror(r) << dendl; - on_start_fail(r, "error opening local pool"); - return; - } - prepare_local_image(); } @@ -402,7 +381,7 @@ void ImageReplayer::prepare_local_image() { Context *ctx = create_context_callback< ImageReplayer, &ImageReplayer::handle_prepare_local_image>(this); auto req = PrepareLocalImageRequest::create( - *m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name, + m_local_io_ctx, m_global_image_id, &m_local_image_id, &m_local_image_name, &m_local_image_tag_owner, m_threads->work_queue, ctx); req->send(); } @@ -437,7 +416,7 @@ void ImageReplayer::prepare_remote_image() { ceph_assert(!m_peers.empty()); m_remote_image = {*m_peers.begin()}; - auto cct = static_cast(m_local->cct()); + auto cct = static_cast(m_local_io_ctx.cct()); journal::Settings journal_settings; journal_settings.commit_interval = cct->_conf.get_val( "rbd_mirror_journal_commit_age"); @@ -508,7 +487,7 @@ void ImageReplayer::bootstrap() { auto ctx = create_context_callback< ImageReplayer, &ImageReplayer::handle_bootstrap>(this); request = BootstrapRequest::create( - m_threads, *m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher, + m_threads, m_local_io_ctx, m_remote_image.io_ctx, m_instance_watcher, &m_local_image_ctx, m_local_image_id, m_remote_image.image_id, m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid, m_remote_journaler, &m_client_state, &m_client_meta, ctx, @@ -667,7 +646,7 @@ void ImageReplayer::handle_start_replay(int r) { } { - CephContext *cct = static_cast(m_local->cct()); + CephContext *cct = static_cast(m_local_io_ctx.cct()); double poll_seconds = cct->_conf.get_val( "rbd_mirror_journal_poll_age"); @@ -702,9 +681,7 @@ void ImageReplayer::on_start_fail(int r, const std::string &desc) } set_state_description(r, desc); - if (m_local_ioctx) { - update_mirror_image_status(false, boost::none); - } + update_mirror_image_status(false, boost::none); reschedule_update_status_task(-1); shut_down(r); }); @@ -1474,10 +1451,9 @@ void ImageReplayer::send_mirror_status_update(const OptionalState &opt_state) librados::ObjectWriteOperation op; librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status); - ceph_assert(m_local_ioctx); librados::AioCompletion *aio_comp = create_rados_callback< ImageReplayer, &ImageReplayer::handle_mirror_status_update>(this); - int r = m_local_ioctx->aio_operate(RBD_MIRRORING, aio_comp, &op); + int r = m_local_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op); ceph_assert(r == 0); aio_comp->release(); } @@ -1597,9 +1573,7 @@ void ImageReplayer::shut_down(int r) { // chain the shut down sequence (reverse order) Context *ctx = new FunctionContext( [this, r](int _r) { - if (m_local_ioctx) { - update_mirror_image_status(true, STATE_STOPPED); - } + update_mirror_image_status(true, STATE_STOPPED); handle_shut_down(r); }); @@ -1731,7 +1705,7 @@ void ImageReplayer::handle_shut_down(int r) { auto ctx = new FunctionContext([this, r](int) { handle_shut_down(r); }); - ImageDeleter::trash_move(*m_local_ioctx, m_global_image_id, + ImageDeleter::trash_move(m_local_io_ctx, m_global_image_id, resync_requested, m_threads->work_queue, ctx); return; } @@ -1832,7 +1806,7 @@ void ImageReplayer::register_admin_socket_hook() { if (r == 0) { m_asok_hook = asok_hook; - CephContext *cct = static_cast(m_local->cct()); + CephContext *cct = static_cast(m_local_io_ctx.cct()); auto prio = cct->_conf.get_val("rbd_mirror_perf_stats_prio"); PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name, l_rbd_mirror_first, l_rbd_mirror_last); @@ -1873,7 +1847,8 @@ template void ImageReplayer::reregister_admin_socket_hook() { { std::lock_guard locker{m_lock}; - auto name = m_local_ioctx->get_pool_name() + "/" + m_local_image_name; + + auto name = admin_socket_hook_name(m_local_image_name); if (m_asok_hook != nullptr && m_name == name) { return; } @@ -1883,6 +1858,17 @@ void ImageReplayer::reregister_admin_socket_hook() { register_admin_socket_hook(); } +template +std::string ImageReplayer::admin_socket_hook_name( + const std::string &image_name) const { + std::string name = m_local_io_ctx.get_namespace(); + if (!name.empty()) { + name += "/"; + } + + return m_local_io_ctx.get_pool_name() + "/" + name + image_name; +} + template std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 77f3bb2b84bc9..1c1a442bdbb01 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -64,23 +64,23 @@ template class ImageReplayer { public: static ImageReplayer *create( - Threads *threads, InstanceWatcher *instance_watcher, - journal::CacheManagerHandler *cache_manager_handler, RadosRef local, - const std::string &local_mirror_uuid, int64_t local_pool_id, - const std::string &global_image_id) { - return new ImageReplayer(threads, instance_watcher, cache_manager_handler, - local, local_mirror_uuid, local_pool_id, - global_image_id); + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + const std::string &global_image_id, Threads *threads, + InstanceWatcher *instance_watcher, + journal::CacheManagerHandler *cache_manager_handler) { + return new ImageReplayer(local_io_ctx, local_mirror_uuid, global_image_id, + threads, instance_watcher, cache_manager_handler); } void destroy() { delete this; } - ImageReplayer(Threads *threads, + ImageReplayer(librados::IoCtx &local_io_ctx, + const std::string &local_mirror_uuid, + const std::string &global_image_id, + Threads *threads, InstanceWatcher *instance_watcher, - journal::CacheManagerHandler *cache_manager_handler, - RadosRef local, const std::string &local_mirror_uuid, - int64_t local_pool_id, const std::string &global_image_id); + journal::CacheManagerHandler *cache_manager_handler); virtual ~ImageReplayer(); ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; @@ -112,7 +112,7 @@ public: void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx); inline int64_t get_local_pool_id() const { - return m_local_pool_id; + return m_local_io_ctx.get_id(); } inline const std::string& get_global_image_id() const { return m_global_image_id; @@ -270,6 +270,9 @@ private: ImageReplayer *replayer; }; + librados::IoCtx &m_local_io_ctx; + std::string m_local_mirror_uuid; + std::string m_global_image_id; Threads *m_threads; InstanceWatcher *m_instance_watcher; journal::CacheManagerHandler *m_cache_manager_handler; @@ -277,11 +280,7 @@ private: Peers m_peers; RemoteImage m_remote_image; - RadosRef m_local; - std::string m_local_mirror_uuid; - int64_t m_local_pool_id; std::string m_local_image_id; - std::string m_global_image_id; std::string m_local_image_name; std::string m_name; @@ -302,7 +301,6 @@ private: image_replayer::EventPreprocessor *m_event_preprocessor = nullptr; image_replayer::ReplayStatusFormatter *m_replay_status_formatter = nullptr; - IoCtxRef m_local_ioctx; ImageCtxT *m_local_image_ctx = nullptr; std::string m_local_image_tag_owner; @@ -434,6 +432,8 @@ private: void register_admin_socket_hook(); void unregister_admin_socket_hook(); void reregister_admin_socket_hook(); + + std::string admin_socket_hook_name(const std::string &image_name) const; }; } // namespace mirror diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc index ccbb68790b6b3..f2d0be97df0d1 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -49,7 +49,11 @@ ImageSyncThrottler::~ImageSyncThrottler() { } template -void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { +void ImageSyncThrottler::start_op(const std::string &ns, + const std::string &id_, + Context *on_start) { + Id id{ns, id_}; + dout(20) << "id=" << id << dendl; int r = 0; @@ -82,7 +86,10 @@ void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { } template -bool ImageSyncThrottler::cancel_op(const std::string &id) { +bool ImageSyncThrottler::cancel_op(const std::string &ns, + const std::string &id_) { + Id id{ns, id_}; + dout(20) << "id=" << id << dendl; Context *on_start = nullptr; @@ -106,10 +113,13 @@ bool ImageSyncThrottler::cancel_op(const std::string &id) { } template -void ImageSyncThrottler::finish_op(const std::string &id) { +void ImageSyncThrottler::finish_op(const std::string &ns, + const std::string &id_) { + Id id{ns, id_}; + dout(20) << "id=" << id << dendl; - if (cancel_op(id)) { + if (cancel_op(ns, id_)) { return; } @@ -139,18 +149,33 @@ void ImageSyncThrottler::finish_op(const std::string &id) { } template -void ImageSyncThrottler::drain(int r) { - dout(20) << dendl; +void ImageSyncThrottler::drain(const std::string &ns, int r) { + dout(20) << "ns=" << ns << dendl; - std::map queued_ops; + std::map queued_ops; { std::lock_guard locker{m_lock}; - std::swap(m_queued_ops, queued_ops); - m_queue.clear(); - m_inflight_ops.clear(); + for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) { + if (it->first.first == ns) { + queued_ops[it->first] = it->second; + m_queue.remove(it->first); + it = m_queued_ops.erase(it); + } else { + it++; + } + } + for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) { + if (it->first == ns) { + dout(20) << "inflight_op " << *it << dendl; + it = m_inflight_ops.erase(it); + } else { + it++; + } + } } for (auto &it : queued_ops) { + dout(20) << "queued_op " << it.first << dendl; it.second->complete(r); } } diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h index 797c67cc817f3..278b7c3061f20 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -37,20 +37,23 @@ public: ~ImageSyncThrottler() override; void set_max_concurrent_syncs(uint32_t max); - void start_op(const std::string &id, Context *on_start); - bool cancel_op(const std::string &id); - void finish_op(const std::string &id); - void drain(int r); + void start_op(const std::string &ns, const std::string &id, + Context *on_start); + bool cancel_op(const std::string &ns, const std::string &id); + void finish_op(const std::string &ns, const std::string &id); + void drain(const std::string &ns, int r); void print_status(ceph::Formatter *f, std::stringstream *ss); private: + typedef std::pair Id; + CephContext *m_cct; ceph::mutex m_lock; uint32_t m_max_concurrent_syncs; - std::list m_queue; - std::map m_queued_ops; - std::set m_inflight_ops; + std::list m_queue; + std::map m_queued_ops; + std::set m_inflight_ops; const char **get_tracked_conf_keys() const override; void handle_conf_change(const ConfigProxy& conf, diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index 039efb0c530e8..88f048006474c 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -34,14 +34,14 @@ using librbd::util::create_context_callback; template InstanceReplayer::InstanceReplayer( + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, Threads *threads, ServiceDaemon* service_daemon, - journal::CacheManagerHandler *cache_manager_handler, RadosRef local_rados, - const std::string &local_mirror_uuid, int64_t local_pool_id) - : m_threads(threads), m_service_daemon(service_daemon), - m_cache_manager_handler(cache_manager_handler), m_local_rados(local_rados), - m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), - m_lock(ceph::make_mutex( - "rbd::mirror::InstanceReplayer " + stringify(local_pool_id))) { + journal::CacheManagerHandler *cache_manager_handler) + : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid), + m_threads(threads), m_service_daemon(service_daemon), + m_cache_manager_handler(cache_manager_handler), + m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " + + stringify(local_io_ctx.get_id()))) { } template @@ -144,8 +144,8 @@ void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, auto it = m_image_replayers.find(global_image_id); if (it == m_image_replayers.end()) { auto image_replayer = ImageReplayer::create( - m_threads, instance_watcher, m_cache_manager_handler, m_local_rados, - m_local_mirror_uuid, m_local_pool_id, global_image_id); + m_local_io_ctx, m_local_mirror_uuid, global_image_id, + m_threads, instance_watcher, m_cache_manager_handler); dout(10) << global_image_id << ": creating replayer " << image_replayer << dendl; @@ -266,6 +266,27 @@ void InstanceReplayer::stop() } } +template +void InstanceReplayer::stop(Context *on_finish) +{ + dout(10) << dendl; + + auto cct = static_cast(m_local_io_ctx.cct()); + auto gather_ctx = new C_Gather(cct, on_finish); + { + std::lock_guard locker{m_lock}; + + m_manual_stop = true; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->stop(gather_ctx->new_sub(), true); + } + } + + gather_ctx->activate(); +} + template void InstanceReplayer::restart() { @@ -359,12 +380,15 @@ void InstanceReplayer::start_image_replayers(int r) { start_image_replayer(current_it->second); } - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); + // TODO: add namespace support to service daemon + if (m_local_io_ctx.get_namespace().empty()) { + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); + } m_async_op_tracker.finish_op(); } @@ -490,7 +514,7 @@ void InstanceReplayer::schedule_image_state_check_task() { queue_start_image_replayers(); }); - auto cct = static_cast(m_local_rados->cct()); + auto cct = static_cast(m_local_io_ctx.cct()); int after = cct->_conf.get_val( "rbd_mirror_image_state_check_interval"); diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h index 5aa7fcb058089..a8f8e8ea42f9f 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.h +++ b/src/tools/rbd_mirror/InstanceReplayer.h @@ -28,23 +28,21 @@ template class InstanceReplayer { public: static InstanceReplayer* create( - Threads *threads, - ServiceDaemon *service_daemon, - journal::CacheManagerHandler *cache_manager_handler, - RadosRef local_rados, const std::string &local_mirror_uuid, - int64_t local_pool_id) { - return new InstanceReplayer(threads, service_daemon, cache_manager_handler, - local_rados, local_mirror_uuid, local_pool_id); + librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, + Threads *threads, ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler) { + return new InstanceReplayer(local_io_ctx, local_mirror_uuid, threads, + service_daemon, cache_manager_handler); } void destroy() { delete this; } - InstanceReplayer(Threads *threads, + InstanceReplayer(librados::IoCtx &local_io_ctx, + const std::string &local_mirror_uuid, + Threads *threads, ServiceDaemon *service_daemon, - journal::CacheManagerHandler *cache_manager_handler, - RadosRef local_rados, const std::string &local_mirror_uuid, - int64_t local_pool_id); + journal::CacheManagerHandler *cache_manager_handler); ~InstanceReplayer(); int init(); @@ -70,6 +68,8 @@ public: void restart(); void flush(); + void stop(Context *on_finish); + private: /** * @verbatim @@ -85,12 +85,11 @@ private: * @endverbatim */ + librados::IoCtx &m_local_io_ctx; + std::string m_local_mirror_uuid; Threads *m_threads; ServiceDaemon *m_service_daemon; journal::CacheManagerHandler *m_cache_manager_handler; - RadosRef m_local_rados; - std::string m_local_mirror_uuid; - int64_t m_local_pool_id; ceph::mutex m_lock; AsyncOpTracker m_async_op_tracker; diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index 9f8b1d7c3f0bf..f29a8d48abc1b 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -59,7 +59,7 @@ struct C_RemoveInstanceRequest : public Context { C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, const std::string &instance_id, Context *on_finish) - : instance_watcher(io_ctx, work_queue, nullptr, instance_id), + : instance_watcher(io_ctx, work_queue, nullptr, nullptr, instance_id), on_finish(on_finish) { } @@ -312,8 +312,10 @@ void InstanceWatcher::remove_instance(librados::IoCtx &io_ctx, template InstanceWatcher *InstanceWatcher::create( librados::IoCtx &io_ctx, ContextWQ *work_queue, - InstanceReplayer *instance_replayer) { + InstanceReplayer *instance_replayer, + ImageSyncThrottler *image_sync_throttler) { return new InstanceWatcher(io_ctx, work_queue, instance_replayer, + image_sync_throttler, stringify(io_ctx.get_instance_id())); } @@ -321,9 +323,11 @@ template InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, + ImageSyncThrottler *image_sync_throttler, const std::string &instance_id) : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), - m_instance_replayer(instance_replayer), m_instance_id(instance_id), + m_instance_replayer(instance_replayer), + m_image_sync_throttler(image_sync_throttler), m_instance_id(instance_id), m_lock(ceph::make_mutex( unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))), m_instance_lock(librbd::ManagedLock::create( @@ -338,7 +342,6 @@ InstanceWatcher::~InstanceWatcher() { ceph_assert(m_notify_op_tracker.empty()); ceph_assert(m_suspended_ops.empty()); ceph_assert(m_inflight_sync_reqs.empty()); - ceph_assert(m_image_sync_throttler == nullptr); m_instance_lock->destroy(); } @@ -514,8 +517,8 @@ void InstanceWatcher::notify_sync_start(const std::string &instance_id, [this, sync_id] (int r) { dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; std::lock_guard locker{m_lock}; - if (r != -ESTALE && m_image_sync_throttler != nullptr) { - m_image_sync_throttler->finish_op(sync_id); + if (r != -ESTALE && is_leader()) { + m_image_sync_throttler->finish_op(m_ioctx.get_namespace(), sync_id); } }); auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, @@ -581,25 +584,12 @@ void InstanceWatcher::handle_notify_sync_complete(C_SyncRequest *sync_ctx, } } -template -void InstanceWatcher::print_sync_status(Formatter *f, stringstream *ss) { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - if (m_image_sync_throttler != nullptr) { - m_image_sync_throttler->print_status(f, ss); - } -} - template void InstanceWatcher::handle_acquire_leader() { dout(10) << dendl; std::lock_guard locker{m_lock}; - ceph_assert(m_image_sync_throttler == nullptr); - m_image_sync_throttler = ImageSyncThrottler::create(m_cct); - m_leader_instance_id = m_instance_id; unsuspend_notify_requests(); } @@ -610,13 +600,9 @@ void InstanceWatcher::handle_release_leader() { std::lock_guard locker{m_lock}; - ceph_assert(m_image_sync_throttler != nullptr); - m_leader_instance_id.clear(); - m_image_sync_throttler->drain(-ESTALE); - m_image_sync_throttler->destroy(); - m_image_sync_throttler = nullptr; + m_image_sync_throttler->drain(m_ioctx.get_namespace(), -ESTALE); } template @@ -1163,7 +1149,7 @@ void InstanceWatcher::handle_sync_request(const std::string &instance_id, std::lock_guard locker{m_lock}; - if (m_image_sync_throttler == nullptr) { + if (!is_leader()) { dout(10) << "sync request for non-leader" << dendl; m_work_queue->queue(on_finish, -ESTALE); return; @@ -1182,7 +1168,7 @@ void InstanceWatcher::handle_sync_request(const std::string &instance_id, } on_finish->complete(r); })); - m_image_sync_throttler->start_op(sync_id, on_start); + m_image_sync_throttler->start_op(m_ioctx.get_namespace(), sync_id, on_start); } template diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index ec3b56affeb3c..5e077921fa2bc 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -43,13 +43,15 @@ public: static InstanceWatcher *create( librados::IoCtx &io_ctx, ContextWQ *work_queue, - InstanceReplayer *instance_replayer); + InstanceReplayer *instance_replayer, + ImageSyncThrottler *image_sync_throttler); void destroy() { delete this; } InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, InstanceReplayer *instance_replayer, + ImageSyncThrottler *image_sync_throttler, const std::string &instance_id); ~InstanceWatcher() override; @@ -79,8 +81,6 @@ public: bool cancel_sync_request(const std::string &sync_id); void notify_sync_complete(const std::string &sync_id); - void print_sync_status(Formatter *f, stringstream *ss); - void cancel_notify_requests(const std::string &instance_id); void handle_acquire_leader(); @@ -157,6 +157,7 @@ private: Threads *m_threads; InstanceReplayer *m_instance_replayer; + ImageSyncThrottler *m_image_sync_throttler; std::string m_instance_id; mutable ceph::mutex m_lock; @@ -171,7 +172,10 @@ private: std::set m_requests; std::set m_suspended_ops; std::map m_inflight_sync_reqs; - ImageSyncThrottler *m_image_sync_throttler = nullptr; + + inline bool is_leader() const { + return m_leader_instance_id == m_instance_id; + } void register_instance(); void handle_register_instance(int r); diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 801f296ad2eac..4f35e1c0aebbd 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -42,7 +42,6 @@ LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, template LeaderWatcher::~LeaderWatcher() { - ceph_assert(m_status_watcher == nullptr); ceph_assert(m_instances == nullptr); ceph_assert(m_timer_task == nullptr); @@ -128,16 +127,19 @@ void LeaderWatcher::handle_register_watch(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; - if (r < 0) { + { + std::lock_guard timer_locker(m_threads->timer_lock); std::lock_guard locker{m_lock}; - derr << "error registering leader watcher for " << m_oid << " object: " - << cpp_strerror(r) << dendl; + + if (r < 0) { + derr << "error registering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } else { + schedule_acquire_leader_lock(0); + } + ceph_assert(m_on_finish != nullptr); std::swap(on_finish, m_on_finish); - } else { - std::lock_guard locker{m_lock}; - init_status_watcher(); - return; } on_finish->complete(r); @@ -186,7 +188,7 @@ void LeaderWatcher::handle_shut_down_leader_lock(int r) { derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl; } - shut_down_status_watcher(); + unregister_watch(); } template @@ -684,73 +686,6 @@ void LeaderWatcher::handle_release_leader_lock(int r) { schedule_acquire_leader_lock(1); } -template -void LeaderWatcher::init_status_watcher() { - dout(10) << dendl; - - ceph_assert(ceph_mutex_is_locked(m_lock)); - ceph_assert(m_status_watcher == nullptr); - - m_status_watcher = MirrorStatusWatcher::create(m_ioctx, m_work_queue); - - Context *ctx = create_context_callback< - LeaderWatcher, &LeaderWatcher::handle_init_status_watcher>(this); - - m_status_watcher->init(ctx); -} - -template -void LeaderWatcher::handle_init_status_watcher(int r) { - dout(10) << "r=" << r << dendl; - - Context *on_finish = nullptr; - { - std::scoped_lock locker{m_threads->timer_lock, m_lock}; - - if (r < 0) { - derr << "error initializing mirror status watcher: " << cpp_strerror(r) - << cpp_strerror(r) << dendl; - } else { - schedule_acquire_leader_lock(0); - } - - ceph_assert(m_on_finish != nullptr); - std::swap(on_finish, m_on_finish); - } - - on_finish->complete(r); -} - -template -void LeaderWatcher::shut_down_status_watcher() { - dout(10) << dendl; - - ceph_assert(ceph_mutex_is_locked(m_lock)); - ceph_assert(m_status_watcher != nullptr); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback, - &LeaderWatcher::handle_shut_down_status_watcher>(this)); - - m_status_watcher->shut_down(ctx); -} - -template -void LeaderWatcher::handle_shut_down_status_watcher(int r) { - dout(10) << "r=" << r << dendl; - - std::lock_guard locker{m_lock}; - m_status_watcher->destroy(); - m_status_watcher = nullptr; - - if (r < 0) { - derr << "error shutting mirror status watcher down: " << cpp_strerror(r) - << dendl; - } - - unregister_watch(); -} - template void LeaderWatcher::init_instances() { dout(10) << dendl; diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index 9d8e85f9e8a62..67c971dbcfb5d 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -14,7 +14,6 @@ #include "librbd/managed_lock/Types.h" #include "librbd/watcher/Types.h" #include "Instances.h" -#include "MirrorStatusWatcher.h" #include "tools/rbd_mirror/instances/Types.h" #include "tools/rbd_mirror/leader_watcher/Types.h" @@ -63,11 +62,8 @@ private: * CREATE_OBJECT * * * * * (error) UNREGISTER_WATCH * | * ^ * v * | - * REGISTER_WATCH * * * * * SHUT_DOWN_STATUS_WATCHER - * | * ^ - * v * | - * INIT_STATUS_WATCHER * * SHUT_DOWN_LEADER_LOCK - * | | + * REGISTER_WATCH * * * * * SHUT_DOWN_LEADER_LOCK + * | ^ * | (no leader heartbeat and acquire failed) | * | BREAK_LOCK <-------------------------------------\ | * | | (no leader heartbeat) | | (shut down) @@ -216,7 +212,6 @@ private: Context *m_on_shut_down_finish = nullptr; uint64_t m_acquire_attempts = 0; int m_ret_val = 0; - MirrorStatusWatcher *m_status_watcher = nullptr; Instances *m_instances = nullptr; librbd::managed_lock::Locker m_locker; @@ -264,12 +259,6 @@ private: void release_leader_lock(); void handle_release_leader_lock(int r); - void init_status_watcher(); - void handle_init_status_watcher(int r); - - void shut_down_status_watcher(); - void handle_shut_down_status_watcher(int r); - void init_instances(); void handle_init_instances(int r); diff --git a/src/tools/rbd_mirror/NamespaceReplayer.cc b/src/tools/rbd_mirror/NamespaceReplayer.cc new file mode 100644 index 0000000000000..6428bfe11e8f2 --- /dev/null +++ b/src/tools/rbd_mirror/NamespaceReplayer.cc @@ -0,0 +1,781 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "NamespaceReplayer.h" +#include +#include "common/Formatter.h" +#include "common/debug.h" +#include "common/errno.h" +#include "cls/rbd/cls_rbd_client.h" +#include "librbd/Utils.h" +#include "librbd/api/Config.h" +#include "librbd/api/Mirror.h" +#include "ServiceDaemon.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::NamespaceReplayer: " \ + << this << " " << __func__ << ": " + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +namespace rbd { +namespace mirror { + +using ::operator<<; + +namespace { + +const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id"); +const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count"); +const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count"); + +} // anonymous namespace + +template +NamespaceReplayer::NamespaceReplayer( + const std::string &name, + librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, + const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid, + Threads *threads, ImageSyncThrottler *image_sync_throttler, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler) : + m_local_mirror_uuid(local_mirror_uuid), + m_remote_mirror_uuid(remote_mirror_uuid), + m_threads(threads), m_image_sync_throttler(image_sync_throttler), + m_service_daemon(service_daemon), + m_cache_manager_handler(cache_manager_handler), + m_lock(ceph::make_mutex(librbd::util::unique_lock_name( + "rbd::mirror::NamespaceReplayer " + name, this))), + m_local_pool_watcher_listener(this, true), + m_remote_pool_watcher_listener(this, false), + m_image_map_listener(this) { + dout(10) << name << dendl; + + m_local_io_ctx.dup(local_io_ctx); + m_local_io_ctx.set_namespace(name); + m_remote_io_ctx.dup(remote_io_ctx); + m_remote_io_ctx.set_namespace(name); +} + +template +bool NamespaceReplayer::is_blacklisted() const { + std::lock_guard locker{m_lock}; + return (m_local_pool_watcher && + m_local_pool_watcher->is_blacklisted()) || + (m_remote_pool_watcher && + m_remote_pool_watcher->is_blacklisted()); +} + +template +void NamespaceReplayer::init(Context *on_finish) { + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + init_status_watcher(); +} + + +template +void NamespaceReplayer::shut_down(Context *on_finish) { + dout(20) << dendl; + + { + std::lock_guard locker{m_lock}; + + ceph_assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + if (!m_image_map) { + stop_instance_replayer(); + return; + } + } + + auto ctx = new FunctionContext( + [this] (int r) { + std::lock_guard locker{m_lock}; + stop_instance_replayer(); + }); + handle_release_leader(ctx); +} + +template +void NamespaceReplayer::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << dendl; + + ceph_assert(f); + + std::lock_guard locker{m_lock}; + + m_instance_replayer->print_status(f, ss); + + if (m_image_deleter) { + f->open_object_section("image_deleter"); + m_image_deleter->print_status(f, ss); + f->close_section(); + } +} + +template +void NamespaceReplayer::start() +{ + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + m_instance_replayer->start(); +} + +template +void NamespaceReplayer::stop() +{ + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + m_instance_replayer->stop(); +} + +template +void NamespaceReplayer::restart() +{ + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + m_instance_replayer->restart(); +} + +template +void NamespaceReplayer::flush() +{ + dout(20) << dendl; + + std::lock_guard locker{m_lock}; + + m_instance_replayer->flush(); +} + +template +void NamespaceReplayer::handle_update(const std::string &mirror_uuid, + ImageIds &&added_image_ids, + ImageIds &&removed_image_ids) { + std::lock_guard locker{m_lock}; + + if (!m_image_map) { + dout(20) << "not leader" << dendl; + return; + } + + dout(10) << "mirror_uuid=" << mirror_uuid << ", " + << "added_count=" << added_image_ids.size() << ", " + << "removed_count=" << removed_image_ids.size() << dendl; + + // TODO: add namespace support to service daemon + if (m_local_io_ctx.get_namespace().empty()) { + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_LOCAL_COUNT_KEY, + m_local_pool_watcher->get_image_count()); + if (m_remote_pool_watcher) { + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_REMOTE_COUNT_KEY, + m_remote_pool_watcher->get_image_count()); + } + } + + std::set added_global_image_ids; + for (auto& image_id : added_image_ids) { + added_global_image_ids.insert(image_id.global_id); + } + + std::set removed_global_image_ids; + for (auto& image_id : removed_image_ids) { + removed_global_image_ids.insert(image_id.global_id); + } + + m_image_map->update_images(mirror_uuid, + std::move(added_global_image_ids), + std::move(removed_global_image_ids)); +} + +template +void NamespaceReplayer::handle_acquire_leader(Context *on_finish) { + dout(10) << dendl; + + m_instance_watcher->handle_acquire_leader(); + + init_image_map(on_finish); +} + +template +void NamespaceReplayer::handle_release_leader(Context *on_finish) { + dout(10) << dendl; + + m_instance_watcher->handle_release_leader(); + shut_down_image_deleter(on_finish); +} + +template +void NamespaceReplayer::handle_update_leader( + const std::string &leader_instance_id) { + dout(10) << "leader_instance_id=" << leader_instance_id << dendl; + + m_instance_watcher->handle_update_leader(leader_instance_id); +} + +template +void NamespaceReplayer::handle_instances_added( + const std::vector &instance_ids) { + dout(10) << "instance_ids=" << instance_ids << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_image_map); + m_image_map->update_instances_added(instance_ids); +} + +template +void NamespaceReplayer::handle_instances_removed( + const std::vector &instance_ids) { + dout(10) << "instance_ids=" << instance_ids << dendl; + + std::lock_guard locker{m_lock}; + + ceph_assert(m_image_map); + m_image_map->update_instances_removed(instance_ids); +} + +template +void NamespaceReplayer::init_status_watcher() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_status_watcher); + + m_status_watcher.reset(MirrorStatusWatcher::create( + m_local_io_ctx, m_threads->work_queue)); + auto ctx = create_context_callback, + &NamespaceReplayer::handle_init_status_watcher>(this); + + m_status_watcher->init(ctx); +} + +template +void NamespaceReplayer::handle_init_status_watcher(int r) { + dout(10) << "r=" << r << dendl; + + std::lock_guard locker{m_lock}; + + if (r < 0) { + derr << "error initializing mirror status watcher: " << cpp_strerror(r) + << dendl; + + ceph_assert(m_on_finish != nullptr); + m_threads->work_queue->queue(m_on_finish, r); + m_on_finish = nullptr; + return; + } + + init_instance_replayer(); +} + +template +void NamespaceReplayer::init_instance_replayer() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_instance_replayer); + + m_instance_replayer.reset(InstanceReplayer::create( + m_local_io_ctx, m_local_mirror_uuid, m_threads, m_service_daemon, + m_cache_manager_handler)); + auto ctx = create_context_callback, + &NamespaceReplayer::handle_init_instance_replayer>(this); + + m_instance_replayer->init(ctx); +} + +template +void NamespaceReplayer::handle_init_instance_replayer(int r) { + dout(10) << "r=" << r << dendl; + + std::lock_guard locker{m_lock}; + + if (r < 0) { + derr << "error initializing instance replayer: " << cpp_strerror(r) + << dendl; + + m_instance_replayer.reset(); + m_ret_val = r; + shut_down_status_watcher(); + return; + } + + m_instance_replayer->add_peer(m_remote_mirror_uuid, m_remote_io_ctx); + + init_instance_watcher(); +} + +template +void NamespaceReplayer::init_instance_watcher() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_instance_watcher); + + m_instance_watcher.reset(InstanceWatcher::create( + m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get(), + m_image_sync_throttler)); + auto ctx = create_context_callback, + &NamespaceReplayer::handle_init_instance_watcher>(this); + + m_instance_watcher->init(ctx); +} + +template +void NamespaceReplayer::handle_init_instance_watcher(int r) { + dout(10) << "r=" << r << dendl; + + std::lock_guard locker{m_lock}; + + if (r < 0) { + derr << "error initializing instance watcher: " << cpp_strerror(r) + << dendl; + + m_instance_watcher.reset(); + m_ret_val = r; + shut_down_instance_replayer(); + return; + } + + // TODO: add namespace support to service daemon + if (m_local_io_ctx.get_namespace().empty()) { + m_service_daemon->add_or_update_attribute( + m_local_io_ctx.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY, + m_instance_watcher->get_instance_id()); + } + + ceph_assert(m_on_finish != nullptr); + m_threads->work_queue->queue(m_on_finish); + m_on_finish = nullptr; +} + +template +void NamespaceReplayer::stop_instance_replayer() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &NamespaceReplayer::handle_stop_instance_replayer>(this)); + + m_instance_replayer->stop(ctx); +} + +template +void NamespaceReplayer::handle_stop_instance_replayer(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "error stopping instance replayer: " << cpp_strerror(r) << dendl; + } + + std::lock_guard locker{m_lock}; + + shut_down_instance_watcher(); +} + +template +void NamespaceReplayer::shut_down_instance_watcher() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_instance_watcher); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &NamespaceReplayer::handle_shut_down_instance_watcher>(this)); + + m_instance_watcher->shut_down(ctx); +} + +template +void NamespaceReplayer::handle_shut_down_instance_watcher(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "error shutting instance watcher down: " << cpp_strerror(r) + << dendl; + } + + std::lock_guard locker{m_lock}; + + m_instance_watcher.reset(); + + shut_down_instance_replayer(); +} + +template +void NamespaceReplayer::shut_down_instance_replayer() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_instance_replayer); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &NamespaceReplayer::handle_shut_down_instance_replayer>(this)); + + m_instance_replayer->shut_down(ctx); +} + +template +void NamespaceReplayer::handle_shut_down_instance_replayer(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "error shutting instance replayer down: " << cpp_strerror(r) + << dendl; + } + + std::lock_guard locker{m_lock}; + + m_instance_replayer.reset(); + + shut_down_status_watcher(); +} + +template +void NamespaceReplayer::shut_down_status_watcher() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_status_watcher); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &NamespaceReplayer::handle_shut_down_status_watcher>(this)); + + m_status_watcher->shut_down(ctx); +} + +template +void NamespaceReplayer::handle_shut_down_status_watcher(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "error shutting mirror status watcher down: " << cpp_strerror(r) + << dendl; + } + + std::lock_guard locker{m_lock}; + + m_status_watcher.reset(); + + ceph_assert(!m_image_map); + ceph_assert(!m_image_deleter); + ceph_assert(!m_local_pool_watcher); + ceph_assert(!m_remote_pool_watcher); + ceph_assert(!m_instance_watcher); + ceph_assert(!m_instance_replayer); + + ceph_assert(m_on_finish != nullptr); + m_threads->work_queue->queue(m_on_finish, m_ret_val); + m_on_finish = nullptr; + m_ret_val = 0; +} + +template +void NamespaceReplayer::init_image_map(Context *on_finish) { + dout(10) << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(!m_image_map); + m_image_map.reset(ImageMap::create(m_local_io_ctx, m_threads, + m_instance_watcher->get_instance_id(), + m_image_map_listener)); + + auto ctx = new FunctionContext( + [this, on_finish](int r) { + handle_init_image_map(r, on_finish); + }); + m_image_map->init(create_async_context_callback( + m_threads->work_queue, ctx)); +} + +template +void NamespaceReplayer::handle_init_image_map(int r, Context *on_finish) { + dout(10) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to init image map: " << cpp_strerror(r) << dendl; + on_finish = new FunctionContext([on_finish, r](int) { + on_finish->complete(r); + }); + shut_down_image_map(on_finish); + return; + } + + init_local_pool_watcher(on_finish); +} + +template +void NamespaceReplayer::init_local_pool_watcher(Context *on_finish) { + dout(10) << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(!m_local_pool_watcher); + m_local_pool_watcher.reset(PoolWatcher::create( + m_threads, m_local_io_ctx, m_local_pool_watcher_listener)); + + // ensure the initial set of local images is up-to-date + // after acquiring the leader role + auto ctx = new FunctionContext([this, on_finish](int r) { + handle_init_local_pool_watcher(r, on_finish); + }); + m_local_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, ctx)); +} + +template +void NamespaceReplayer::handle_init_local_pool_watcher( + int r, Context *on_finish) { + dout(10) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; + on_finish = new FunctionContext([on_finish, r](int) { + on_finish->complete(r); + }); + shut_down_pool_watchers(on_finish); + return; + } + + init_remote_pool_watcher(on_finish); +} + +template +void NamespaceReplayer::init_remote_pool_watcher(Context *on_finish) { + dout(10) << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(!m_remote_pool_watcher); + m_remote_pool_watcher.reset(PoolWatcher::create( + m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener)); + + auto ctx = new FunctionContext([this, on_finish](int r) { + handle_init_remote_pool_watcher(r, on_finish); + }); + m_remote_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, ctx)); +} + +template +void NamespaceReplayer::handle_init_remote_pool_watcher( + int r, Context *on_finish) { + dout(10) << "r=" << r << dendl; + if (r == -ENOENT) { + // Technically nothing to do since the other side doesn't + // have mirroring enabled. Eventually the remote pool watcher will + // detect images (if mirroring is enabled), so no point propagating + // an error which would just busy-spin the state machines. + dout(0) << "remote peer does not have mirroring configured" << dendl; + } else if (r < 0) { + derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl; + on_finish = new FunctionContext([on_finish, r](int) { + on_finish->complete(r); + }); + shut_down_pool_watchers(on_finish); + return; + } + + init_image_deleter(on_finish); +} + +template +void NamespaceReplayer::init_image_deleter(Context *on_finish) { + dout(10) << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(!m_image_deleter); + + on_finish = new FunctionContext([this, on_finish](int r) { + handle_init_image_deleter(r, on_finish); + }); + m_image_deleter.reset(ImageDeleter::create(m_local_io_ctx, m_threads, + m_service_daemon)); + m_image_deleter->init(create_async_context_callback( + m_threads->work_queue, on_finish)); +} + +template +void NamespaceReplayer::handle_init_image_deleter( + int r, Context *on_finish) { + dout(10) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to init image deleter: " << cpp_strerror(r) << dendl; + on_finish = new FunctionContext([on_finish, r](int) { + on_finish->complete(r); + }); + shut_down_image_deleter(on_finish); + return; + } + + on_finish->complete(0); +} + +template +void NamespaceReplayer::shut_down_image_deleter(Context* on_finish) { + dout(10) << dendl; + { + std::lock_guard locker{m_lock}; + if (m_image_deleter) { + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_shut_down_image_deleter(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + m_image_deleter->shut_down(ctx); + return; + } + } + shut_down_pool_watchers(on_finish); +} + +template +void NamespaceReplayer::handle_shut_down_image_deleter( + int r, Context* on_finish) { + dout(10) << "r=" << r << dendl; + + { + std::lock_guard locker{m_lock}; + ceph_assert(m_image_deleter); + m_image_deleter.reset(); + } + + shut_down_pool_watchers(on_finish); +} + +template +void NamespaceReplayer::shut_down_pool_watchers(Context *on_finish) { + dout(10) << dendl; + + { + std::lock_guard locker{m_lock}; + if (m_local_pool_watcher) { + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_shut_down_pool_watchers(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + auto gather_ctx = new C_Gather(g_ceph_context, ctx); + m_local_pool_watcher->shut_down(gather_ctx->new_sub()); + if (m_remote_pool_watcher) { + m_remote_pool_watcher->shut_down(gather_ctx->new_sub()); + } + gather_ctx->activate(); + return; + } + } + + on_finish->complete(0); +} + +template +void NamespaceReplayer::handle_shut_down_pool_watchers( + int r, Context *on_finish) { + dout(10) << "r=" << r << dendl; + + { + std::lock_guard locker{m_lock}; + ceph_assert(m_local_pool_watcher); + m_local_pool_watcher.reset(); + + if (m_remote_pool_watcher) { + m_remote_pool_watcher.reset(); + } + } + shut_down_image_map(on_finish); +} + +template +void NamespaceReplayer::shut_down_image_map(Context *on_finish) { + dout(5) << dendl; + + std::lock_guard locker{m_lock}; + if (m_image_map) { + on_finish = new FunctionContext( + [this, on_finish](int r) { + handle_shut_down_image_map(r, on_finish); + }); + m_image_map->shut_down(create_async_context_callback( + m_threads->work_queue, on_finish)); + return; + } + + m_threads->work_queue->queue(on_finish); +} + +template +void NamespaceReplayer::handle_shut_down_image_map(int r, Context *on_finish) { + dout(5) << "r=" << r << dendl; + if (r < 0 && r != -EBLACKLISTED) { + derr << "failed to shut down image map: " << cpp_strerror(r) << dendl; + } + + std::lock_guard locker{m_lock}; + ceph_assert(m_image_map); + m_image_map.reset(); + + m_instance_replayer->release_all(create_async_context_callback( + m_threads->work_queue, on_finish)); +} + +template +void NamespaceReplayer::handle_acquire_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) { + dout(5) << "global_image_id=" << global_image_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_image_acquire(instance_id, global_image_id, + on_finish); +} + +template +void NamespaceReplayer::handle_release_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) { + dout(5) << "global_image_id=" << global_image_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_image_release(instance_id, global_image_id, + on_finish); +} + +template +void NamespaceReplayer::handle_remove_image(const std::string &mirror_uuid, + const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) { + ceph_assert(!mirror_uuid.empty()); + dout(5) << "mirror_uuid=" << mirror_uuid << ", " + << "global_image_id=" << global_image_id << ", " + << "instance_id=" << instance_id << dendl; + + m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id, + mirror_uuid, on_finish); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::NamespaceReplayer; diff --git a/src/tools/rbd_mirror/NamespaceReplayer.h b/src/tools/rbd_mirror/NamespaceReplayer.h new file mode 100644 index 0000000000000..810f8b445070c --- /dev/null +++ b/src/tools/rbd_mirror/NamespaceReplayer.h @@ -0,0 +1,285 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_NAMESPACE_REPLAYER_H +#define CEPH_RBD_MIRROR_NAMESPACE_REPLAYER_H + +#include "common/AsyncOpTracker.h" +#include "common/WorkQueue.h" +#include "common/ceph_mutex.h" +#include "include/rados/librados.hpp" + +#include "tools/rbd_mirror/ImageDeleter.h" +#include "tools/rbd_mirror/ImageMap.h" +#include "tools/rbd_mirror/InstanceReplayer.h" +#include "tools/rbd_mirror/InstanceWatcher.h" +#include "tools/rbd_mirror/MirrorStatusWatcher.h" +#include "tools/rbd_mirror/PoolWatcher.h" +#include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/image_map/Types.h" +#include "tools/rbd_mirror/pool_watcher/Types.h" + +#include +#include +#include + +class AdminSocketHook; + +namespace journal { struct CacheManagerHandler; } + +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template class ImageSyncThrottler; +template class ServiceDaemon; +template struct Threads; + +/** + * Controls mirroring for a single remote cluster. + */ +template +class NamespaceReplayer { +public: + static NamespaceReplayer *create( + const std::string &name, + librados::IoCtx &local_ioctx, + librados::IoCtx &remote_ioctx, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, + Threads *threads, + ImageSyncThrottler *image_sync_throttler, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler) { + return new NamespaceReplayer(name, local_ioctx, remote_ioctx, + local_mirror_uuid, remote_mirror_uuid, threads, + image_sync_throttler, service_daemon, + cache_manager_handler); + } + + NamespaceReplayer(const std::string &name, + librados::IoCtx &local_ioctx, + librados::IoCtx &remote_ioctx, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, + Threads *threads, + ImageSyncThrottler *image_sync_throttler, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler); + NamespaceReplayer(const NamespaceReplayer&) = delete; + NamespaceReplayer& operator=(const NamespaceReplayer&) = delete; + + bool is_blacklisted() const; + + void init(Context *on_finish); + void shut_down(Context *on_finish); + + void handle_acquire_leader(Context *on_finish); + void handle_release_leader(Context *on_finish); + void handle_update_leader(const std::string &leader_instance_id); + void handle_instances_added(const std::vector &instance_ids); + void handle_instances_removed(const std::vector &instance_ids); + + void print_status(Formatter *f, stringstream *ss); + void start(); + void stop(); + void restart(); + void flush(); + +private: + /** + * @verbatim + * + * <----------------------\ + * | (init) ^ (error) | + * v * | + * INIT_STATUS_WATCHER * * * * * * > SHUT_DOWN_STATUS_WATCHER + * | * (error) ^ + * v * | + * INIT_INSTANCE_REPLAYER * * * * > SHUT_DOWN_INSTANCE_REPLAYER + * | * ^ + * v * | + * INIT_INSTANCE_WATCHER * * * * * SHUT_DOWN_INSTANCE_WATCHER + * | (error) ^ + * | | + * v STOP_INSTANCE_REPLAYER + * | ^ + * | (shut down) | + * | /--------------------------------/ + * v | + * <---------------------------\ + * . | + * . | + * v (leader acquired) | + * INIT_IMAGE_MAP | + * | | + * v | + * INIT_LOCAL_POOL_WATCHER SHUT_DOWN_IMAGE_MAP + * | ^ + * v | + * INIT_REMOTE_POOL_WATCHER SHUT_DOWN_POOL_WATCHERS + * | ^ + * v | + * INIT_IMAGE_DELETER SHUT_DOWN_IMAGE_DELETER + * | ^ + * v . + * <-----------\ . + * . | . + * . (image update) | . + * . . > NOTIFY_INSTANCE_WATCHER . + * . . + * . (leader lost / shut down) . + * . . . . . . . . . . . . . . . . . . . + * + * @endverbatim + */ + + struct PoolWatcherListener : public pool_watcher::Listener { + NamespaceReplayer *namespace_replayer; + bool local; + + PoolWatcherListener(NamespaceReplayer *namespace_replayer, bool local) + : namespace_replayer(namespace_replayer), local(local) { + } + + void handle_update(const std::string &mirror_uuid, + ImageIds &&added_image_ids, + ImageIds &&removed_image_ids) override { + namespace_replayer->handle_update((local ? "" : mirror_uuid), + std::move(added_image_ids), + std::move(removed_image_ids)); + } + }; + + struct ImageMapListener : public image_map::Listener { + NamespaceReplayer *namespace_replayer; + + ImageMapListener(NamespaceReplayer *namespace_replayer) + : namespace_replayer(namespace_replayer) { + } + + void acquire_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_acquire_image(global_image_id, instance_id, + on_finish); + } + + void release_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_release_image(global_image_id, instance_id, + on_finish); + } + + void remove_image(const std::string &mirror_uuid, + const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish) override { + namespace_replayer->handle_remove_image(mirror_uuid, global_image_id, + instance_id, on_finish); + } + }; + + void handle_update(const std::string &mirror_uuid, + ImageIds &&added_image_ids, + ImageIds &&removed_image_ids); + + int init_rados(const std::string &cluster_name, + const std::string &client_name, + const std::string &mon_host, + const std::string &key, + const std::string &description, RadosRef *rados_ref, + bool strip_cluster_overrides); + + void init_status_watcher(); + void handle_init_status_watcher(int r); + + void init_instance_replayer(); + void handle_init_instance_replayer(int r); + + void init_instance_watcher(); + void handle_init_instance_watcher(int r); + + void stop_instance_replayer(); + void handle_stop_instance_replayer(int r); + + void shut_down_instance_watcher(); + void handle_shut_down_instance_watcher(int r); + + void shut_down_instance_replayer(); + void handle_shut_down_instance_replayer(int r); + + void shut_down_status_watcher(); + void handle_shut_down_status_watcher(int r); + + void init_image_map(Context *on_finish); + void handle_init_image_map(int r, Context *on_finish); + + void init_local_pool_watcher(Context *on_finish); + void handle_init_local_pool_watcher(int r, Context *on_finish); + + void init_remote_pool_watcher(Context *on_finish); + void handle_init_remote_pool_watcher(int r, Context *on_finish); + + void init_image_deleter(Context* on_finish); + void handle_init_image_deleter(int r, Context* on_finish); + + void shut_down_image_deleter(Context* on_finish); + void handle_shut_down_image_deleter(int r, Context* on_finish); + + void shut_down_pool_watchers(Context *on_finish); + void handle_shut_down_pool_watchers(int r, Context *on_finish); + + void shut_down_image_map(Context *on_finish); + void handle_shut_down_image_map(int r, Context *on_finish); + + void handle_acquire_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish); + void handle_release_image(const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish); + void handle_remove_image(const std::string &mirror_uuid, + const std::string &global_image_id, + const std::string &instance_id, + Context* on_finish); + + librados::IoCtx m_local_io_ctx; + librados::IoCtx m_remote_io_ctx; + std::string m_local_mirror_uuid; + std::string m_remote_mirror_uuid; + Threads *m_threads; + ImageSyncThrottler *m_image_sync_throttler; + ServiceDaemon *m_service_daemon; + journal::CacheManagerHandler *m_cache_manager_handler; + + mutable ceph::mutex m_lock; + + int m_ret_val = 0; + Context *m_on_finish = nullptr; + + std::unique_ptr> m_status_watcher; + + PoolWatcherListener m_local_pool_watcher_listener; + std::unique_ptr> m_local_pool_watcher; + + PoolWatcherListener m_remote_pool_watcher_listener; + std::unique_ptr> m_remote_pool_watcher; + + std::unique_ptr> m_instance_replayer; + std::unique_ptr> m_image_deleter; + + ImageMapListener m_image_map_listener; + std::unique_ptr> m_image_map; + + std::unique_ptr> m_instance_watcher; +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::NamespaceReplayer; + +#endif // CEPH_RBD_MIRROR_NAMESPACE_REPLAYER_H diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index 7bb98604676e7..c61951b6ffc8a 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -3,6 +3,7 @@ #include "PoolReplayer.h" #include +#include "common/Cond.h" #include "common/Formatter.h" #include "common/admin_socket.h" #include "common/ceph_argparse.h" @@ -10,18 +11,10 @@ #include "common/common_init.h" #include "common/debug.h" #include "common/errno.h" -#include "include/stringify.h" #include "cls/rbd/cls_rbd_client.h" #include "global/global_context.h" -#include "librbd/internal.h" -#include "librbd/Utils.h" -#include "librbd/Watcher.h" #include "librbd/api/Config.h" -#include "librbd/api/Mirror.h" -#include "ImageMap.h" -#include "InstanceReplayer.h" -#include "InstanceWatcher.h" -#include "LeaderWatcher.h" +#include "librbd/api/Namespace.h" #include "ServiceDaemon.h" #include "Threads.h" @@ -31,15 +24,6 @@ #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \ << this << " " << __func__ << ": " -using std::chrono::seconds; -using std::map; -using std::string; -using std::unique_ptr; -using std::vector; - -using librbd::cls_client::dir_get_name; -using librbd::util::create_async_context_callback; - namespace rbd { namespace mirror { @@ -47,10 +31,7 @@ using ::operator<<; namespace { -const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id"); const std::string SERVICE_DAEMON_LEADER_KEY("leader"); -const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count"); -const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count"); const std::vector UNIQUE_PEER_CONFIG_KEYS { {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}}; @@ -237,20 +218,17 @@ PoolReplayer::PoolReplayer( m_local_pool_id(local_pool_id), m_peer(peer), m_args(args), - m_lock(ceph::make_mutex(stringify("rbd::mirror::PoolReplayer ") + stringify(peer))), - m_local_pool_watcher_listener(this, true), - m_remote_pool_watcher_listener(this, false), - m_image_map_listener(this), + m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))), m_pool_replayer_thread(this), - m_leader_listener(this) -{ + m_leader_listener(this) { } template PoolReplayer::~PoolReplayer() { - delete m_asok_hook; shut_down(); + + ceph_assert(m_asok_hook == nullptr); } template @@ -271,8 +249,7 @@ bool PoolReplayer::is_running() const { } template -void PoolReplayer::init() -{ +void PoolReplayer::init() { ceph_assert(!m_pool_replayer_thread.is_started()); // reset state @@ -311,9 +288,8 @@ void PoolReplayer::init() auto cct = reinterpret_cast(m_local_io_ctx.cct()); librbd::api::Config::apply_pool_overrides(m_local_io_ctx, &cct->_conf); - std::string local_mirror_uuid; r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, - &local_mirror_uuid); + &m_local_mirror_uuid); if (r < 0) { derr << "failed to retrieve local mirror uuid from pool " << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; @@ -336,25 +312,25 @@ void PoolReplayer::init() dout(10) << "connected to " << m_peer << dendl; - m_instance_replayer.reset(InstanceReplayer::create( - m_threads, m_service_daemon, m_cache_manager_handler, m_local_rados, - local_mirror_uuid, m_local_pool_id)); - m_instance_replayer->init(); - m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); + m_image_sync_throttler.reset(ImageSyncThrottler::create(cct)); - m_instance_watcher.reset(InstanceWatcher::create( - m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get())); - r = m_instance_watcher->init(); + m_default_namespace_replayer.reset(NamespaceReplayer::create( + "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid, + m_threads, m_image_sync_throttler.get(), m_service_daemon, + m_cache_manager_handler)); + + C_SaferCond on_init; + m_default_namespace_replayer->init(&on_init); + r = on_init.wait(); if (r < 0) { - derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; + derr << "error initializing default namespace replayer: " << cpp_strerror(r) + << dendl; m_callout_id = m_service_daemon->add_or_update_callout( m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, - "unable to initialize instance messenger object"); + "unable to initialize default namespace replayer"); + m_default_namespace_replayer.reset(); return; } - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY, - m_instance_watcher->get_instance_id()); m_leader_watcher.reset(LeaderWatcher::create(m_threads, m_local_io_ctx, &m_leader_listener)); @@ -364,6 +340,7 @@ void PoolReplayer::init() m_callout_id = m_service_daemon->add_or_update_callout( m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, "unable to initialize leader messenger object"); + m_leader_watcher.reset(); return; } @@ -377,32 +354,29 @@ void PoolReplayer::init() template void PoolReplayer::shut_down() { - m_stopping = true; { std::lock_guard l{m_lock}; + m_stopping = true; m_cond.notify_all(); } if (m_pool_replayer_thread.is_started()) { m_pool_replayer_thread.join(); } + if (m_leader_watcher) { m_leader_watcher->shut_down(); } - if (m_instance_watcher) { - m_instance_watcher->shut_down(); - } - if (m_instance_replayer) { - m_instance_replayer->shut_down(); + m_leader_watcher.reset(); + + if (m_default_namespace_replayer) { + C_SaferCond on_shut_down; + m_default_namespace_replayer->shut_down(&on_shut_down); + on_shut_down.wait(); } + m_default_namespace_replayer.reset(); - m_leader_watcher.reset(); - m_instance_watcher.reset(); - m_instance_replayer.reset(); + m_image_sync_throttler.reset(); - ceph_assert(!m_image_map); - ceph_assert(!m_image_deleter); - ceph_assert(!m_local_pool_watcher); - ceph_assert(!m_remote_pool_watcher); m_local_rados.reset(); m_remote_rados.reset(); } @@ -534,11 +508,10 @@ int PoolReplayer::init_rados(const std::string &cluster_name, } template -void PoolReplayer::run() -{ - dout(20) << "enter" << dendl; +void PoolReplayer::run() { + dout(20) << dendl; - while (!m_stopping) { + while (true) { std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + m_peer.cluster_name; if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { @@ -549,26 +522,207 @@ void PoolReplayer::run() m_asok_hook_name, this); } + with_namespace_replayers([this]() { update_namespace_replayers(); }); + std::unique_lock locker{m_lock}; - if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) || - (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) { + + if (m_default_namespace_replayer->is_blacklisted()) { m_blacklisted = true; m_stopping = true; + } + + for (auto &it : m_namespace_replayers) { + if (it.second->is_blacklisted()) { + m_blacklisted = true; + m_stopping = true; + break; + } + } + + if (m_stopping) { break; } - if (!m_stopping) { - m_cond.wait_for(locker, 1s); + auto seconds = g_ceph_context->_conf.get_val( + "rbd_mirror_pool_replayers_refresh_interval"); + m_cond.wait_for(locker, ceph::make_timespan(seconds)); + } + + // shut down namespace replayers + with_namespace_replayers([this]() { update_namespace_replayers(); }); + + delete m_asok_hook; + m_asok_hook = nullptr; +} + +template +void PoolReplayer::update_namespace_replayers() { + dout(20) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + + std::set mirroring_namespaces; + if (!m_stopping) { + int r = list_mirroring_namespaces(&mirroring_namespaces); + if (r < 0) { + return; + } + } + + auto cct = reinterpret_cast(m_local_io_ctx.cct()); + C_SaferCond cond; + auto gather_ctx = new C_Gather(cct, &cond); + for (auto it = m_namespace_replayers.begin(); + it != m_namespace_replayers.end(); ) { + auto iter = mirroring_namespaces.find(it->first); + if (iter == mirroring_namespaces.end()) { + auto namespace_replayer = it->second; + auto on_shut_down = new FunctionContext( + [this, namespace_replayer, ctx=gather_ctx->new_sub()](int r) { + delete namespace_replayer; + ctx->complete(r); + }); + namespace_replayer->shut_down(on_shut_down); + it = m_namespace_replayers.erase(it); + } else { + mirroring_namespaces.erase(iter); + it++; } } - m_instance_replayer->stop(); + for (auto &name : mirroring_namespaces) { + auto namespace_replayer = NamespaceReplayer::create( + name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid, + m_threads, m_image_sync_throttler.get(), m_service_daemon, + m_cache_manager_handler); + auto on_init = new FunctionContext( + [this, namespace_replayer, name, &mirroring_namespaces, + ctx=gather_ctx->new_sub()](int r) { + if (r < 0) { + derr << "failed to initialize namespace replayer for namespace " + << name << ": " << cpp_strerror(r) << dendl; + delete namespace_replayer; + mirroring_namespaces.erase(name); + } else { + std::lock_guard locker{m_lock}; + m_namespace_replayers[name] = namespace_replayer; + } + ctx->complete(r); + }); + namespace_replayer->init(on_init); + } + + gather_ctx->activate(); + + m_lock.unlock(); + cond.wait(); + m_lock.lock(); + + if (m_leader) { + C_SaferCond acquire_cond; + auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond); + + for (auto &name : mirroring_namespaces) { + namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub()); + } + acquire_gather_ctx->activate(); + + m_lock.unlock(); + acquire_cond.wait(); + m_lock.lock(); + + std::vector instance_ids; + m_leader_watcher->list_instances(&instance_ids); + + for (auto &name : mirroring_namespaces) { + auto it = m_namespace_replayers.find(name); + if (it == m_namespace_replayers.end()) { + // acuire leader for this namespace replayer failed + continue; + } + it->second->handle_instances_added(instance_ids); + } + } else { + std::string leader_instance_id; + if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) { + for (auto &name : mirroring_namespaces) { + m_namespace_replayers[name]->handle_update_leader(leader_instance_id); + } + } + } } template -void PoolReplayer::print_status(Formatter *f, stringstream *ss) -{ - dout(20) << "enter" << dendl; +int PoolReplayer::list_mirroring_namespaces( + std::set *namespaces) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + std::vector names; + + int r = librbd::api::Namespace::list(m_local_io_ctx, &names); + if (r < 0) { + derr << "failed to list namespaces: " << cpp_strerror(r) << dendl; + return r; + } + + for (auto &name : names) { + cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED; + int r = librbd::cls_client::mirror_mode_get(&m_local_io_ctx, &mirror_mode); + if (r < 0 && r != -ENOENT) { + derr << "failed to get namespace mirror mode: " << cpp_strerror(r) + << dendl; + if (m_namespace_replayers.count(name) == 0) { + continue; + } + } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) { + dout(10) << "mirroring is disabled for namespace " << name << dendl; + continue; + } + + namespaces->insert(name); + } + + return 0; +} + +template +void PoolReplayer::namespace_replayer_acquire_leader(const std::string &name, + Context *on_finish) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto it = m_namespace_replayers.find(name); + ceph_assert(it != m_namespace_replayers.end()); + + on_finish = new FunctionContext( + [this, name, on_finish](int r) { + if (r < 0) { + derr << "failed to handle acquire leader for namespace: " + << name << ": " << cpp_strerror(r) << dendl; + + // remove the namespace replayer -- update_namespace_replayers will + // retry to create it and acquire leader. + + std::lock_guard locker{m_lock}; + + auto namespace_replayer = m_namespace_replayers[name]; + m_namespace_replayers.erase(name); + auto on_shut_down = new FunctionContext( + [this, namespace_replayer, on_finish](int r) { + delete namespace_replayer; + on_finish->complete(r); + }); + namespace_replayer->shut_down(on_shut_down); + return; + } + on_finish->complete(0); + }); + + it->second->handle_acquire_leader(on_finish); +} + +template +void PoolReplayer::print_status(Formatter *f, stringstream *ss) { + dout(20) << dendl; if (!f) { return; @@ -579,7 +733,7 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss) f->open_object_section("pool_replayer_status"); f->dump_string("pool", m_local_io_ctx.get_pool_name()); f->dump_stream("peer") << m_peer; - f->dump_string("instance_id", m_instance_watcher->get_instance_id()); + f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id(); std::string state("running"); if (m_manual_stop) { @@ -602,7 +756,7 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss) for (auto instance_id : instance_ids) { f->dump_string("instance_id", instance_id); } - f->close_section(); + f->close_section(); // instances } f->dump_string("local_cluster_admin_socket", @@ -612,26 +766,30 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss) reinterpret_cast(m_remote_io_ctx.cct())->_conf. get_val("admin_socket")); - f->open_object_section("sync_throttler"); - m_instance_watcher->print_sync_status(f, ss); - f->close_section(); + if (m_image_sync_throttler) { + f->open_object_section("sync_throttler"); + m_image_sync_throttler->print_status(f, ss); + f->close_section(); // sync_throttler + } - m_instance_replayer->print_status(f, ss); + m_default_namespace_replayer->print_status(f, ss); - if (m_image_deleter) { - f->open_object_section("image_deleter"); - m_image_deleter->print_status(f, ss); - f->close_section(); + f->open_array_section("namespaces"); + for (auto &it : m_namespace_replayers) { + f->open_object_section("namespace"); + f->dump_string("name", it.first); + it.second->print_status(f, ss); + f->close_section(); // namespace } + f->close_section(); // namespaces - f->close_section(); + f->close_section(); // pool_replayer_status f->flush(*ss); } template -void PoolReplayer::start() -{ - dout(20) << "enter" << dendl; +void PoolReplayer::start() { + dout(20) << dendl; std::lock_guard l{m_lock}; @@ -640,12 +798,15 @@ void PoolReplayer::start() } m_manual_stop = false; - m_instance_replayer->start(); + + m_default_namespace_replayer->start(); + for (auto &it : m_namespace_replayers) { + it.second->start(); + } } template -void PoolReplayer::stop(bool manual) -{ +void PoolReplayer::stop(bool manual) { dout(20) << "enter: manual=" << manual << dendl; std::lock_guard l{m_lock}; @@ -658,13 +819,16 @@ void PoolReplayer::stop(bool manual) } m_manual_stop = true; - m_instance_replayer->stop(); + + m_default_namespace_replayer->stop(); + for (auto &it : m_namespace_replayers) { + it.second->stop(); + } } template -void PoolReplayer::restart() -{ - dout(20) << "enter" << dendl; +void PoolReplayer::restart() { + dout(20) << dendl; std::lock_guard l{m_lock}; @@ -672,13 +836,15 @@ void PoolReplayer::restart() return; } - m_instance_replayer->restart(); + m_default_namespace_replayer->restart(); + for (auto &it : m_namespace_replayers) { + it.second->restart(); + } } template -void PoolReplayer::flush() -{ - dout(20) << "enter" << dendl; +void PoolReplayer::flush() { + dout(20) << dendl; std::lock_guard l{m_lock}; @@ -686,13 +852,15 @@ void PoolReplayer::flush() return; } - m_instance_replayer->flush(); + m_default_namespace_replayer->flush(); + for (auto &it : m_namespace_replayers) { + it.second->flush(); + } } template -void PoolReplayer::release_leader() -{ - dout(20) << "enter" << dendl; +void PoolReplayer::release_leader() { + dout(20) << dendl; std::lock_guard l{m_lock}; @@ -703,331 +871,68 @@ void PoolReplayer::release_leader() m_leader_watcher->release_leader(); } -template -void PoolReplayer::handle_update(const std::string &mirror_uuid, - ImageIds &&added_image_ids, - ImageIds &&removed_image_ids) { - if (m_stopping) { - return; - } - - dout(10) << "mirror_uuid=" << mirror_uuid << ", " - << "added_count=" << added_image_ids.size() << ", " - << "removed_count=" << removed_image_ids.size() << dendl; - std::lock_guard locker{m_lock}; - if (!m_leader_watcher->is_leader()) { - return; - } - - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY, - m_local_pool_watcher->get_image_count()); - if (m_remote_pool_watcher) { - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY, - m_remote_pool_watcher->get_image_count()); - } - - std::set added_global_image_ids; - for (auto& image_id : added_image_ids) { - added_global_image_ids.insert(image_id.global_id); - } - - std::set removed_global_image_ids; - for (auto& image_id : removed_image_ids) { - removed_global_image_ids.insert(image_id.global_id); - } - - m_image_map->update_images(mirror_uuid, - std::move(added_global_image_ids), - std::move(removed_global_image_ids)); -} - template void PoolReplayer::handle_post_acquire_leader(Context *on_finish) { - dout(10) << dendl; + dout(20) << dendl; - m_service_daemon->add_or_update_attribute(m_local_pool_id, - SERVICE_DAEMON_LEADER_KEY, true); - m_instance_watcher->handle_acquire_leader(); - init_image_map(on_finish); -} + with_namespace_replayers( + [this](Context *on_finish) { + dout(10) << "handle_post_acquire_leader" << dendl; -template -void PoolReplayer::handle_pre_release_leader(Context *on_finish) { - dout(10) << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); - m_service_daemon->remove_attribute(m_local_pool_id, - SERVICE_DAEMON_LEADER_KEY); - m_instance_watcher->handle_release_leader(); - shut_down_image_deleter(on_finish); -} + m_service_daemon->add_or_update_attribute(m_local_pool_id, + SERVICE_DAEMON_LEADER_KEY, + true); + auto ctx = new FunctionContext( + [this, on_finish](int r) { + if (r == 0) { + std::lock_guard locker{m_lock}; + m_leader = true; + } + on_finish->complete(r); + }); -template -void PoolReplayer::init_image_map(Context *on_finish) { - dout(5) << dendl; + auto cct = reinterpret_cast(m_local_io_ctx.cct()); + auto gather_ctx = new C_Gather(cct, ctx); - std::lock_guard locker{m_lock}; - ceph_assert(!m_image_map); - m_image_map.reset(ImageMap::create(m_local_io_ctx, m_threads, - m_instance_watcher->get_instance_id(), - m_image_map_listener)); - - auto ctx = new FunctionContext([this, on_finish](int r) { - handle_init_image_map(r, on_finish); - }); - m_image_map->init(create_async_context_callback( - m_threads->work_queue, ctx)); -} - -template -void PoolReplayer::handle_init_image_map(int r, Context *on_finish) { - dout(5) << "r=" << r << dendl; - if (r < 0) { - derr << "failed to init image map: " << cpp_strerror(r) << dendl; - on_finish = new FunctionContext([on_finish, r](int) { - on_finish->complete(r); - }); - shut_down_image_map(on_finish); - return; - } - - init_local_pool_watcher(on_finish); -} - -template -void PoolReplayer::init_local_pool_watcher(Context *on_finish) { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - ceph_assert(!m_local_pool_watcher); - m_local_pool_watcher.reset(PoolWatcher::create( - m_threads, m_local_io_ctx, m_local_pool_watcher_listener)); - - // ensure the initial set of local images is up-to-date - // after acquiring the leader role - auto ctx = new FunctionContext([this, on_finish](int r) { - handle_init_local_pool_watcher(r, on_finish); - }); - m_local_pool_watcher->init(create_async_context_callback( - m_threads->work_queue, ctx)); -} - -template -void PoolReplayer::handle_init_local_pool_watcher( - int r, Context *on_finish) { - dout(10) << "r=" << r << dendl; - if (r < 0) { - derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; - on_finish = new FunctionContext([on_finish, r](int) { - on_finish->complete(r); - }); - shut_down_pool_watchers(on_finish); - return; - } - - init_remote_pool_watcher(on_finish); -} - -template -void PoolReplayer::init_remote_pool_watcher(Context *on_finish) { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - ceph_assert(!m_remote_pool_watcher); - m_remote_pool_watcher.reset(PoolWatcher::create( - m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener)); - - auto ctx = new FunctionContext([this, on_finish](int r) { - handle_init_remote_pool_watcher(r, on_finish); - }); - m_remote_pool_watcher->init(create_async_context_callback( - m_threads->work_queue, ctx)); -} - -template -void PoolReplayer::handle_init_remote_pool_watcher( - int r, Context *on_finish) { - dout(10) << "r=" << r << dendl; - if (r == -ENOENT) { - // Technically nothing to do since the other side doesn't - // have mirroring enabled. Eventually the remote pool watcher will - // detect images (if mirroring is enabled), so no point propagating - // an error which would just busy-spin the state machines. - dout(0) << "remote peer does not have mirroring configured" << dendl; - } else if (r < 0) { - derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl; - on_finish = new FunctionContext([on_finish, r](int) { - on_finish->complete(r); - }); - shut_down_pool_watchers(on_finish); - return; - } - - init_image_deleter(on_finish); -} - -template -void PoolReplayer::init_image_deleter(Context *on_finish) { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - ceph_assert(!m_image_deleter); - - on_finish = new FunctionContext([this, on_finish](int r) { - handle_init_image_deleter(r, on_finish); - }); - m_image_deleter.reset(ImageDeleter::create(m_local_io_ctx, m_threads, - m_service_daemon)); - m_image_deleter->init(create_async_context_callback( - m_threads->work_queue, on_finish)); -} - -template -void PoolReplayer::handle_init_image_deleter(int r, Context *on_finish) { - dout(10) << "r=" << r << dendl; - if (r < 0) { - derr << "failed to init image deleter: " << cpp_strerror(r) << dendl; - on_finish = new FunctionContext([on_finish, r](int) { - on_finish->complete(r); - }); - shut_down_image_deleter(on_finish); - return; - } - - on_finish->complete(0); - - std::lock_guard locker{m_lock}; - m_cond.notify_all(); -} - -template -void PoolReplayer::shut_down_image_deleter(Context* on_finish) { - dout(10) << dendl; - { - std::lock_guard locker{m_lock}; - if (m_image_deleter) { - Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_shut_down_image_deleter(r, on_finish); - }); - ctx = create_async_context_callback(m_threads->work_queue, ctx); - - m_image_deleter->shut_down(ctx); - return; - } - } - shut_down_pool_watchers(on_finish); -} + m_default_namespace_replayer->handle_acquire_leader( + gather_ctx->new_sub()); -template -void PoolReplayer::handle_shut_down_image_deleter( - int r, Context* on_finish) { - dout(10) << "r=" << r << dendl; - - { - std::lock_guard locker{m_lock}; - ceph_assert(m_image_deleter); - m_image_deleter.reset(); - } - - shut_down_pool_watchers(on_finish); -} - -template -void PoolReplayer::shut_down_pool_watchers(Context *on_finish) { - dout(10) << dendl; - - { - std::lock_guard locker{m_lock}; - if (m_local_pool_watcher) { - Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_shut_down_pool_watchers(r, on_finish); - }); - ctx = create_async_context_callback(m_threads->work_queue, ctx); - - auto gather_ctx = new C_Gather(g_ceph_context, ctx); - m_local_pool_watcher->shut_down(gather_ctx->new_sub()); - if (m_remote_pool_watcher) { - m_remote_pool_watcher->shut_down(gather_ctx->new_sub()); - } - gather_ctx->activate(); - return; - } - } - - on_finish->complete(0); -} - -template -void PoolReplayer::handle_shut_down_pool_watchers( - int r, Context *on_finish) { - dout(10) << "r=" << r << dendl; - - { - std::lock_guard locker{m_lock}; - ceph_assert(m_local_pool_watcher); - m_local_pool_watcher.reset(); - - if (m_remote_pool_watcher) { - m_remote_pool_watcher.reset(); - } - } - wait_for_update_ops(on_finish); -} - -template -void PoolReplayer::wait_for_update_ops(Context *on_finish) { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - - Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_wait_for_update_ops(r, on_finish); - }); - ctx = create_async_context_callback(m_threads->work_queue, ctx); + for (auto &it : m_namespace_replayers) { + namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub()); + } - m_update_op_tracker.wait_for_ops(ctx); + gather_ctx->activate(); + }, on_finish); } template -void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) { - dout(10) << "r=" << r << dendl; - ceph_assert(r == 0); +void PoolReplayer::handle_pre_release_leader(Context *on_finish) { + dout(20) << dendl; - shut_down_image_map(on_finish); -} + with_namespace_replayers( + [this](Context *on_finish) { + dout(10) << "handle_pre_release_leader" << dendl; -template -void PoolReplayer::shut_down_image_map(Context *on_finish) { - dout(5) << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); - { - std::lock_guard locker{m_lock}; - if (m_image_map) { - on_finish = new FunctionContext([this, on_finish](int r) { - handle_shut_down_image_map(r, on_finish); - }); - m_image_map->shut_down(create_async_context_callback( - m_threads->work_queue, on_finish)); - return; - } - } + m_leader = false; + m_service_daemon->remove_attribute(m_local_pool_id, + SERVICE_DAEMON_LEADER_KEY); - on_finish->complete(0); -} + auto cct = reinterpret_cast(m_local_io_ctx.cct()); + auto gather_ctx = new C_Gather(cct, on_finish); -template -void PoolReplayer::handle_shut_down_image_map(int r, Context *on_finish) { - dout(5) << "r=" << r << dendl; - if (r < 0 && r != -EBLACKLISTED) { - derr << "failed to shut down image map: " << cpp_strerror(r) << dendl; - } + m_default_namespace_replayer->handle_release_leader( + gather_ctx->new_sub()); - std::lock_guard locker{m_lock}; - ceph_assert(m_image_map); - m_image_map.reset(); + for (auto &it : m_namespace_replayers) { + it.second->handle_release_leader(gather_ctx->new_sub()); + } - m_instance_replayer->release_all(on_finish); + gather_ctx->activate(); + }, on_finish); } template @@ -1035,68 +940,47 @@ void PoolReplayer::handle_update_leader( const std::string &leader_instance_id) { dout(10) << "leader_instance_id=" << leader_instance_id << dendl; - m_instance_watcher->handle_update_leader(leader_instance_id); -} - -template -void PoolReplayer::handle_acquire_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { - dout(5) << "global_image_id=" << global_image_id << ", " - << "instance_id=" << instance_id << dendl; - - m_instance_watcher->notify_image_acquire(instance_id, global_image_id, - on_finish); -} + std::lock_guard locker{m_lock}; -template -void PoolReplayer::handle_release_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { - dout(5) << "global_image_id=" << global_image_id << ", " - << "instance_id=" << instance_id << dendl; - - m_instance_watcher->notify_image_release(instance_id, global_image_id, - on_finish); -} + m_default_namespace_replayer->handle_update_leader(leader_instance_id); -template -void PoolReplayer::handle_remove_image(const std::string &mirror_uuid, - const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) { - ceph_assert(!mirror_uuid.empty()); - dout(5) << "mirror_uuid=" << mirror_uuid << ", " - << "global_image_id=" << global_image_id << ", " - << "instance_id=" << instance_id << dendl; - - m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id, - mirror_uuid, on_finish); + for (auto &it : m_namespace_replayers) { + it.second->handle_update_leader(leader_instance_id); + } } template -void PoolReplayer::handle_instances_added(const InstanceIds &instance_ids) { +void PoolReplayer::handle_instances_added( + const std::vector &instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; + std::lock_guard locker{m_lock}; if (!m_leader_watcher->is_leader()) { return; } - ceph_assert(m_image_map); - m_image_map->update_instances_added(instance_ids); + m_default_namespace_replayer->handle_instances_added(instance_ids); + + for (auto &it : m_namespace_replayers) { + it.second->handle_instances_added(instance_ids); + } } template void PoolReplayer::handle_instances_removed( - const InstanceIds &instance_ids) { + const std::vector &instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; + std::lock_guard locker{m_lock}; if (!m_leader_watcher->is_leader()) { return; } - ceph_assert(m_image_map); - m_image_map->update_instances_removed(instance_ids); + m_default_namespace_replayer->handle_instances_removed(instance_ids); + + for (auto &it : m_namespace_replayers) { + it.second->handle_instances_removed(instance_ids); + } } } // namespace mirror diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index c734d9a880925..039a420c67501 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -4,25 +4,21 @@ #ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H #define CEPH_RBD_MIRROR_POOL_REPLAYER_H -#include "common/AsyncOpTracker.h" -#include "common/ceph_mutex.h" +#include "common/Cond.h" #include "common/WorkQueue.h" +#include "common/ceph_mutex.h" #include "include/rados/librados.hpp" +#include "librbd/Utils.h" -#include "ClusterWatcher.h" -#include "LeaderWatcher.h" -#include "PoolWatcher.h" -#include "ImageDeleter.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/LeaderWatcher.h" +#include "tools/rbd_mirror/NamespaceReplayer.h" #include "tools/rbd_mirror/Types.h" -#include "tools/rbd_mirror/image_map/Types.h" #include "tools/rbd_mirror/leader_watcher/Types.h" -#include "tools/rbd_mirror/pool_watcher/Types.h" #include "tools/rbd_mirror/service_daemon/Types.h" -#include #include #include -#include #include #include @@ -35,9 +31,6 @@ namespace librbd { class ImageCtx; } namespace rbd { namespace mirror { -template class ImageMap; -template class InstanceReplayer; -template class InstanceWatcher; template class ServiceDaemon; template struct Threads; @@ -82,86 +75,21 @@ private: * INIT * | * v - * <-------------------------\ - * . | - * . | - * v (leader acquired) | - * INIT_IMAGE_MAP SHUT_DOWN_IMAGE_MAP - * | ^ - * v | - * INIT_LOCAL_POOL_WATCHER WAIT_FOR_NOTIFICATIONS - * | ^ - * v | - * INIT_REMOTE_POOL_WATCHER SHUT_DOWN_POOL_WATCHERS - * | ^ - * v | - * INIT_IMAGE_DELETER SHUT_DOWN_IMAGE_DELETER - * | ^ - * v . - * <-----------\ . - * . | . - * . (image update) | . - * . . > NOTIFY_INSTANCE_WATCHER . - * . . - * . (leader lost / shut down) . - * . . . . . . . . . . . . . . . . . . + * <---------------------\ + * . | + * . (leader acquired) | + * v | + * NOTIFY_NAMESPACE_WATCHERS NOTIFY_NAMESPACE_WATCHERS + * | ^ + * v . + * . + * . . + * . (leader lost / shut down) . + * . . . . . . . . . . . . . . . . * * @endverbatim */ - typedef std::vector InstanceIds; - - struct PoolWatcherListener : public pool_watcher::Listener { - PoolReplayer *pool_replayer; - bool local; - - PoolWatcherListener(PoolReplayer *pool_replayer, bool local) - : pool_replayer(pool_replayer), local(local) { - } - - void handle_update(const std::string &mirror_uuid, - ImageIds &&added_image_ids, - ImageIds &&removed_image_ids) override { - pool_replayer->handle_update((local ? "" : mirror_uuid), - std::move(added_image_ids), - std::move(removed_image_ids)); - } - }; - - struct ImageMapListener : public image_map::Listener { - PoolReplayer *pool_replayer; - - ImageMapListener(PoolReplayer *pool_replayer) - : pool_replayer(pool_replayer) { - } - - void acquire_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) override { - pool_replayer->handle_acquire_image(global_image_id, instance_id, - on_finish); - } - - void release_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) override { - pool_replayer->handle_release_image(global_image_id, instance_id, - on_finish); - } - - void remove_image(const std::string &mirror_uuid, - const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish) override { - pool_replayer->handle_remove_image(mirror_uuid, global_image_id, - instance_id, on_finish); - } - }; - - void handle_update(const std::string &mirror_uuid, - ImageIds &&added_image_ids, - ImageIds &&removed_image_ids); - int init_rados(const std::string &cluster_name, const std::string &client_name, const std::string &mon_host, @@ -169,48 +97,89 @@ private: const std::string &description, RadosRef *rados_ref, bool strip_cluster_overrides); - void handle_post_acquire_leader(Context *on_finish); - void handle_pre_release_leader(Context *on_finish); - - void init_image_map(Context *on_finish); - void handle_init_image_map(int r, Context *on_finish); + void update_namespace_replayers(); + int list_mirroring_namespaces(std::set *namespaces); - void init_local_pool_watcher(Context *on_finish); - void handle_init_local_pool_watcher(int r, Context *on_finish); + void namespace_replayer_acquire_leader(const std::string &name, + Context *on_finish); - void init_remote_pool_watcher(Context *on_finish); - void handle_init_remote_pool_watcher(int r, Context *on_finish); - - void init_image_deleter(Context* on_finish); - void handle_init_image_deleter(int r, Context* on_finish); - - void shut_down_image_deleter(Context* on_finish); - void handle_shut_down_image_deleter(int r, Context* on_finish); + void handle_post_acquire_leader(Context *on_finish); + void handle_pre_release_leader(Context *on_finish); - void shut_down_pool_watchers(Context *on_finish); - void handle_shut_down_pool_watchers(int r, Context *on_finish); + void handle_update_leader(const std::string &leader_instance_id); - void wait_for_update_ops(Context *on_finish); - void handle_wait_for_update_ops(int r, Context *on_finish); + void handle_instances_added(const std::vector &instance_ids); + void handle_instances_removed(const std::vector &instance_ids); + + // sync version, executed in the caller thread + template + void with_namespace_replayers(L &&callback) { + std::lock_guard locker{m_lock}; + + if (m_namespace_replayers_locked) { + ceph_assert(m_on_namespace_replayers_unlocked == nullptr); + C_SaferCond cond; + m_on_namespace_replayers_unlocked = &cond; + m_lock.unlock(); + cond.wait(); + m_lock.lock(); + } else { + m_namespace_replayers_locked = true; + } - void shut_down_image_map(Context *on_finish); - void handle_shut_down_image_map(int r, Context *on_finish); + ceph_assert(m_namespace_replayers_locked); + callback(); // may temporary release the lock + ceph_assert(m_namespace_replayers_locked); - void handle_update_leader(const std::string &leader_instance_id); + if (m_on_namespace_replayers_unlocked == nullptr) { + m_namespace_replayers_locked = false; + return; + } - void handle_acquire_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish); - void handle_release_image(const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish); - void handle_remove_image(const std::string &mirror_uuid, - const std::string &global_image_id, - const std::string &instance_id, - Context* on_finish); + m_threads->work_queue->queue(m_on_namespace_replayers_unlocked); + m_on_namespace_replayers_unlocked = nullptr; + } + + // async version + template + void with_namespace_replayers(L &&callback, Context *on_finish) { + std::lock_guard locker{m_lock}; + + on_finish = librbd::util::create_async_context_callback( + m_threads->work_queue, new FunctionContext( + [this, on_finish](int r) { + { + std::lock_guard locker{m_lock}; + ceph_assert(m_namespace_replayers_locked); + + m_namespace_replayers_locked = false; + + if (m_on_namespace_replayers_unlocked != nullptr) { + m_namespace_replayers_locked = true; + m_threads->work_queue->queue(m_on_namespace_replayers_unlocked); + m_on_namespace_replayers_unlocked = nullptr; + } + } + on_finish->complete(r); + })); + + auto on_lock = new FunctionContext( + [this, callback, on_finish](int) { + std::lock_guard locker{m_lock}; + ceph_assert(m_namespace_replayers_locked); + + callback(on_finish); + }); + + if (m_namespace_replayers_locked) { + ceph_assert(m_on_namespace_replayers_unlocked == nullptr); + m_on_namespace_replayers_unlocked = on_lock; + return; + } - void handle_instances_added(const InstanceIds &instance_ids); - void handle_instances_removed(const InstanceIds &instance_ids); + m_namespace_replayers_locked = true; + m_threads->work_queue->queue(on_lock); + } Threads *m_threads; ServiceDaemon *m_service_daemon; @@ -221,7 +190,7 @@ private: mutable ceph::mutex m_lock; ceph::condition_variable m_cond; - std::atomic m_stopping = { false }; + bool m_stopping = false; bool m_manual_stop = false; bool m_blacklisted = false; @@ -231,23 +200,20 @@ private: librados::IoCtx m_local_io_ctx; librados::IoCtx m_remote_io_ctx; - PoolWatcherListener m_local_pool_watcher_listener; - std::unique_ptr> m_local_pool_watcher; - - PoolWatcherListener m_remote_pool_watcher_listener; - std::unique_ptr> m_remote_pool_watcher; + std::string m_local_mirror_uuid; - std::unique_ptr> m_instance_replayer; - std::unique_ptr> m_image_deleter; - - ImageMapListener m_image_map_listener; - std::unique_ptr> m_image_map; + std::unique_ptr> m_default_namespace_replayer; + std::map *> m_namespace_replayers; std::string m_asok_hook_name; AdminSocketHook *m_asok_hook = nullptr; service_daemon::CalloutId m_callout_id = service_daemon::CALLOUT_ID_NONE; + bool m_leader = false; + bool m_namespace_replayers_locked = false; + Context *m_on_namespace_replayers_unlocked = nullptr; + class PoolReplayerThread : public Thread { PoolReplayer *m_pool_replayer; public: @@ -293,8 +259,7 @@ private: } m_leader_listener; std::unique_ptr> m_leader_watcher; - std::unique_ptr> m_instance_watcher; - AsyncOpTracker m_update_op_tracker; + std::unique_ptr> m_image_sync_throttler; }; } // namespace mirror -- 2.39.5