From: Jason Dillaman Date: Wed, 25 Sep 2019 01:47:20 +0000 (-0400) Subject: rbd-mirror: initialize mirror status updater against remote peer X-Git-Tag: v15.1.0~1245^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bacb73f572848c6f5a02f295938efe88e688d79d;p=ceph.git rbd-mirror: initialize mirror status updater against remote peer Signed-off-by: Jason Dillaman --- diff --git a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc index 1dcf61abb45..c4daeb85082 100644 --- a/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_NamespaceReplayer.cc @@ -171,30 +171,25 @@ InstanceWatcher* InstanceWatcher struct MirrorStatusUpdater { - static MirrorStatusUpdater* s_instance; + static std::map s_instance; static MirrorStatusUpdater *create(librados::IoCtx &io_ctx, Threads *threads, const std::string& site_name) { - ceph_assert(s_instance != nullptr); - return s_instance; - } - - MirrorStatusUpdater() { - ceph_assert(s_instance == nullptr); - s_instance = this; + ceph_assert(s_instance[site_name] != nullptr); + return s_instance[site_name]; } - ~MirrorStatusUpdater() { - ceph_assert(s_instance == this); - s_instance = nullptr; + MirrorStatusUpdater(const std::string& site_name) { + s_instance[site_name] = this; } MOCK_METHOD1(init, void(Context *)); MOCK_METHOD1(shut_down, void(Context *)); }; -MirrorStatusUpdater *MirrorStatusUpdater::s_instance = nullptr; +std::map *> + MirrorStatusUpdater::s_instance; template<> struct PoolWatcher { @@ -288,15 +283,15 @@ public: TestMockFixture::TearDown(); } - void expect_mirror_status_watcher_init( - MockMirrorStatusUpdater &mock_mirror_status_watcher, int r) { - EXPECT_CALL(mock_mirror_status_watcher, init(_)) + void expect_mirror_status_updater_init( + MockMirrorStatusUpdater &mock_mirror_status_updater, int r) { + EXPECT_CALL(mock_mirror_status_updater, init(_)) .WillOnce(CompleteContext(m_mock_threads->work_queue, r)); } - void expect_mirror_status_watcher_shut_down( - MockMirrorStatusUpdater &mock_mirror_status_watcher) { - EXPECT_CALL(mock_mirror_status_watcher, shut_down(_)) + void expect_mirror_status_updater_shut_down( + MockMirrorStatusUpdater &mock_mirror_status_updater) { + EXPECT_CALL(mock_mirror_status_updater, shut_down(_)) .WillOnce(CompleteContext(m_mock_threads->work_queue, 0)); } @@ -407,11 +402,33 @@ public: MockThreads *m_mock_threads; }; -TEST_F(TestMockNamespaceReplayer, Init_MirrorStatusUpdaterError) { +TEST_F(TestMockNamespaceReplayer, Init_LocalMirrorStatusUpdaterError) { + InSequence seq; + + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, -EINVAL); + + MockNamespaceReplayer namespace_replayer( + {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", + "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr, + nullptr); + + C_SaferCond on_init; + namespace_replayer.init(&on_init); + ASSERT_EQ(-EINVAL, on_init.wait()); +} + +TEST_F(TestMockNamespaceReplayer, Init_RemoteMirrorStatusUpdaterError) { InSequence seq; - auto mock_mirror_status_watcher = new MockMirrorStatusUpdater; - expect_mirror_status_watcher_init(*mock_mirror_status_watcher, -EINVAL); + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0); + + auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"}; + expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, + -EINVAL); + + expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater); MockNamespaceReplayer namespace_replayer( {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", @@ -426,13 +443,17 @@ TEST_F(TestMockNamespaceReplayer, Init_MirrorStatusUpdaterError) { TEST_F(TestMockNamespaceReplayer, Init_InstanceReplayerError) { InSequence seq; - auto mock_mirror_status_watcher = new MockMirrorStatusUpdater; - expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0); + + auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"}; + expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0); auto mock_instance_replayer = new MockInstanceReplayer(); expect_instance_replayer_init(*mock_instance_replayer, -EINVAL); - expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater); + expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater); MockNamespaceReplayer namespace_replayer( {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", @@ -447,8 +468,11 @@ TEST_F(TestMockNamespaceReplayer, Init_InstanceReplayerError) { TEST_F(TestMockNamespaceReplayer, Init_InstanceWatcherError) { InSequence seq; - auto mock_mirror_status_watcher = new MockMirrorStatusUpdater; - expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0); + + auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"}; + expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0); auto mock_instance_replayer = new MockInstanceReplayer(); expect_instance_replayer_init(*mock_instance_replayer, 0); @@ -459,7 +483,8 @@ TEST_F(TestMockNamespaceReplayer, Init_InstanceWatcherError) { expect_instance_watcher_init(*mock_instance_watcher, -EINVAL); expect_instance_replayer_shut_down(*mock_instance_replayer); - expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater); + expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater); MockNamespaceReplayer namespace_replayer( {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid", @@ -474,8 +499,11 @@ TEST_F(TestMockNamespaceReplayer, Init_InstanceWatcherError) { TEST_F(TestMockNamespaceReplayer, Init) { InSequence seq; - auto mock_mirror_status_watcher = new MockMirrorStatusUpdater; - expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0); + + auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"}; + expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0); auto mock_instance_replayer = new MockInstanceReplayer(); expect_instance_replayer_init(*mock_instance_replayer, 0); @@ -501,7 +529,8 @@ TEST_F(TestMockNamespaceReplayer, Init) { expect_instance_replayer_stop(*mock_instance_replayer); expect_instance_watcher_shut_down(*mock_instance_watcher); expect_instance_replayer_shut_down(*mock_instance_replayer); - expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater); + expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater); C_SaferCond on_shut_down; namespace_replayer.shut_down(&on_shut_down); @@ -513,8 +542,11 @@ TEST_F(TestMockNamespaceReplayer, AcuqireLeader) { // init - auto mock_mirror_status_watcher = new MockMirrorStatusUpdater; - expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0); + auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""}; + expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0); + + auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"}; + expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0); auto mock_instance_replayer = new MockInstanceReplayer(); expect_instance_replayer_init(*mock_instance_replayer, 0); @@ -571,7 +603,8 @@ TEST_F(TestMockNamespaceReplayer, AcuqireLeader) { expect_instance_replayer_stop(*mock_instance_replayer); expect_instance_watcher_shut_down(*mock_instance_watcher); expect_instance_replayer_shut_down(*mock_instance_replayer); - expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher); + expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater); + expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater); C_SaferCond on_shut_down; namespace_replayer.shut_down(&on_shut_down); diff --git a/src/tools/rbd_mirror/NamespaceReplayer.cc b/src/tools/rbd_mirror/NamespaceReplayer.cc index d3910c105b1..cd6c5780946 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.cc +++ b/src/tools/rbd_mirror/NamespaceReplayer.cc @@ -283,12 +283,47 @@ void NamespaceReplayer::handle_init_local_status_updater(int r) { derr << "error initializing local mirror status updater: " << cpp_strerror(r) << dendl; + m_local_status_updater.reset(); ceph_assert(m_on_finish != nullptr); m_threads->work_queue->queue(m_on_finish, r); m_on_finish = nullptr; return; } + init_remote_status_updater(); +} + +template +void NamespaceReplayer::init_remote_status_updater() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_remote_status_updater); + + m_remote_status_updater.reset(MirrorStatusUpdater::create( + m_remote_io_ctx, m_threads, m_local_site_name)); + auto ctx = create_context_callback< + NamespaceReplayer, + &NamespaceReplayer::handle_init_remote_status_updater>(this); + m_remote_status_updater->init(ctx); +} + +template +void NamespaceReplayer::handle_init_remote_status_updater(int r) { + dout(10) << "r=" << r << dendl; + + std::lock_guard locker{m_lock}; + + if (r < 0) { + derr << "error initializing remote mirror status updater: " + << cpp_strerror(r) << dendl; + + m_remote_status_updater.reset(); + m_ret_val = r; + shut_down_local_status_updater(); + return; + } + init_instance_replayer(); } @@ -320,7 +355,7 @@ void NamespaceReplayer::handle_init_instance_replayer(int r) { m_instance_replayer.reset(); m_ret_val = r; - shut_down_local_status_updater(); + shut_down_remote_status_updater(); return; } @@ -456,6 +491,35 @@ void NamespaceReplayer::handle_shut_down_instance_replayer(int r) { m_instance_replayer.reset(); + shut_down_remote_status_updater(); +} + +template +void NamespaceReplayer::shut_down_remote_status_updater() { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_remote_status_updater); + + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + NamespaceReplayer, + &NamespaceReplayer::handle_shut_down_remote_status_updater>(this)); + m_remote_status_updater->shut_down(ctx); +} + +template +void NamespaceReplayer::handle_shut_down_remote_status_updater(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "error shutting remote mirror status updater down: " + << cpp_strerror(r) << dendl; + } + + std::lock_guard locker{m_lock}; + m_remote_status_updater.reset(); + shut_down_local_status_updater(); } @@ -479,8 +543,8 @@ void NamespaceReplayer::handle_shut_down_local_status_updater(int r) { dout(10) << "r=" << r << dendl; if (r < 0) { - derr << "error shutting mirror status watcher down: " << cpp_strerror(r) - << dendl; + derr << "error shutting local mirror status updater down: " + << cpp_strerror(r) << dendl; } std::lock_guard locker{m_lock}; diff --git a/src/tools/rbd_mirror/NamespaceReplayer.h b/src/tools/rbd_mirror/NamespaceReplayer.h index 20a5659c82a..3773bbd5758 100644 --- a/src/tools/rbd_mirror/NamespaceReplayer.h +++ b/src/tools/rbd_mirror/NamespaceReplayer.h @@ -96,22 +96,25 @@ private: /** * @verbatim * - * <--------------------------------\ - * | (init) ^ (error) | - * v * | - * INIT_LOCAL_STATUS_UPDATER * * * * * * > SHUT_DOWN_LOCAL_STATUS_UPDATER - * | * (error) ^ - * v * | - * INIT_INSTANCE_REPLAYER * * * * * * * > SHUT_DOWN_INSTANCE_REPLAYER - * | * ^ - * v * | - * INIT_INSTANCE_WATCHER * * * * * * * * SHUT_DOWN_INSTANCE_WATCHER - * | (error) ^ - * | | - * v STOP_INSTANCE_REPLAYER - * | ^ - * | (shut down) | - * | /------------------------------------------/ + * <------------------------------------\ + * | (init) ^ (error) | + * v * | + * INIT_LOCAL_STATUS_UPDATER * * * * * * * * > SHUT_DOWN_LOCAL_STATUS_UPDATER + * | * (error) ^ + * v * | + * INIT_REMOTE_STATUS_UPDATER * * * * * * * > SHUT_DOWN_REMOTE_STATUS_UPDATER + * | * (error) ^ + * v * | + * INIT_INSTANCE_REPLAYER * * * * * * * * * > SHUT_DOWN_INSTANCE_REPLAYER + * | * ^ + * v * | + * INIT_INSTANCE_WATCHER * * * * * * * * * * SHUT_DOWN_INSTANCE_WATCHER + * | (error) ^ + * | | + * v STOP_INSTANCE_REPLAYER + * | ^ + * | (shut down) | + * | /----------------------------------------------/ * v | * <---------------------------\ * . | @@ -201,6 +204,9 @@ private: void init_local_status_updater(); void handle_init_local_status_updater(int r); + void init_remote_status_updater(); + void handle_init_remote_status_updater(int r); + void init_instance_replayer(); void handle_init_instance_replayer(int r); @@ -216,6 +222,9 @@ private: void shut_down_instance_replayer(); void handle_shut_down_instance_replayer(int r); + void shut_down_remote_status_updater(); + void handle_shut_down_remote_status_updater(int r); + void shut_down_local_status_updater(); void handle_shut_down_local_status_updater(int r); @@ -268,6 +277,7 @@ private: Context *m_on_finish = nullptr; std::unique_ptr> m_local_status_updater; + std::unique_ptr> m_remote_status_updater; PoolWatcherListener m_local_pool_watcher_listener; std::unique_ptr> m_local_pool_watcher;