From 0d36eb58f8d5625bad0c075632594ae5e1169fa2 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 13 Dec 2019 10:54:58 -0500 Subject: [PATCH] rbd-mirror: switch image replayer to new standalone journal replayer Remove all the original journal replaying code embedded in the image replayer and instead rely on the new journal replayer class. Signed-off-by: Jason Dillaman --- src/test/rbd_mirror/test_ImageReplayer.cc | 6 +- .../rbd_mirror/test_mock_ImageReplayer.cc | 685 +++----------- src/tools/rbd_mirror/ImageReplayer.cc | 870 +++--------------- src/tools/rbd_mirror/ImageReplayer.h | 168 +--- .../rbd_mirror/image_replayer/Replayer.h | 2 + .../image_replayer/journal/Replayer.cc | 49 +- .../image_replayer/journal/Replayer.h | 20 +- 7 files changed, 324 insertions(+), 1476 deletions(-) diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index c56fffc1d18..cb214343572 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -668,9 +668,9 @@ TEST_F(TestImageReplayer, Resync) flush(ictx); close_image(ictx); - C_SaferCond ctx; - m_replayer->resync_image(&ctx); - ASSERT_EQ(0, ctx.wait()); + open_local_image(&ictx); + librbd::Journal<>::request_resync(ictx); + close_image(ictx); wait_for_stopped(); diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index 3b4d596ed55..f627d752d2d 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -2,8 +2,8 @@ // vim: ts=8 sw=2 smarttab #include "cls/journal/cls_journal_types.h" -#include "librbd/journal/Replay.h" #include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" #include "tools/rbd_mirror/ImageDeleter.h" #include "tools/rbd_mirror/ImageReplayer.h" #include "tools/rbd_mirror/InstanceWatcher.h" @@ -13,12 +13,13 @@ #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h" #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h" -#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" -#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" +#include "tools/rbd_mirror/image_replayer/Replayer.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/Replayer.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "test/journal/mock/MockJournaler.h" #include "test/librbd/mock/MockImageCtx.h" -#include "test/librbd/mock/MockJournal.h" #include "test/rbd_mirror/mock/MockContextWQ.h" #include "test/rbd_mirror/mock/MockSafeTimer.h" @@ -26,37 +27,19 @@ namespace librbd { namespace { -struct MockTestJournal; - struct MockTestImageCtx : public MockImageCtx { MockTestImageCtx(librbd::ImageCtx &image_ctx) : librbd::MockImageCtx(image_ctx) { } - MockTestJournal *journal = nullptr; -}; - -struct MockTestJournal : public MockJournal { - MOCK_METHOD2(start_external_replay, void(journal::Replay **, - Context *on_start)); - MOCK_METHOD0(stop_external_replay, void()); }; } // anonymous namespace namespace journal { -template<> -struct Replay { - MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *)); - MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *)); - MOCK_METHOD1(flush, void(Context*)); - MOCK_METHOD2(shut_down, void(bool, Context*)); -}; - template <> struct TypeTraits { typedef ::journal::MockJournalerProxy Journaler; - typedef ::journal::MockReplayEntryProxy ReplayEntry; }; struct MirrorPeerClientMeta; @@ -277,64 +260,44 @@ PrepareRemoteImageRequest* PrepareRemoteImageRequest -struct EventPreprocessor { - static EventPreprocessor *s_instance; - - static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx, - ::journal::MockJournalerProxy &remote_journaler, - const std::string &local_mirror_uuid, - librbd::journal::MirrorPeerClientMeta *client_meta, - MockContextWQ *work_queue) { +template <> +struct Replayer : public image_replayer::Replayer { + static Replayer* s_instance; + librbd::MockTestImageCtx** local_image_ctx; + image_replayer::ReplayerListener* replayer_listener; + + static Replayer* create(librbd::MockTestImageCtx** local_image_ctx, + ::journal::MockJournalerProxy* remote_journaler, + const std::string& local_mirror_uuid, + const std::string& remote_mirror_uuid, + image_replayer::ReplayerListener* replayer_listener, + Threads* threads) { ceph_assert(s_instance != nullptr); + ceph_assert(local_image_ctx != nullptr); + s_instance->local_image_ctx = local_image_ctx; + s_instance->replayer_listener = replayer_listener; return s_instance; } - static void destroy(EventPreprocessor* processor) { - } - - EventPreprocessor() { - ceph_assert(s_instance == nullptr); + Replayer() { s_instance = this; } - ~EventPreprocessor() { - ceph_assert(s_instance == this); - s_instance = nullptr; - } - - MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &)); - MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *)); -}; + MOCK_METHOD0(destroy, void()); -template<> -struct ReplayStatusFormatter { - static ReplayStatusFormatter* s_instance; - - static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler, - const std::string &mirror_uuid) { - ceph_assert(s_instance != nullptr); - return s_instance; - } - - static void destroy(ReplayStatusFormatter* formatter) { - } - - ReplayStatusFormatter() { - ceph_assert(s_instance == nullptr); - s_instance = this; - } + MOCK_METHOD1(init, void(Context*)); + MOCK_METHOD1(shut_down, void(Context*)); + MOCK_METHOD1(flush, void(Context*)); - ~ReplayStatusFormatter() { - ceph_assert(s_instance == this); - s_instance = nullptr; - } + MOCK_METHOD2(get_replay_status, bool(std::string*, Context*)); - MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish)); + MOCK_CONST_METHOD0(is_replaying, bool()); + MOCK_CONST_METHOD0(is_resync_requested, bool()); + MOCK_CONST_METHOD0(get_error_code, int()); + MOCK_CONST_METHOD0(get_error_description, std::string()); }; -EventPreprocessor* EventPreprocessor::s_instance = nullptr; -ReplayStatusFormatter* ReplayStatusFormatter::s_instance = nullptr; +Replayer* Replayer::s_instance = nullptr; } // namespace journal } // namespace image_replayer @@ -367,9 +330,7 @@ public: typedef image_replayer::CloseImageRequest MockCloseImageRequest; typedef image_replayer::PrepareLocalImageRequest MockPrepareLocalImageRequest; typedef image_replayer::PrepareRemoteImageRequest MockPrepareRemoteImageRequest; - typedef image_replayer::journal::EventPreprocessor MockEventPreprocessor; - typedef image_replayer::journal::ReplayStatusFormatter MockReplayStatusFormatter; - typedef librbd::journal::Replay MockReplay; + typedef image_replayer::journal::Replayer MockJournalReplayer; typedef ImageReplayer MockImageReplayer; typedef InstanceWatcher MockInstanceWatcher; @@ -414,18 +375,6 @@ public: })); } - void expect_flush_repeatedly(MockReplay& mock_replay, - journal::MockJournaler& mock_journal) { - EXPECT_CALL(mock_replay, flush(_)) - .WillRepeatedly(Invoke([this](Context* ctx) { - m_threads->work_queue->queue(ctx, 0); - })); - EXPECT_CALL(mock_journal, flush_commit_position(_)) - .WillRepeatedly(Invoke([this](Context* ctx) { - m_threads->work_queue->queue(ctx, 0); - })); - } - void expect_trash_move(MockImageDeleter& mock_image_deleter, const std::string& global_image_id, bool ignore_orphan, int r) { @@ -442,13 +391,6 @@ public: return bl; } - void expect_get_or_send_update( - MockReplayStatusFormatter &mock_replay_status_formatter) { - EXPECT_CALL(mock_replay_status_formatter, get_or_send_update(_, _)) - .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)), - Return(true))); - } - void expect_send(MockPrepareLocalImageRequest &mock_request, const std::string &local_image_id, const std::string &local_image_name, @@ -494,51 +436,29 @@ public: })); } - void expect_start_external_replay(librbd::MockTestJournal &mock_journal, - MockReplay *mock_replay, int r) { - EXPECT_CALL(mock_journal, start_external_replay(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(mock_replay), - WithArg<1>(CompleteContext(r)))); - } - - void expect_init(::journal::MockJournaler &mock_journaler, int r) { - EXPECT_CALL(mock_journaler, init(_)) - .WillOnce(CompleteContext(r)); - } - - void expect_get_cached_client(::journal::MockJournaler &mock_journaler, - int r) { - librbd::journal::ImageClientMeta image_client_meta; - image_client_meta.tag_class = 0; - - librbd::journal::ClientData client_data; - client_data.client_meta = image_client_meta; - - cls::journal::Client client; - encode(client_data, client.data); - - EXPECT_CALL(mock_journaler, get_cached_client("local_mirror_uuid", _)) - .WillOnce(DoAll(SetArgPointee<1>(client), - Return(r))); - } - - void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) { - EXPECT_CALL(mock_journaler, stop_replay(_)) - .WillOnce(CompleteContext(r)); - } - - void expect_flush(MockReplay &mock_replay, int r) { - EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r)); + void expect_init(MockJournalReplayer& mock_journal_replayer, int r) { + EXPECT_CALL(mock_journal_replayer, init(_)) + .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) { + if (r < 0) { + *mock_journal_replayer.local_image_ctx = nullptr; + } + m_threads->work_queue->queue(ctx, r); + })); } - void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) { - EXPECT_CALL(mock_replay, shut_down(cancel_ops, _)) - .WillOnce(WithArg<1>(CompleteContext(r))); + void expect_shut_down(MockJournalReplayer& mock_journal_replayer, int r) { + EXPECT_CALL(mock_journal_replayer, shut_down(_)) + .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) { + *mock_journal_replayer.local_image_ctx = nullptr; + m_threads->work_queue->queue(ctx, r); + })); + EXPECT_CALL(mock_journal_replayer, destroy()); } - void expect_shut_down(journal::MockJournaler &mock_journaler, int r) { - EXPECT_CALL(mock_journaler, shut_down(_)) - .WillOnce(CompleteContext(r)); + void expect_get_replay_status(MockJournalReplayer& mock_journal_replayer) { + EXPECT_CALL(mock_journal_replayer, get_replay_status(_, _)) + .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)), + Return(true))); } void expect_send(MockCloseImageRequest &mock_close_image_request, int r) { @@ -549,75 +469,6 @@ public: })); } - void expect_get_commit_tid_in_debug( - ::journal::MockReplayEntry &mock_replay_entry) { - // It is used in debug messages and depends on debug level - EXPECT_CALL(mock_replay_entry, get_commit_tid()) - .Times(AtLeast(0)) - .WillRepeatedly(Return(0)); - } - - void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) { - // It is used in debug messages and depends on debug level - EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0)) - .WillRepeatedly(Return(0)); - } - - void expect_committed(::journal::MockReplayEntry &mock_replay_entry, - ::journal::MockJournaler &mock_journaler, int times) { - EXPECT_CALL(mock_replay_entry, get_data()).Times(times); - EXPECT_CALL(mock_journaler, committed( - MatcherCast(_))) - .Times(times); - } - - void expect_try_pop_front(::journal::MockJournaler &mock_journaler, - uint64_t replay_tag_tid, bool entries_available) { - EXPECT_CALL(mock_journaler, try_pop_front(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()), - SetArgPointee<1>(replay_tag_tid), - Return(entries_available))); - } - - void expect_try_pop_front_return_no_entries( - ::journal::MockJournaler &mock_journaler, Context *on_finish) { - EXPECT_CALL(mock_journaler, try_pop_front(_, _)) - .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e, - uint64_t *t) { - on_finish->complete(0); - }), - Return(false))); - } - - void expect_get_tag(::journal::MockJournaler &mock_journaler, - const cls::journal::Tag &tag, int r) { - EXPECT_CALL(mock_journaler, get_tag(_, _, _)) - .WillOnce(DoAll(SetArgPointee<1>(tag), - WithArg<2>(CompleteContext(r)))); - } - - void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) { - EXPECT_CALL(mock_journal, allocate_tag(_, _, _)) - .WillOnce(WithArg<2>(CompleteContext(r))); - } - - void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor, - bool required, int r) { - EXPECT_CALL(mock_event_preprocessor, is_required(_)) - .WillOnce(Return(required)); - if (required) { - EXPECT_CALL(mock_event_preprocessor, preprocess(_, _)) - .WillOnce(WithArg<1>(CompleteContext(r))); - } - } - - void expect_process(MockReplay &mock_replay, - int on_ready_r, int on_commit_r) { - EXPECT_CALL(mock_replay, process(_, _, _)) - .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)), - WithArg<2>(CompleteContext(on_commit_r)))); - } - void expect_set_mirror_image_status_repeatedly() { EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _)) .WillRepeatedly(Invoke([](auto, auto, auto){})); @@ -640,6 +491,16 @@ public: &m_remote_status_updater); } + void wait_for_stopped() { + for (int i = 0; i < 10000; i++) { + if (m_image_replayer->is_stopped()) { + break; + } + usleep(1000); + } + ASSERT_TRUE(m_image_replayer->is_stopped()); + } + librbd::ImageCtx *m_remote_image_ctx; librbd::ImageCtx *m_local_image_ctx = nullptr; MockInstanceWatcher m_instance_watcher; @@ -654,9 +515,6 @@ TEST_F(TestMockImageReplayer, StartStop) { create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -666,13 +524,10 @@ TEST_F(TestMockImageReplayer, StartStop) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; + MockJournalReplayer mock_journal_replayer; + expect_get_replay_status(mock_journal_replayer); expect_set_mirror_image_status_repeatedly(); - expect_flush_repeatedly(mock_local_replay, mock_remote_journaler); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -681,17 +536,7 @@ TEST_F(TestMockImageReplayer, StartStop) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); - - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); - - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - - EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)); + expect_init(mock_journal_replayer, 0); create_image_replayer(mock_threads); @@ -702,17 +547,7 @@ TEST_F(TestMockImageReplayer, StartStop) { m_image_replayer->get_health_state()); // STOP - - MockCloseImageRequest mock_close_local_image_request; - - expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_send(mock_close_local_image_request, 0); - - expect_stop_replay(mock_remote_journaler, 0); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); + expect_shut_down(mock_journal_replayer, 0); expect_mirror_image_status_exists(false); C_SaferCond stop_ctx; @@ -734,10 +569,8 @@ TEST_F(TestMockImageReplayer, LocalImagePrimary) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -745,8 +578,6 @@ TEST_F(TestMockImageReplayer, LocalImagePrimary) { expect_send(mock_prepare_remote_image_request, "remote mirror uuid", "remote image id", 0); EXPECT_CALL(mock_remote_journaler, construct()); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); expect_mirror_image_status_exists(false); create_image_replayer(mock_threads); @@ -769,10 +600,8 @@ TEST_F(TestMockImageReplayer, LocalImageDNE) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, "", "", "", -ENOENT); @@ -781,8 +610,6 @@ TEST_F(TestMockImageReplayer, LocalImageDNE) { EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EREMOTEIO); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); expect_mirror_image_status_exists(false); create_image_replayer(mock_threads); @@ -802,11 +629,9 @@ TEST_F(TestMockImageReplayer, PrepareLocalImageError) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _)) .WillRepeatedly(Invoke([](auto, auto, auto){})); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -832,10 +657,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdDNE) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -863,10 +686,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdNonLinkedDNE) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -893,10 +714,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdError) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -925,10 +744,8 @@ TEST_F(TestMockImageReplayer, BootstrapError) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -938,8 +755,6 @@ TEST_F(TestMockImageReplayer, BootstrapError) { EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EINVAL); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); expect_mirror_image_status_exists(false); create_image_replayer(mock_threads); @@ -961,10 +776,8 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) { MockImageDeleter mock_image_deleter; MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; - MockReplayStatusFormatter mock_replay_status_formatter; expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -976,8 +789,6 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) { m_image_replayer->stop(nullptr, true); })); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); expect_mirror_image_status_exists(false); create_image_replayer(mock_threads); @@ -987,15 +798,12 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) { ASSERT_EQ(-ECANCELED, start_ctx.wait()); } -TEST_F(TestMockImageReplayer, StartExternalReplayError) { +TEST_F(TestMockImageReplayer, StopError) { // START create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -1005,12 +813,10 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; + MockJournalReplayer mock_journal_replayer; + expect_get_replay_status(mock_journal_replayer); expect_set_mirror_image_status_repeatedly(); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -1019,42 +825,28 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); + expect_init(mock_journal_replayer, 0); - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); + create_image_replayer(mock_threads); - expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); + C_SaferCond start_ctx; + m_image_replayer->start(&start_ctx); + ASSERT_EQ(0, start_ctx.wait()); - MockCloseImageRequest mock_close_local_image_request; - EXPECT_CALL(mock_local_journal, remove_listener(_)); - expect_send(mock_close_local_image_request, 0); + // STOP (errors are ignored) - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); + expect_shut_down(mock_journal_replayer, -EINVAL); expect_mirror_image_status_exists(false); - create_image_replayer(mock_threads); - - C_SaferCond start_ctx; - m_image_replayer->start(&start_ctx); - ASSERT_EQ(-EINVAL, start_ctx.wait()); - ASSERT_EQ(image_replayer::HEALTH_STATE_ERROR, - m_image_replayer->get_health_state()); + C_SaferCond stop_ctx; + m_image_replayer->stop(&stop_ctx); + ASSERT_EQ(0, stop_ctx.wait()); } -TEST_F(TestMockImageReplayer, StopError) { - // START - +TEST_F(TestMockImageReplayer, ReplayerError) { create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -1064,13 +856,9 @@ TEST_F(TestMockImageReplayer, StopError) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; + MockJournalReplayer mock_journal_replayer; expect_set_mirror_image_status_repeatedly(); - expect_flush_repeatedly(mock_local_replay, mock_remote_journaler); - expect_get_or_send_update(mock_replay_status_formatter); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -1079,52 +867,24 @@ TEST_F(TestMockImageReplayer, StopError) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); + expect_init(mock_journal_replayer, -EINVAL); + EXPECT_CALL(mock_journal_replayer, get_error_description()) + .WillOnce(Return("FAIL")); + EXPECT_CALL(mock_journal_replayer, destroy()); - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); - - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - - EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)); - + expect_mirror_image_status_exists(false); create_image_replayer(mock_threads); C_SaferCond start_ctx; m_image_replayer->start(&start_ctx); - ASSERT_EQ(0, start_ctx.wait()); - - // STOP (errors are ignored) - - MockCloseImageRequest mock_close_local_image_request; - - expect_shut_down(mock_local_replay, true, -EINVAL); - EXPECT_CALL(mock_local_journal, remove_listener(_)); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_send(mock_close_local_image_request, -EINVAL); - - expect_stop_replay(mock_remote_journaler, -EINVAL); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, -EINVAL); - expect_mirror_image_status_exists(false); - - C_SaferCond stop_ctx; - m_image_replayer->stop(&stop_ctx); - ASSERT_EQ(0, stop_ctx.wait()); + ASSERT_EQ(-EINVAL, start_ctx.wait()); } -TEST_F(TestMockImageReplayer, Replay) { +TEST_F(TestMockImageReplayer, ReplayerResync) { // START - create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -1134,17 +894,10 @@ TEST_F(TestMockImageReplayer, Replay) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; - ::journal::MockReplayEntry mock_replay_entry; + MockJournalReplayer mock_journal_replayer; + expect_get_replay_status(mock_journal_replayer); expect_set_mirror_image_status_repeatedly(); - expect_flush_repeatedly(mock_local_replay, mock_remote_journaler); - expect_get_or_send_update(mock_replay_status_formatter); - expect_get_commit_tid_in_debug(mock_replay_entry); - expect_get_tag_tid_in_debug(mock_local_journal); - expect_committed(mock_replay_entry, mock_remote_journaler, 2); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -1153,17 +906,7 @@ TEST_F(TestMockImageReplayer, Replay) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); - - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); - - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - - EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)); + expect_init(mock_journal_replayer, 0); create_image_replayer(mock_threads); @@ -1171,72 +914,23 @@ TEST_F(TestMockImageReplayer, Replay) { m_image_replayer->start(&start_ctx); ASSERT_EQ(0, start_ctx.wait()); - // REPLAY - - cls::journal::Tag tag = - {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, - librbd::Journal<>::LOCAL_MIRROR_UUID, - true, 0, 0})}; - - expect_try_pop_front(mock_remote_journaler, tag.tid, true); - - // replay_flush - expect_shut_down(mock_local_replay, false, 0); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - expect_get_tag(mock_remote_journaler, tag, 0); - expect_allocate_tag(mock_local_journal, 0); - - // process - EXPECT_CALL(mock_replay_entry, get_data()); - EXPECT_CALL(mock_local_replay, decode(_, _)) - .WillOnce(Return(0)); - expect_preprocess(mock_event_preprocessor, false, 0); - expect_process(mock_local_replay, 0, 0); - - // the next event with preprocess - expect_try_pop_front(mock_remote_journaler, tag.tid, true); - EXPECT_CALL(mock_replay_entry, get_data()); - EXPECT_CALL(mock_local_replay, decode(_, _)) - .WillOnce(Return(0)); - expect_preprocess(mock_event_preprocessor, true, 0); - expect_process(mock_local_replay, 0, 0); - - // attempt to process the next event - C_SaferCond replay_ctx; - expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); - - // fire - m_image_replayer->handle_replay_ready(); - ASSERT_EQ(0, replay_ctx.wait()); - - // STOP - - MockCloseImageRequest mock_close_local_image_request; - expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_send(mock_close_local_image_request, 0); - - expect_stop_replay(mock_remote_journaler, 0); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); + // NOTIFY + EXPECT_CALL(mock_journal_replayer, is_resync_requested()) + .WillOnce(Return(true)); + expect_shut_down(mock_journal_replayer, 0); + expect_trash_move(mock_image_deleter, "global image id", true, 0); expect_mirror_image_status_exists(false); + mock_journal_replayer.replayer_listener->handle_notification(); + ASSERT_FALSE(m_image_replayer->is_running()); - C_SaferCond stop_ctx; - m_image_replayer->stop(&stop_ctx); - ASSERT_EQ(0, stop_ctx.wait()); + wait_for_stopped(); } -TEST_F(TestMockImageReplayer, DecodeError) { +TEST_F(TestMockImageReplayer, ReplayerInterrupted) { // START - create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -1246,16 +940,10 @@ TEST_F(TestMockImageReplayer, DecodeError) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; - ::journal::MockReplayEntry mock_replay_entry; + MockJournalReplayer mock_journal_replayer; + expect_get_replay_status(mock_journal_replayer); expect_set_mirror_image_status_repeatedly(); - expect_flush_repeatedly(mock_local_replay, mock_remote_journaler); - expect_get_or_send_update(mock_replay_status_formatter); - expect_get_commit_tid_in_debug(mock_replay_entry); - expect_get_tag_tid_in_debug(mock_local_journal); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -1264,17 +952,7 @@ TEST_F(TestMockImageReplayer, DecodeError) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); - - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); - - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - - EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)); + expect_init(mock_journal_replayer, 0); create_image_replayer(mock_threads); @@ -1282,65 +960,28 @@ TEST_F(TestMockImageReplayer, DecodeError) { m_image_replayer->start(&start_ctx); ASSERT_EQ(0, start_ctx.wait()); - // REPLAY - - cls::journal::Tag tag = - {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, - librbd::Journal<>::LOCAL_MIRROR_UUID, - true, 0, 0})}; - - expect_try_pop_front(mock_remote_journaler, tag.tid, true); - - // replay_flush - expect_shut_down(mock_local_replay, false, 0); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - expect_get_tag(mock_remote_journaler, tag, 0); - expect_allocate_tag(mock_local_journal, 0); - - // process - EXPECT_CALL(mock_replay_entry, get_data()); - EXPECT_CALL(mock_local_replay, decode(_, _)) + // NOTIFY + EXPECT_CALL(mock_journal_replayer, is_resync_requested()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_journal_replayer, is_replaying()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_journal_replayer, get_error_description()) + .WillOnce(Return("INVALID")); + EXPECT_CALL(mock_journal_replayer, get_error_code()) .WillOnce(Return(-EINVAL)); - - // stop on error - expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - - MockCloseImageRequest mock_close_local_image_request; - C_SaferCond close_ctx; - EXPECT_CALL(mock_close_local_image_request, send()) - .WillOnce(Invoke([&mock_close_local_image_request, &close_ctx]() { - *mock_close_local_image_request.image_ctx = nullptr; - mock_close_local_image_request.on_finish->complete(0); - close_ctx.complete(0); - })); - - expect_stop_replay(mock_remote_journaler, 0); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); + expect_shut_down(mock_journal_replayer, 0); expect_mirror_image_status_exists(false); + mock_journal_replayer.replayer_listener->handle_notification(); + ASSERT_FALSE(m_image_replayer->is_running()); - // fire - m_image_replayer->handle_replay_ready(); - ASSERT_EQ(0, close_ctx.wait()); - - while (!m_image_replayer->is_stopped()) { - usleep(1000); - } + wait_for_stopped(); } -TEST_F(TestMockImageReplayer, DelayedReplay) { - +TEST_F(TestMockImageReplayer, ReplayerRenamed) { // START - create_local_image(); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); - librbd::MockTestJournal mock_local_journal; - mock_local_image_ctx.journal = &mock_local_journal; - journal::MockJournaler mock_remote_journaler; MockThreads mock_threads(m_threads); expect_work_queue_repeatedly(mock_threads); @@ -1350,17 +991,10 @@ TEST_F(TestMockImageReplayer, DelayedReplay) { MockPrepareLocalImageRequest mock_prepare_local_image_request; MockPrepareRemoteImageRequest mock_prepare_remote_image_request; MockBootstrapRequest mock_bootstrap_request; - MockReplay mock_local_replay; - MockEventPreprocessor mock_event_preprocessor; - MockReplayStatusFormatter mock_replay_status_formatter; - ::journal::MockReplayEntry mock_replay_entry; + MockJournalReplayer mock_journal_replayer; + expect_get_replay_status(mock_journal_replayer); expect_set_mirror_image_status_repeatedly(); - expect_flush_repeatedly(mock_local_replay, mock_remote_journaler); - expect_get_or_send_update(mock_replay_status_formatter); - expect_get_commit_tid_in_debug(mock_replay_entry); - expect_get_tag_tid_in_debug(mock_local_journal); - expect_committed(mock_replay_entry, mock_remote_journaler, 1); InSequence seq; expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id, @@ -1369,17 +1003,7 @@ TEST_F(TestMockImageReplayer, DelayedReplay) { m_remote_image_ctx->id, 0); EXPECT_CALL(mock_remote_journaler, construct()); expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0); - - EXPECT_CALL(mock_local_journal, add_listener(_)); - - expect_init(mock_remote_journaler, 0); - - EXPECT_CALL(mock_remote_journaler, add_listener(_)); - expect_get_cached_client(mock_remote_journaler, 0); - - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - - EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)); + expect_init(mock_journal_replayer, 0); create_image_replayer(mock_threads); @@ -1387,75 +1011,26 @@ TEST_F(TestMockImageReplayer, DelayedReplay) { m_image_replayer->start(&start_ctx); ASSERT_EQ(0, start_ctx.wait()); - // REPLAY - - cls::journal::Tag tag = - {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, - librbd::Journal<>::LOCAL_MIRROR_UUID, - true, 0, 0})}; - - expect_try_pop_front(mock_remote_journaler, tag.tid, true); - - // replay_flush - expect_shut_down(mock_local_replay, false, 0); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_start_external_replay(mock_local_journal, &mock_local_replay, 0); - expect_get_tag(mock_remote_journaler, tag, 0); - expect_allocate_tag(mock_local_journal, 0); - - // process with delay - EXPECT_CALL(mock_replay_entry, get_data()); - librbd::journal::EventEntry event_entry( - librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now()); - EXPECT_CALL(mock_local_replay, decode(_, _)) - .WillOnce(DoAll(SetArgPointee<1>(event_entry), - Return(0))); - expect_preprocess(mock_event_preprocessor, false, 0); - expect_process(mock_local_replay, 0, 0); - - // attempt to process the next event - C_SaferCond replay_ctx; - expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); - - // fire - mock_local_image_ctx.mirroring_replay_delay = 2; - m_image_replayer->handle_replay_ready(); - ASSERT_EQ(0, replay_ctx.wait()); - - // add a pending (delayed) entry before stop - expect_try_pop_front(mock_remote_journaler, tag.tid, true); - EXPECT_CALL(mock_replay_entry, get_data()); - C_SaferCond decode_ctx; - EXPECT_CALL(mock_local_replay, decode(_, _)) - .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it, - librbd::journal::EventEntry *e) { - decode_ctx.complete(0); - }), - Return(0))); - - mock_local_image_ctx.mirroring_replay_delay = 10; - m_image_replayer->handle_replay_ready(); - ASSERT_EQ(0, decode_ctx.wait()); + // NOTIFY + EXPECT_CALL(mock_journal_replayer, is_resync_requested()) + .WillOnce(Return(false)); + EXPECT_CALL(mock_journal_replayer, is_replaying()) + .WillOnce(Return(true)); + mock_local_image_ctx.name = "NEW NAME"; + mock_journal_replayer.replayer_listener->handle_notification(); // STOP - - MockCloseImageRequest mock_close_local_image_request; - - expect_shut_down(mock_local_replay, true, 0); - EXPECT_CALL(mock_local_journal, remove_listener(_)); - EXPECT_CALL(mock_local_journal, stop_external_replay()); - expect_send(mock_close_local_image_request, 0); - - expect_stop_replay(mock_remote_journaler, 0); - EXPECT_CALL(mock_remote_journaler, remove_listener(_)); - expect_shut_down(mock_remote_journaler, 0); + expect_shut_down(mock_journal_replayer, 0); expect_mirror_image_status_exists(false); C_SaferCond stop_ctx; m_image_replayer->stop(&stop_ctx); ASSERT_EQ(0, stop_ctx.wait()); -} + auto image_spec = image_replayer::util::compute_image_spec( + m_local_io_ctx, "NEW NAME"); + ASSERT_EQ(image_spec, m_image_replayer->get_name()); +} } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 8bed3fc28be..efe2ba4e237 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -12,7 +12,6 @@ #include "common/WorkQueue.h" #include "global/global_context.h" #include "journal/Journaler.h" -#include "journal/ReplayHandler.h" #include "journal/Settings.h" #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" @@ -20,7 +19,6 @@ #include "librbd/Journal.h" #include "librbd/Operations.h" #include "librbd/Utils.h" -#include "librbd/journal/Replay.h" #include "ImageDeleter.h" #include "ImageReplayer.h" #include "MirrorStatusUpdater.h" @@ -29,9 +27,10 @@ #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h" #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" #include "tools/rbd_mirror/image_replayer/Utils.h" -#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" -#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" +#include "tools/rbd_mirror/image_replayer/journal/Replayer.h" +#include #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -39,20 +38,12 @@ #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \ << __func__ << ": " -using std::map; -using std::string; -using std::unique_ptr; -using std::shared_ptr; -using std::vector; - extern PerfCounters *g_perf_counters; namespace rbd { namespace mirror { -using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; -using librbd::util::create_rados_callback; template std::ostream &operator<<(std::ostream &os, @@ -60,25 +51,6 @@ std::ostream &operator<<(std::ostream &os, namespace { -template -struct ReplayHandler : public ::journal::ReplayHandler { - ImageReplayer *replayer; - ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {} - - void handle_entries_available() override { - replayer->handle_replay_ready(); - } - void handle_complete(int r) override { - std::stringstream ss; - if (r == -ENOMEM) { - ss << "not enough memory in autotune cache"; - } else if (r < 0) { - ss << "replay completed with error: " << cpp_strerror(r); - } - replayer->handle_replay_complete(r, ss.str()); - } -}; - template class ImageReplayerAdminSocketCommand { public: @@ -214,21 +186,6 @@ private: Commands commands; }; -uint32_t calculate_replay_delay(const utime_t &event_time, - int mirroring_replay_delay) { - if (mirroring_replay_delay <= 0) { - return 0; - } - - utime_t now = ceph_clock_now(); - if (event_time + mirroring_replay_delay <= now) { - return 0; - } - - // ensure it is rounded up when converting to integer - return (event_time + mirroring_replay_delay - now) + 1; -} - } // anonymous namespace template @@ -243,13 +200,18 @@ void ImageReplayer::BootstrapProgressContext::update_progress( } template -void ImageReplayer::RemoteJournalerListener::handle_update( - ::journal::JournalMetadata *) { - auto ctx = new LambdaContext([this](int r) { - replayer->handle_remote_journal_metadata_updated(); - }); - replayer->m_threads->work_queue->queue(ctx, 0); -} +struct ImageReplayer::ReplayerListener + : public image_replayer::ReplayerListener { + ImageReplayer* image_replayer; + + ReplayerListener(ImageReplayer* image_replayer) + : image_replayer(image_replayer) { + } + + void handle_notification() override { + image_replayer->handle_replayer_notification(); + } +}; template ImageReplayer::ImageReplayer( @@ -267,8 +229,7 @@ ImageReplayer::ImageReplayer( m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " + stringify(local_io_ctx.get_id()) + " " + global_image_id)), m_progress_cxt(this), - m_journal_listener(new JournalListener(this)), - m_remote_listener(this) + m_replayer_listener(new ReplayerListener(this)) { // Register asok commands using a temporary "remote_pool_name/global_image_id" // name. When the image name becomes known on start the asok commands will be @@ -283,18 +244,11 @@ template ImageReplayer::~ImageReplayer() { unregister_admin_socket_hook(); - ceph_assert(m_event_preprocessor == nullptr); - ceph_assert(m_replay_status_formatter == nullptr); ceph_assert(m_local_image_ctx == nullptr); - ceph_assert(m_local_replay == nullptr); - ceph_assert(m_remote_journaler == nullptr); - ceph_assert(m_replay_handler == nullptr); ceph_assert(m_on_start_finish == nullptr); ceph_assert(m_on_stop_finish == nullptr); ceph_assert(m_bootstrap_request == nullptr); - ceph_assert(m_flush_local_replay_task == nullptr); - - delete m_journal_listener; + delete m_replayer_listener; } template @@ -328,7 +282,7 @@ void ImageReplayer::add_peer( template void ImageReplayer::set_state_description(int r, const std::string &desc) { - dout(10) << r << " " << desc << dendl; + dout(10) << "r=" << r << ", desc=" << desc << dendl; std::lock_guard l{m_lock}; m_last_r = r; @@ -398,10 +352,10 @@ void ImageReplayer::handle_prepare_local_image(int r) { on_start_fail(r, "error preparing local image for replay"); return; } else { + // have the correct local image name now reregister_admin_socket_hook(); } - // local image doesn't exist or is non-primary prepare_remote_image(); } @@ -423,6 +377,8 @@ void ImageReplayer::prepare_remote_image() { journal_settings.commit_interval = cct->_conf.get_val( "rbd_mirror_journal_commit_age"); + ceph_assert(m_remote_journaler == nullptr); + Context *ctx = create_context_callback< ImageReplayer, &ImageReplayer::handle_prepare_remote_image>(this); auto req = image_replayer::PrepareRemoteImageRequest::create( @@ -437,7 +393,8 @@ template void ImageReplayer::handle_prepare_remote_image(int r) { dout(10) << "r=" << r << dendl; - ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr); + ceph_assert(r < 0 ? m_remote_journaler == nullptr : + m_remote_journaler != nullptr); if (r < 0 && !m_local_image_id.empty() && m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) { // local image is primary -- fall-through @@ -534,113 +491,57 @@ void ImageReplayer::handle_bootstrap(int r) { return; } - ceph_assert(m_local_journal == nullptr); - { - std::shared_lock image_locker{m_local_image_ctx->image_lock}; - if (m_local_image_ctx->journal != nullptr) { - m_local_journal = m_local_image_ctx->journal; - m_local_journal->add_listener(m_journal_listener); - } - } - - if (m_local_journal == nullptr) { - on_start_fail(-EINVAL, "error accessing local journal"); - return; - } - - update_mirror_image_status(false, boost::none); - init_remote_journaler(); + start_replay(); } template -void ImageReplayer::init_remote_journaler() { +void ImageReplayer::start_replay() { dout(10) << dendl; - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_init_remote_journaler>(this); - m_remote_journaler->init(ctx); + // TODO support journal + snapshot replay + std::unique_lock locker{m_lock}; + ceph_assert(m_replayer == nullptr); + m_replayer = image_replayer::journal::Replayer::create( + &m_local_image_ctx, m_remote_journaler, m_local_mirror_uuid, + m_remote_image.mirror_uuid, m_replayer_listener, m_threads); + + auto ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_start_replay>(this); + m_replayer->init(ctx); } template -void ImageReplayer::handle_init_remote_journaler(int r) { +void ImageReplayer::handle_start_replay(int r) { dout(10) << "r=" << r << dendl; if (on_start_interrupted()) { return; } else if (r < 0) { - derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; - on_start_fail(r, "error initializing remote journal"); - return; - } - - m_remote_journaler->add_listener(&m_remote_listener); - - cls::journal::Client client; - r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); - if (r < 0) { - derr << "error retrieving remote journal client: " << cpp_strerror(r) - << dendl; - on_start_fail(r, "error retrieving remote journal client"); - return; - } - - dout(5) << "image_id=" << m_local_image_id << ", " - << "client_meta.image_id=" << m_client_meta.image_id << ", " - << "client.state=" << client.state << dendl; - if (m_client_meta.image_id == m_local_image_id && - client.state != cls::journal::CLIENT_STATE_CONNECTED) { - dout(5) << "client flagged disconnected, stopping image replay" << dendl; - if (m_local_image_ctx->config.template get_val("rbd_mirroring_resync_after_disconnect")) { + std::string error_description = m_replayer->get_error_description(); + if (r == -ENOTCONN && m_replayer->is_resync_requested()) { + std::unique_lock locker{m_lock}; m_resync_requested = true; - on_start_fail(-ENOTCONN, "disconnected: automatic resync"); - } else { - on_start_fail(-ENOTCONN, "disconnected"); } - return; - } - - start_replay(); -} - -template -void ImageReplayer::start_replay() { - dout(10) << dendl; - Context *start_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_start_replay>(this); - m_local_journal->start_external_replay(&m_local_replay, start_ctx); -} + // shut down not required if init failed + m_replayer->destroy(); + m_replayer = nullptr; -template -void ImageReplayer::handle_start_replay(int r) { - dout(10) << "r=" << r << dendl; - - if (r < 0) { - ceph_assert(m_local_replay == nullptr); derr << "error starting external replay on local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; - on_start_fail(r, "error starting replay on local image"); + on_start_fail(r, error_description); return; } - m_replay_status_formatter = - image_replayer::journal::ReplayStatusFormatter::create( - m_remote_journaler, m_local_mirror_uuid); - - Context *on_finish(nullptr); + Context *on_finish = nullptr; { - std::lock_guard locker{m_lock}; + std::unique_lock locker{m_lock}; ceph_assert(m_state == STATE_STARTING); m_state = STATE_REPLAYING; std::swap(m_on_start_finish, on_finish); } - m_event_preprocessor = image_replayer::journal::EventPreprocessor::create( - *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid, - &m_client_meta, m_threads->work_queue); - update_mirror_image_status(true, boost::none); - if (on_replay_interrupted()) { if (on_finish != nullptr) { on_finish->complete(r); @@ -648,18 +549,6 @@ void ImageReplayer::handle_start_replay(int r) { return; } - { - CephContext *cct = static_cast(m_local_io_ctx.cct()); - double poll_seconds = cct->_conf.get_val( - "rbd_mirror_journal_poll_age"); - - std::lock_guard locker{m_lock}; - m_replay_handler = new ReplayHandler(this); - m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds); - - dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl; - } - dout(10) << "start succeeded" << dendl; if (on_finish != nullptr) { dout(10) << "on finish complete, r=" << r << dendl; @@ -670,7 +559,7 @@ void ImageReplayer::handle_start_replay(int r) { template void ImageReplayer::on_start_fail(int r, const std::string &desc) { - dout(10) << "r=" << r << dendl; + dout(10) << "r=" << r << ", desc=" << desc << dendl; Context *ctx = new LambdaContext([this, r, desc](int _r) { { std::lock_guard locker{m_lock}; @@ -780,7 +669,6 @@ void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) m_stop_requested = true; m_state = STATE_STOPPING; - cancel_flush_local_replay_task(); } set_state_description(r, desc); @@ -788,38 +676,6 @@ void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) shut_down(0); } -template -void ImageReplayer::handle_replay_ready() -{ - dout(20) << dendl; - if (on_replay_interrupted()) { - return; - } - - if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) { - return; - } - - m_event_replay_tracker.start_op(); - - m_lock.lock(); - bool stopping = (m_state == STATE_STOPPING); - m_lock.unlock(); - - if (stopping) { - dout(10) << "stopping event replay" << dendl; - m_event_replay_tracker.finish_op(); - return; - } - - if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) { - preprocess_entry(); - return; - } - - replay_flush(); -} - template void ImageReplayer::restart(Context *on_finish) { @@ -836,121 +692,23 @@ void ImageReplayer::restart(Context *on_finish) template void ImageReplayer::flush() { - dout(10) << dendl; C_SaferCond ctx; - flush_local_replay(&ctx); - ctx.wait(); - - update_mirror_image_status(false, boost::none); -} - - -template -void ImageReplayer::schedule_flush_local_replay_task() { - ceph_assert(ceph_mutex_is_locked(m_lock)); - - std::lock_guard timer_locker{m_threads->timer_lock}; - if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) { - return; - } - - dout(15) << dendl; - m_flush_local_replay_task = create_async_context_callback( - m_threads->work_queue, create_context_callback< - ImageReplayer, - &ImageReplayer::handle_flush_local_replay_task>(this)); - m_threads->timer->add_event_after(30, m_flush_local_replay_task); -} - -template -void ImageReplayer::cancel_flush_local_replay_task() { - ceph_assert(ceph_mutex_is_locked(m_lock)); - std::lock_guard timer_locker{m_threads->timer_lock}; - if (m_flush_local_replay_task != nullptr) { - auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task); - m_flush_local_replay_task = nullptr; - ceph_assert(canceled); - } -} - -template -void ImageReplayer::handle_flush_local_replay_task(int) { - dout(15) << dendl; - - m_in_flight_op_tracker.start_op(); - auto on_finish = new LambdaContext([this](int) { - { - std::lock_guard timer_locker{m_threads->timer_lock}; - m_flush_local_replay_task = nullptr; - } - - update_mirror_image_status(false, boost::none); - m_in_flight_op_tracker.finish_op(); - }); - flush_local_replay(on_finish); -} - -template -void ImageReplayer::flush_local_replay(Context* on_flush) -{ - m_lock.lock(); - if (m_state != STATE_REPLAYING) { - m_lock.unlock(); - on_flush->complete(0); - return; - } - - dout(15) << dendl; - auto ctx = new LambdaContext( - [this, on_flush](int r) { - handle_flush_local_replay(on_flush, r); - }); - m_local_replay->flush(ctx); - m_lock.unlock(); -} - -template -void ImageReplayer::handle_flush_local_replay(Context* on_flush, int r) -{ - dout(15) << "r=" << r << dendl; - if (r < 0) { - derr << "error flushing local replay: " << cpp_strerror(r) << dendl; - on_flush->complete(r); - return; - } - flush_commit_position(on_flush); -} + { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + return; + } -template -void ImageReplayer::flush_commit_position(Context* on_flush) -{ - m_lock.lock(); - if (m_state != STATE_REPLAYING) { - m_lock.unlock(); - on_flush->complete(0); - return; + dout(10) << dendl; + ceph_assert(m_replayer != nullptr); + m_replayer->flush(&ctx); } - dout(15) << dendl; - auto ctx = new LambdaContext( - [this, on_flush](int r) { - handle_flush_commit_position(on_flush, r); - }); - m_remote_journaler->flush_commit_position(ctx); - m_lock.unlock(); -} - -template -void ImageReplayer::handle_flush_commit_position(Context* on_flush, int r) -{ - dout(15) << "r=" << r << dendl; - if (r < 0) { - derr << "error flushing remote journal commit position: " - << cpp_strerror(r) << dendl; + int r = ctx.wait(); + if (r >= 0) { + update_mirror_image_status(false, boost::none); } - - on_flush->complete(r); } template @@ -981,330 +739,6 @@ void ImageReplayer::print_status(Formatter *f) f->close_section(); } -template -void ImageReplayer::handle_replay_complete(int r, const std::string &error_desc) -{ - dout(10) << "r=" << r << dendl; - if (r < 0) { - derr << "replay encountered an error: " << cpp_strerror(r) << dendl; - } - - { - std::lock_guard locker{m_lock}; - m_stop_requested = true; - } - on_stop_journal_replay(r, error_desc); -} - -template -void ImageReplayer::replay_flush() { - dout(10) << dendl; - - bool interrupted = false; - { - std::lock_guard locker{m_lock}; - if (m_state != STATE_REPLAYING) { - dout(10) << "replay interrupted" << dendl; - interrupted = true; - } else { - m_state = STATE_REPLAY_FLUSHING; - } - } - - if (interrupted) { - m_event_replay_tracker.finish_op(); - return; - } - - // shut down the replay to flush all IO and ops and create a new - // replayer to handle the new tag epoch - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_replay_flush>(this); - ctx = new LambdaContext([this, ctx](int r) { - m_local_image_ctx->journal->stop_external_replay(); - m_local_replay = nullptr; - - if (r < 0) { - ctx->complete(r); - return; - } - - m_local_journal->start_external_replay(&m_local_replay, ctx); - }); - m_local_replay->shut_down(false, ctx); -} - -template -void ImageReplayer::handle_replay_flush(int r) { - dout(10) << "r=" << r << dendl; - - { - std::lock_guard locker{m_lock}; - ceph_assert(m_state == STATE_REPLAY_FLUSHING); - m_state = STATE_REPLAYING; - } - - if (r < 0) { - derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "replay flush encountered an error"); - return; - } else if (on_replay_interrupted()) { - m_event_replay_tracker.finish_op(); - return; - } - - get_remote_tag(); -} - -template -void ImageReplayer::get_remote_tag() { - dout(15) << "tag_tid: " << m_replay_tag_tid << dendl; - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_get_remote_tag>(this); - m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx); -} - -template -void ImageReplayer::handle_get_remote_tag(int r) { - dout(15) << "r=" << r << dendl; - - if (r == 0) { - try { - auto it = m_replay_tag.data.cbegin(); - decode(m_replay_tag_data, it); - } catch (const buffer::error &err) { - r = -EBADMSG; - } - } - - if (r < 0) { - derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " - << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to retrieve remote tag"); - return; - } - - m_replay_tag_valid = true; - dout(15) << "decoded remote tag " << m_replay_tag_tid << ": " - << m_replay_tag_data << dendl; - - allocate_local_tag(); -} - -template -void ImageReplayer::allocate_local_tag() { - dout(15) << dendl; - - std::string mirror_uuid = m_replay_tag_data.mirror_uuid; - if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { - mirror_uuid = m_remote_image.mirror_uuid; - } else if (mirror_uuid == m_local_mirror_uuid) { - mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; - } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { - // handle possible edge condition where daemon can failover and - // the local image has already been promoted/demoted - auto local_tag_data = m_local_journal->get_tag_data(); - if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID && - (local_tag_data.predecessor.commit_valid && - local_tag_data.predecessor.mirror_uuid == - librbd::Journal<>::LOCAL_MIRROR_UUID)) { - dout(15) << "skipping stale demotion event" << dendl; - handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0); - handle_replay_ready(); - return; - } else { - dout(5) << "encountered image demotion: stopping" << dendl; - std::lock_guard locker{m_lock}; - m_stop_requested = true; - } - } - - librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor); - if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { - predecessor.mirror_uuid = m_remote_image.mirror_uuid; - } else if (predecessor.mirror_uuid == m_local_mirror_uuid) { - predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; - } - - dout(15) << "mirror_uuid=" << mirror_uuid << ", " - << "predecessor=" << predecessor << ", " - << "replay_tag_tid=" << m_replay_tag_tid << dendl; - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_allocate_local_tag>(this); - m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx); -} - -template -void ImageReplayer::handle_allocate_local_tag(int r) { - dout(15) << "r=" << r << ", " - << "tag_tid=" << m_local_journal->get_tag_tid() << dendl; - - if (r < 0) { - derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to allocate journal tag"); - return; - } - - preprocess_entry(); -} - -template -void ImageReplayer::preprocess_entry() { - dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() - << dendl; - - bufferlist data = m_replay_entry.get_data(); - auto it = data.cbegin(); - int r = m_local_replay->decode(&it, &m_event_entry); - if (r < 0) { - derr << "failed to decode journal event" << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to decode journal event"); - return; - } - - uint32_t delay = calculate_replay_delay( - m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay); - if (delay == 0) { - handle_preprocess_entry_ready(0); - return; - } - - dout(20) << "delaying replay by " << delay << " sec" << dendl; - - std::lock_guard timer_locker{m_threads->timer_lock}; - ceph_assert(m_delayed_preprocess_task == nullptr); - m_delayed_preprocess_task = new LambdaContext( - [this](int r) { - ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); - m_delayed_preprocess_task = nullptr; - m_threads->work_queue->queue( - create_context_callback::handle_preprocess_entry_ready>(this), 0); - }); - m_threads->timer->add_event_after(delay, m_delayed_preprocess_task); -} - -template -void ImageReplayer::handle_preprocess_entry_ready(int r) { - dout(20) << "r=" << r << dendl; - ceph_assert(r == 0); - - m_replay_start_time = ceph_clock_now(); - if (!m_event_preprocessor->is_required(m_event_entry)) { - process_entry(); - return; - } - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_preprocess_entry_safe>(this); - m_event_preprocessor->preprocess(&m_event_entry, ctx); -} - -template -void ImageReplayer::handle_preprocess_entry_safe(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - m_event_replay_tracker.finish_op(); - - if (r == -ECANCELED) { - handle_replay_complete(0, "lost exclusive lock"); - } else { - derr << "failed to preprocess journal event" << dendl; - handle_replay_complete(r, "failed to preprocess journal event"); - } - return; - } - - process_entry(); -} - -template -void ImageReplayer::process_entry() { - dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() - << dendl; - - // stop replaying events if stop has been requested - if (on_replay_interrupted()) { - m_event_replay_tracker.finish_op(); - return; - } - - Context *on_ready = create_context_callback< - ImageReplayer, &ImageReplayer::handle_process_entry_ready>(this); - Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry), - m_replay_start_time); - - m_local_replay->process(m_event_entry, on_ready, on_commit); -} - -template -void ImageReplayer::handle_process_entry_ready(int r) { - dout(20) << dendl; - ceph_assert(r == 0); - - bool update_status = false; - { - std::shared_lock image_locker{m_local_image_ctx->image_lock}; - if (m_local_image_name != m_local_image_ctx->name) { - m_local_image_name = m_local_image_ctx->name; - update_status = true; - } - } - - if (update_status) { - update_mirror_image_status(false, {}); - } - - // attempt to process the next event - handle_replay_ready(); -} - -template -void ImageReplayer::handle_process_entry_safe(const ReplayEntry &replay_entry, - const utime_t &replay_start_time, - int r) { - dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r - << dendl; - - if (r < 0) { - derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; - handle_replay_complete(r, "failed to commit journal event"); - } else { - ceph_assert(m_remote_journaler != nullptr); - m_remote_journaler->committed(replay_entry); - } - - auto bytes = replay_entry.get_data().length(); - auto latency = ceph_clock_now() - replay_start_time; - - if (g_perf_counters) { - g_perf_counters->inc(l_rbd_mirror_replay); - g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes); - g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); - } - - auto ctx = new LambdaContext( - [this, bytes, latency](int r) { - std::lock_guard locker{m_lock}; - schedule_flush_local_replay_task(); - - if (m_perf_counters) { - m_perf_counters->inc(l_rbd_mirror_replay); - m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes); - m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); - } - - m_event_replay_tracker.finish_op(); - }); - m_threads->work_queue->queue(ctx, 0); -} - template void ImageReplayer::update_mirror_image_status( bool force, const OptionalState &opt_state) { @@ -1382,9 +816,9 @@ void ImageReplayer::set_mirror_image_status_update( } break; case STATE_REPLAYING: - case STATE_REPLAY_FLUSHING: status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING; { + std::string desc; auto on_req_finish = new LambdaContext( [this, force](int r) { dout(15) << "replay status ready: r=" << r << dendl; @@ -1395,10 +829,8 @@ void ImageReplayer::set_mirror_image_status_update( } }); - std::string desc; - ceph_assert(m_replay_status_formatter != nullptr); - if (!m_replay_status_formatter->get_or_send_update(&desc, - on_req_finish)) { + ceph_assert(m_replayer != nullptr); + if (!m_replayer->get_replay_status(&desc, on_req_finish)) { dout(15) << "waiting for replay status" << dendl; return; } @@ -1460,21 +892,6 @@ template void ImageReplayer::shut_down(int r) { dout(10) << "r=" << r << dendl; - bool canceled_delayed_preprocess_task = false; - { - std::lock_guard timer_locker{m_threads->timer_lock}; - if (m_delayed_preprocess_task != nullptr) { - canceled_delayed_preprocess_task = m_threads->timer->cancel_event( - m_delayed_preprocess_task); - ceph_assert(canceled_delayed_preprocess_task); - m_delayed_preprocess_task = nullptr; - } - } - if (canceled_delayed_preprocess_task) { - // wake up sleeping replay - m_event_replay_tracker.finish_op(); - } - { std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_STOPPING); @@ -1488,10 +905,6 @@ void ImageReplayer::shut_down(int r) { return; } - // NOTE: it's important to ensure that the local image is fully - // closed before attempting to close the remote journal in - // case the remote cluster is unreachable - // chain the shut down sequence (reverse order) Context *ctx = new LambdaContext( [this, r](int _r) { @@ -1499,80 +912,44 @@ void ImageReplayer::shut_down(int r) { handle_shut_down(r); }); - // close the remote journal + // destruct the remote journaler created in prepare remote if (m_remote_journaler != nullptr) { ctx = new LambdaContext([this, ctx](int r) { - delete m_remote_journaler; - m_remote_journaler = nullptr; - ctx->complete(0); - }); - ctx = new LambdaContext([this, ctx](int r) { - m_remote_journaler->remove_listener(&m_remote_listener); - m_remote_journaler->shut_down(ctx); - }); + delete m_remote_journaler; + m_remote_journaler = nullptr; + ctx->complete(0); + }); } - // stop the replay of remote journal events - if (m_replay_handler != nullptr) { + // close the local image (if we aborted after a successful bootstrap) + if (m_local_image_ctx != nullptr) { ctx = new LambdaContext([this, ctx](int r) { - delete m_replay_handler; - m_replay_handler = nullptr; - - m_event_replay_tracker.wait_for_ops(ctx); - }); + ceph_assert(m_local_image_ctx == nullptr); + ctx->complete(0); + }); ctx = new LambdaContext([this, ctx](int r) { - m_remote_journaler->stop_replay(ctx); - }); - } + if (m_local_image_ctx == nullptr) { + // never opened or closed via the replayer shutdown + ctx->complete(0); + return; + } - // close the local image (release exclusive lock) - if (m_local_image_ctx) { - ctx = new LambdaContext([this, ctx](int r) { auto request = image_replayer::CloseImageRequest::create( &m_local_image_ctx, ctx); request->send(); }); } - // shut down event replay into the local image - if (m_local_journal != nullptr) { - ctx = new LambdaContext([this, ctx](int r) { - m_local_journal = nullptr; - ctx->complete(0); - }); - if (m_local_replay != nullptr) { - ctx = new LambdaContext([this, ctx](int r) { - m_local_journal->stop_external_replay(); - m_local_replay = nullptr; - - image_replayer::journal::EventPreprocessor::destroy( - m_event_preprocessor); - m_event_preprocessor = nullptr; - ctx->complete(0); - }); - } + // close the replayer + if (m_replayer != nullptr) { ctx = new LambdaContext([this, ctx](int r) { - // blocks if listener notification is in-progress - m_local_journal->remove_listener(m_journal_listener); - ctx->complete(0); - }); - } - - // wait for all local in-flight replay events to complete - ctx = new LambdaContext([this, ctx](int r) { - if (r < 0) { - derr << "error shutting down journal replay: " << cpp_strerror(r) - << dendl; - } - - m_event_replay_tracker.wait_for_ops(ctx); + m_replayer->destroy(); + m_replayer = nullptr; + ctx->complete(0); }); - - // flush any local in-flight replay events - if (m_local_replay != nullptr) { ctx = new LambdaContext([this, ctx](int r) { - m_local_replay->shut_down(true, ctx); - }); + m_replayer->shut_down(ctx); + }); } m_threads->work_queue->queue(ctx, 0); @@ -1647,10 +1024,6 @@ void ImageReplayer::handle_shut_down(int r) { } dout(10) << "stop complete" << dendl; - image_replayer::journal::ReplayStatusFormatter::destroy( - m_replay_status_formatter); - m_replay_status_formatter = nullptr; - Context *on_start = nullptr; Context *on_stop = nullptr; { @@ -1658,7 +1031,6 @@ void ImageReplayer::handle_shut_down(int r) { std::swap(on_start, m_on_start_finish); std::swap(on_stop, m_on_stop_finish); m_stop_requested = false; - ceph_assert(m_delayed_preprocess_task == nullptr); ceph_assert(m_state == STATE_STOPPING); m_state = STATE_STOPPED; } @@ -1675,27 +1047,44 @@ void ImageReplayer::handle_shut_down(int r) { } template -void ImageReplayer::handle_remote_journal_metadata_updated() { - dout(20) << dendl; +void ImageReplayer::handle_replayer_notification() { + dout(10) << dendl; - cls::journal::Client client; + // detect a rename of the local image + std::string local_image_name; { - std::lock_guard locker{m_lock}; - if (!is_running_()) { - return; - } + std::shared_lock image_locker{m_local_image_ctx->image_lock}; + local_image_name = m_local_image_ctx->name; + } - int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); - if (r < 0) { - derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; - return; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_state == STATE_REPLAYING); + ceph_assert(m_replayer != nullptr); + + if (m_local_image_name != local_image_name) { + // will re-register with new name after next status update + dout(10) << "image renamed" << dendl; + m_local_image_name = local_image_name; } } - if (client.state != cls::journal::CLIENT_STATE_CONNECTED) { - dout(0) << "client flagged disconnected, stopping image replay" << dendl; - stop(nullptr, false, -ENOTCONN, "disconnected"); + // replayer cannot be shut down while notification is in-flight + if (m_replayer->is_resync_requested()) { + dout(10) << "resync requested" << dendl; + m_resync_requested = true; + on_stop_journal_replay(0, "resync requested"); + return; } + + if (!m_replayer->is_replaying()) { + dout(10) << "replay interrupted" << dendl; + on_stop_journal_replay(m_replayer->get_error_code(), + m_replayer->get_error_description()); + return; + } + + update_mirror_image_status(false, {}); } template @@ -1705,8 +1094,6 @@ std::string ImageReplayer::to_string(const State state) { return "Starting"; case ImageReplayer::STATE_REPLAYING: return "Replaying"; - case ImageReplayer::STATE_REPLAY_FLUSHING: - return "ReplayFlushing"; case ImageReplayer::STATE_STOPPING: return "Stopping"; case ImageReplayer::STATE_STOPPED: @@ -1717,14 +1104,6 @@ std::string ImageReplayer::to_string(const State state) { return "Unknown(" + stringify(state) + ")"; } -template -void ImageReplayer::resync_image(Context *on_finish) { - dout(10) << dendl; - - m_resync_requested = true; - stop(on_finish); -} - template void ImageReplayer::register_admin_socket_hook() { ImageReplayerAdminSocketHook *asok_hook; @@ -1734,29 +1113,12 @@ void ImageReplayer::register_admin_socket_hook() { return; } - ceph_assert(m_perf_counters == nullptr); - dout(15) << "registered asok hook: " << m_image_spec << dendl; asok_hook = new ImageReplayerAdminSocketHook( g_ceph_context, m_image_spec, this); int r = asok_hook->register_commands(); if (r == 0) { m_asok_hook = asok_hook; - - CephContext *cct = static_cast(m_local_io_ctx.cct()); - auto prio = cct->_conf.get_val( - "rbd_mirror_image_perf_stats_prio"); - PerfCountersBuilder plb(g_ceph_context, - "rbd_mirror_image_" + m_image_spec, - l_rbd_mirror_first, l_rbd_mirror_last); - plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio); - plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes", - "Replayed data", "rb", prio, unit_t(UNIT_BYTES)); - plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency", - "Replay latency", "rl", prio); - m_perf_counters = plb.create_perf_counters(); - g_ceph_context->get_perfcounters_collection()->add(m_perf_counters); - return; } derr << "error registering admin socket commands" << dendl; @@ -1769,17 +1131,11 @@ void ImageReplayer::unregister_admin_socket_hook() { dout(15) << dendl; AdminSocketHook *asok_hook = nullptr; - PerfCounters *perf_counters = nullptr; { std::lock_guard locker{m_lock}; std::swap(asok_hook, m_asok_hook); - std::swap(perf_counters, m_perf_counters); } delete asok_hook; - if (perf_counters != nullptr) { - g_ceph_context->get_perfcounters_collection()->remove(perf_counters); - delete perf_counters; - } } template diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 03b200730ab..5b6165422ec 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -10,39 +10,27 @@ #include "include/rados/librados.hpp" #include "cls/journal/cls_journal_types.h" #include "cls/rbd/cls_rbd_types.h" -#include "journal/JournalMetadataListener.h" -#include "journal/ReplayEntry.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include "ProgressContext.h" #include "tools/rbd_mirror/Types.h" #include "tools/rbd_mirror/image_replayer/Types.h" - -#include #include - -#include -#include -#include #include -#include class AdminSocketHook; -class PerfCounters; namespace journal { struct CacheManagerHandler; class Journaler; -class ReplayHandler; } // namespace journal namespace librbd { class ImageCtx; -namespace journal { template class Replay; } } // namespace librbd @@ -55,14 +43,9 @@ template struct Threads; namespace image_replayer { +class Replayer; template class BootstrapRequest; -namespace journal { - -template class EventPreprocessor; -template class ReplayStatusFormatter; - -} // namespace journal } // namespace image_replayer /** @@ -136,13 +119,8 @@ public: void restart(Context *on_finish = nullptr); void flush(); - void resync_image(Context *on_finish=nullptr); - void print_status(Formatter *f); - virtual void handle_replay_ready(); - virtual void handle_replay_complete(int r, const std::string &error_desc); - protected: /** * @verbatim @@ -162,45 +140,10 @@ protected: * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * * * | * * v (error) * - * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * - * | * - * v (error) * * START_REPLAY * * * * * * * * * * * * * * * * * * * * * * * | - * | /--------------------------------------------\ - * | | | - * v v (asok flush) | - * REPLAYING -------------> LOCAL_REPLAY_FLUSH | - * | \ | | - * | | v | - * | | FLUSH_COMMIT_POSITION | - * | | | | - * | | \--------------------/| - * | | | - * | | (entries available) | - * | \-----------> REPLAY_READY | - * | | | - * | | (skip if not | - * | v needed) (error) - * | REPLAY_FLUSH * * * * * * * * * - * | | | * - * | | (skip if not | * - * | v needed) (error) * - * | GET_REMOTE_TAG * * * * * * * * - * | | | * - * | | (skip if not | * - * | v needed) (error) * - * | ALLOCATE_LOCAL_TAG * * * * * * - * | | | * - * | v (error) * - * | PREPROCESS_ENTRY * * * * * * * - * | | | * - * | v (error) * - * | PROCESS_ENTRY * * * * * * * * * - * | | | * - * | \---------------------/ * - * v * - * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * + * v + * REPLAYING * | * v * JOURNAL_REPLAY_SHUT_DOWN @@ -224,13 +167,11 @@ protected: private: typedef std::set> Peers; - typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; enum State { STATE_UNKNOWN, STATE_STARTING, STATE_REPLAYING, - STATE_REPLAY_FLUSHING, STATE_STOPPING, STATE_STOPPED, }; @@ -247,32 +188,13 @@ private: : io_ctx(peer.io_ctx), mirror_status_updater(peer.mirror_status_updater) { } }; + struct ReplayerListener; typedef typename librbd::journal::TypeTraits::Journaler Journaler; typedef boost::optional OptionalState; typedef boost::optional OptionalMirrorImageStatusState; - struct JournalListener : public librbd::journal::Listener { - ImageReplayer *img_replayer; - - JournalListener(ImageReplayer *img_replayer) - : img_replayer(img_replayer) { - } - - void handle_close() override { - img_replayer->on_stop_journal_replay(); - } - - void handle_promoted() override { - img_replayer->on_stop_journal_replay(0, "force promoted"); - } - - void handle_resync() override { - img_replayer->resync_image(); - } - }; - class BootstrapProgressContext : public ProgressContext { public: BootstrapProgressContext(ImageReplayer *replayer) : @@ -314,18 +236,14 @@ private: bool m_delete_requested = false; bool m_resync_requested = false; - image_replayer::journal::EventPreprocessor* - m_event_preprocessor = nullptr; - image_replayer::journal::ReplayStatusFormatter* - m_replay_status_formatter = nullptr; ImageCtxT *m_local_image_ctx = nullptr; std::string m_local_image_tag_owner; decltype(ImageCtxT::journal) m_local_journal = nullptr; - librbd::journal::Replay *m_local_replay = nullptr; Journaler* m_remote_journaler = nullptr; - ::journal::ReplayHandler *m_replay_handler = nullptr; - librbd::journal::Listener *m_journal_listener; + + image_replayer::Replayer* m_replayer = nullptr; + ReplayerListener* m_replayer_listener = nullptr; Context *m_on_start_finish = nullptr; Context *m_on_stop_finish = nullptr; @@ -333,7 +251,6 @@ private: bool m_manual_stop = false; AdminSocketHook *m_asok_hook = nullptr; - PerfCounters *m_perf_counters = nullptr; image_replayer::BootstrapRequest *m_bootstrap_request = nullptr; @@ -341,43 +258,7 @@ private: cls::journal::CLIENT_STATE_DISCONNECTED; librbd::journal::MirrorPeerClientMeta m_client_meta; - ReplayEntry m_replay_entry; - utime_t m_replay_start_time; - bool m_replay_tag_valid = false; - uint64_t m_replay_tag_tid = 0; - cls::journal::Tag m_replay_tag; - librbd::journal::TagData m_replay_tag_data; - librbd::journal::EventEntry m_event_entry; - AsyncOpTracker m_event_replay_tracker; - Context *m_delayed_preprocess_task = nullptr; - Context* m_periodic_flush_task = nullptr; - AsyncOpTracker m_in_flight_op_tracker; - Context *m_flush_local_replay_task = nullptr; - - struct RemoteJournalerListener : public ::journal::JournalMetadataListener { - ImageReplayer *replayer; - - RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { } - - void handle_update(::journal::JournalMetadata *) override; - } m_remote_listener; - - struct C_ReplayCommitted : public Context { - ImageReplayer *replayer; - ReplayEntry replay_entry; - utime_t replay_start_time; - - C_ReplayCommitted(ImageReplayer *replayer, - ReplayEntry &&replay_entry, - const utime_t &replay_start_time) - : replayer(replayer), replay_entry(std::move(replay_entry)), - replay_start_time(replay_start_time) { - } - void finish(int r) override { - replayer->handle_process_entry_safe(replay_entry, replay_start_time, r); - } - }; static std::string to_string(const State state); @@ -388,26 +269,14 @@ private: return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested; } bool is_replaying_() const { - return (m_state == STATE_REPLAYING || - m_state == STATE_REPLAY_FLUSHING); + return (m_state == STATE_REPLAYING); } - void schedule_flush_local_replay_task(); - void cancel_flush_local_replay_task(); - void handle_flush_local_replay_task(int r); - - void flush_local_replay(Context* on_flush); - void handle_flush_local_replay(Context* on_flush, int r); - - void flush_commit_position(Context* on_flush); - void handle_flush_commit_position(Context* on_flush, int r); - void update_mirror_image_status(bool force, const OptionalState &state); void set_mirror_image_status_update(bool force, const OptionalState &state); void shut_down(int r); void handle_shut_down(int r); - void handle_remote_journal_metadata_updated(); void prepare_local_image(); void handle_prepare_local_image(int r); @@ -418,29 +287,10 @@ private: void bootstrap(); void handle_bootstrap(int r); - void init_remote_journaler(); - void handle_init_remote_journaler(int r); - void start_replay(); void handle_start_replay(int r); - void replay_flush(); - void handle_replay_flush(int r); - - void get_remote_tag(); - void handle_get_remote_tag(int r); - - void allocate_local_tag(); - void handle_allocate_local_tag(int r); - - void preprocess_entry(); - void handle_preprocess_entry_ready(int r); - void handle_preprocess_entry_safe(int r); - - void process_entry(); - void handle_process_entry_ready(int r); - void handle_process_entry_safe(const ReplayEntry& replay_entry, - const utime_t &m_replay_start_time, int r); + void handle_replayer_notification(); void register_admin_socket_hook(); void unregister_admin_socket_hook(); diff --git a/src/tools/rbd_mirror/image_replayer/Replayer.h b/src/tools/rbd_mirror/image_replayer/Replayer.h index 3568614bf21..f3bfa4da04b 100644 --- a/src/tools/rbd_mirror/image_replayer/Replayer.h +++ b/src/tools/rbd_mirror/image_replayer/Replayer.h @@ -15,6 +15,8 @@ namespace image_replayer { struct Replayer { virtual ~Replayer() {} + virtual void destroy() = 0; + virtual void init(Context* on_finish) = 0; virtual void shut_down(Context* on_finish) = 0; diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc index c63d2e824b3..59c2ab239b8 100644 --- a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc @@ -151,7 +151,7 @@ struct Replayer::LocalJournalListener template Replayer::Replayer( I** local_image_ctx, Journaler* remote_journaler, - std::string local_mirror_uuid, std::string remote_mirror_uuid, + const std::string& local_mirror_uuid, const std::string& remote_mirror_uuid, ReplayerListener* replayer_listener, Threads* threads) : m_local_image_ctx(local_image_ctx), m_remote_journaler(remote_journaler), @@ -162,11 +162,22 @@ Replayer::Replayer( m_lock(ceph::make_mutex(librbd::util::unique_lock_name( "rbd::mirror::image_replayer::journal::Replayer", this))) { dout(10) << dendl; + + { + std::unique_lock locker{m_lock}; + register_perf_counters(); + } } template Replayer::~Replayer() { dout(10) << dendl; + + { + std::unique_lock locker{m_lock}; + unregister_perf_counters(); + } + ceph_assert(m_remote_listener == nullptr); ceph_assert(m_local_journal_listener == nullptr); ceph_assert(m_local_journal_replay == nullptr); @@ -1070,6 +1081,8 @@ void Replayer::handle_process_entry_ready(int r) { } if (update_status) { + unregister_perf_counters(); + register_perf_counters(); notify_status_updated(); } @@ -1196,6 +1209,40 @@ int Replayer::validate_remote_client_state( return 0; } +template +void Replayer::register_perf_counters() { + dout(5) << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + ceph_assert(m_perf_counters == nullptr); + + auto cct = static_cast((*m_local_image_ctx)->cct); + auto prio = cct->_conf.get_val("rbd_mirror_image_perf_stats_prio"); + PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec, + l_rbd_mirror_first, l_rbd_mirror_last); + plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio); + plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes", + "Replayed data", "rb", prio, unit_t(UNIT_BYTES)); + plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency", + "Replay latency", "rl", prio); + m_perf_counters = plb.create_perf_counters(); + g_ceph_context->get_perfcounters_collection()->add(m_perf_counters); +} + +template +void Replayer::unregister_perf_counters() { + dout(5) << dendl; + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + PerfCounters *perf_counters = nullptr; + std::swap(perf_counters, m_perf_counters); + + if (perf_counters != nullptr) { + g_ceph_context->get_perfcounters_collection()->remove(perf_counters); + delete perf_counters; + } +} + } // namespace journal } // namespace image_replayer } // namespace mirror diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h index 3b464c9065d..af22b145c41 100644 --- a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h @@ -44,12 +44,27 @@ class Replayer : public image_replayer::Replayer { public: typedef typename librbd::journal::TypeTraits::Journaler Journaler; + static Replayer* create(ImageCtxT** local_image_ctx, + Journaler* remote_journaler, + const std::string& local_mirror_uuid, + const std::string& remote_mirror_uuid, + ReplayerListener* replayer_listener, + Threads* threads) { + return new Replayer(local_image_ctx, remote_journaler, local_mirror_uuid, + remote_mirror_uuid, replayer_listener, threads); + } + Replayer( ImageCtxT** local_image_ctx, Journaler* remote_journaler, - std::string m_local_mirror_uuid, std::string m_remote_mirror_uuid, + const std::string& local_mirror_uuid, + const std::string& remote_mirror_uuid, ReplayerListener* replayer_listener, Threads* threads); ~Replayer(); + void destroy() override { + delete this; + } + void init(Context* on_finish) override; void shut_down(Context* on_finish) override; @@ -293,6 +308,9 @@ private: librbd::journal::MirrorPeerClientMeta* remote_client_meta, bool* resync_requested, std::string* error); + void register_perf_counters(); + void unregister_perf_counters(); + }; } // namespace journal -- 2.47.3