From 2df70c2e23a8ae715d04d63abb4a2ee6bea985a3 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Sat, 7 Dec 2019 16:52:51 -0500 Subject: [PATCH] rbd-mirror: extract journal replay logic to its own class This will help to greatly reduce the journal-specific code in the current image replayer. Signed-off-by: Jason Dillaman --- src/test/rbd_mirror/CMakeLists.txt | 1 + .../journal/test_mock_Replayer.cc | 2107 +++++++++++++++++ src/tools/rbd_mirror/CMakeLists.txt | 1 + .../rbd_mirror/image_replayer/Replayer.h | 37 + .../image_replayer/ReplayerListener.h | 21 + .../image_replayer/journal/Replayer.cc | 1204 ++++++++++ .../image_replayer/journal/Replayer.h | 305 +++ 7 files changed, 3676 insertions(+) create mode 100644 src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc create mode 100644 src/tools/rbd_mirror/image_replayer/Replayer.h create mode 100644 src/tools/rbd_mirror/image_replayer/ReplayerListener.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/Replayer.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/Replayer.h diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index c769426ba52b2..b70bb20ac43ed 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -39,6 +39,7 @@ add_executable(unittest_rbd_mirror image_replayer/test_mock_PrepareLocalImageRequest.cc image_replayer/test_mock_PrepareRemoteImageRequest.cc image_replayer/journal/test_mock_EventPreprocessor.cc + image_replayer/journal/test_mock_Replayer.cc image_sync/test_mock_SyncPointCreateRequest.cc image_sync/test_mock_SyncPointPruneRequest.cc pool_watcher/test_mock_RefreshImagesRequest.cc diff --git a/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc b/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc new file mode 100644 index 0000000000000..f8df90aac9d89 --- /dev/null +++ b/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc @@ -0,0 +1,2107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/Replayer.h" +#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" +#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" +#include "test/journal/mock/MockJournaler.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" +#include + +namespace librbd { + +namespace { + +struct MockTestJournal; + +struct MockTestImageCtx : public librbd::MockImageCtx { + explicit MockTestImageCtx(librbd::ImageCtx &image_ctx, + MockTestJournal& mock_test_journal) + : librbd::MockImageCtx(image_ctx), journal(&mock_test_journal) { + } + + MockTestJournal* journal = nullptr; +}; + +struct MockTestJournal : public MockJournal { + MOCK_METHOD2(start_external_replay, void(journal::Replay **, + Context *on_start)); + MOCK_METHOD0(stop_external_replay, void()); +}; + +} // anonymous namespace + +namespace journal { + +template <> +struct TypeTraits { + typedef ::journal::MockJournaler Journaler; + typedef ::journal::MockReplayEntryProxy ReplayEntry; +}; + +template<> +struct Replay { + MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *)); + MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *)); + MOCK_METHOD1(flush, void(Context*)); + MOCK_METHOD2(shut_down, void(bool, Context*)); +}; + +} // namespace journal +} // namespace librbd + +namespace boost { + +template<> +struct intrusive_ptr { + intrusive_ptr() { + } + intrusive_ptr(librbd::MockTestJournal* mock_test_journal) + : mock_test_journal(mock_test_journal) { + } + + librbd::MockTestJournal* operator->() { + return mock_test_journal; + } + + void reset() { + mock_test_journal = nullptr; + } + + const librbd::MockTestJournal* get() const { + return mock_test_journal; + } + + template + bool operator==(T* t) const { + return (mock_test_journal == t); + } + + librbd::MockTestJournal* mock_test_journal = nullptr; +}; + +} // namespace boost + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + MockSafeTimer *timer; + ceph::mutex &timer_lock; + + MockContextWQ *work_queue; + + Threads(Threads* threads) + : timer(new MockSafeTimer()), + timer_lock(threads->timer_lock), + work_queue(new MockContextWQ()) { + } + ~Threads() { + delete timer; + delete work_queue; + } +}; + +namespace { + +struct MockReplayerListener : public image_replayer::ReplayerListener { + MOCK_METHOD0(handle_notification, void()); +}; + +} // anonymous namespace + +namespace image_replayer { + +template<> +struct CloseImageRequest { + static CloseImageRequest* s_instance; + librbd::MockTestImageCtx **image_ctx = nullptr; + Context *on_finish = nullptr; + + static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->image_ctx = image_ctx; + s_instance->on_finish = on_finish; + return s_instance; + } + + CloseImageRequest() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~CloseImageRequest() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(send, void()); +}; + +CloseImageRequest* CloseImageRequest::s_instance = nullptr; + +namespace journal { + +template <> +struct EventPreprocessor { + static EventPreprocessor *s_instance; + + static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx, + ::journal::MockJournaler &remote_journaler, + const std::string &local_mirror_uuid, + librbd::journal::MirrorPeerClientMeta *client_meta, + MockContextWQ *work_queue) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + static void destroy(EventPreprocessor* processor) { + } + + EventPreprocessor() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~EventPreprocessor() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &)); + MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *)); +}; + +template<> +struct ReplayStatusFormatter { + static ReplayStatusFormatter* s_instance; + + static ReplayStatusFormatter* create(::journal::MockJournaler *journaler, + const std::string &mirror_uuid) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + static void destroy(ReplayStatusFormatter* formatter) { + } + + ReplayStatusFormatter() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~ReplayStatusFormatter() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish)); +}; + +EventPreprocessor* EventPreprocessor::s_instance = nullptr; +ReplayStatusFormatter* ReplayStatusFormatter::s_instance = nullptr; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#include "tools/rbd_mirror/image_replayer/journal/Replayer.cc" + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using ::testing::_; +using ::testing::AtLeast; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::MatcherCast; +using ::testing::Return; +using ::testing::ReturnArg; +using ::testing::SaveArg; +using ::testing::SetArgPointee; +using ::testing::WithArg; + +class TestMockImageReplayerJournalReplayer : public TestMockFixture { +public: + typedef Replayer MockReplayer; + typedef EventPreprocessor MockEventPreprocessor; + typedef ReplayStatusFormatter MockReplayStatusFormatter; + typedef Threads MockThreads; + typedef CloseImageRequest MockCloseImageRequest; + typedef librbd::journal::Replay MockReplay; + + void SetUp() override { + TestMockFixture::SetUp(); + + librbd::RBD rbd; + ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size)); + ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx)); + } + + bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) { + bufferlist bl; + encode(tag_data, bl); + return bl; + } + + void expect_work_queue_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillRepeatedly(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + } + + void expect_add_event_after_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillRepeatedly( + DoAll(Invoke([this](double seconds, Context *ctx) { + m_threads->timer->add_event_after(seconds, ctx); + }), + ReturnArg<1>())); + EXPECT_CALL(*mock_threads.timer, cancel_event(_)) + .WillRepeatedly( + Invoke([this](Context *ctx) { + return m_threads->timer->cancel_event(ctx); + })); + } + + void expect_init(::journal::MockJournaler &mock_journaler, int r) { + EXPECT_CALL(mock_journaler, init(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) { + EXPECT_CALL(mock_journaler, stop_replay(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_shut_down(::journal::MockJournaler &mock_journaler, int r) { + EXPECT_CALL(mock_journaler, shut_down(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) { + EXPECT_CALL(mock_replay, shut_down(cancel_ops, _)) + .WillOnce(WithArg<1>(CompleteContext(m_threads->work_queue, r))); + } + + void expect_get_cached_client(::journal::MockJournaler &mock_journaler, + const std::string& client_id, + const cls::journal::Client& client, + const librbd::journal::ClientMeta& client_meta, + int r) { + librbd::journal::ClientData client_data; + client_data.client_meta = client_meta; + + cls::journal::Client client_copy{client}; + encode(client_data, client_copy.data); + + EXPECT_CALL(mock_journaler, get_cached_client(client_id, _)) + .WillOnce(DoAll(SetArgPointee<1>(client_copy), + Return(r))); + } + + void expect_start_external_replay(librbd::MockTestJournal &mock_journal, + MockReplay *mock_replay, int r) { + EXPECT_CALL(mock_journal, start_external_replay(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(mock_replay), + WithArg<1>(CompleteContext(m_threads->work_queue, r)))); + } + + void expect_is_tag_owner(librbd::MockTestJournal &mock_journal, + bool is_owner) { + EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(is_owner)); + } + + void expect_is_resync_requested(librbd::MockTestJournal &mock_journal, + int r, bool resync_requested) { + EXPECT_CALL(mock_journal, is_resync_requested(_)).WillOnce( + DoAll(SetArgPointee<0>(resync_requested), + Return(r))); + } + + void expect_get_commit_tid_in_debug( + ::journal::MockReplayEntry &mock_replay_entry) { + // It is used in debug messages and depends on debug level + EXPECT_CALL(mock_replay_entry, get_commit_tid()) + .Times(AtLeast(0)) + .WillRepeatedly(Return(0)); + } + + void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) { + // It is used in debug messages and depends on debug level + EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0)) + .WillRepeatedly(Return(0)); + } + + void expect_committed(::journal::MockReplayEntry &mock_replay_entry, + ::journal::MockJournaler &mock_journaler, int times) { + EXPECT_CALL(mock_replay_entry, get_data()).Times(times); + EXPECT_CALL(mock_journaler, committed( + MatcherCast(_))) + .Times(times); + } + + void expect_try_pop_front(::journal::MockJournaler &mock_journaler, + uint64_t replay_tag_tid, bool entries_available) { + EXPECT_CALL(mock_journaler, try_pop_front(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()), + SetArgPointee<1>(replay_tag_tid), + Return(entries_available))); + } + + void expect_try_pop_front_return_no_entries( + ::journal::MockJournaler &mock_journaler, Context *on_finish) { + EXPECT_CALL(mock_journaler, try_pop_front(_, _)) + .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e, + uint64_t *t) { + on_finish->complete(0); + }), + Return(false))); + } + + void expect_get_tag(::journal::MockJournaler &mock_journaler, + const cls::journal::Tag &tag, int r) { + EXPECT_CALL(mock_journaler, get_tag(_, _, _)) + .WillOnce(DoAll(SetArgPointee<1>(tag), + WithArg<2>(CompleteContext(r)))); + } + + void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) { + EXPECT_CALL(mock_journal, allocate_tag(_, _, _)) + .WillOnce(WithArg<2>(CompleteContext(r))); + } + + void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor, + bool required, int r) { + EXPECT_CALL(mock_event_preprocessor, is_required(_)) + .WillOnce(Return(required)); + if (required) { + EXPECT_CALL(mock_event_preprocessor, preprocess(_, _)) + .WillOnce(WithArg<1>(CompleteContext(r))); + } + } + + void expect_process(MockReplay &mock_replay, + int on_ready_r, int on_commit_r) { + EXPECT_CALL(mock_replay, process(_, _, _)) + .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)), + WithArg<2>(CompleteContext(on_commit_r)))); + } + + void expect_flush(MockReplay& mock_replay, int r) { + EXPECT_CALL(mock_replay, flush(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_flush_commit_position(::journal::MockJournaler& mock_journal, + int r) { + EXPECT_CALL(mock_journal, flush_commit_position(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_get_tag_data(librbd::MockTestJournal& mock_local_journal, + const librbd::journal::TagData& tag_data) { + EXPECT_CALL(mock_local_journal, get_tag_data()) + .WillOnce(Return(tag_data)); + } + + void expect_send(MockCloseImageRequest &mock_close_image_request, int r) { + EXPECT_CALL(mock_close_image_request, send()) + .WillOnce(Invoke([this, &mock_close_image_request, r]() { + *mock_close_image_request.image_ctx = nullptr; + m_threads->work_queue->queue(mock_close_image_request.on_finish, r); + })); + } + + void expect_notification(MockThreads& mock_threads, + MockReplayerListener& mock_replayer_listener) { + EXPECT_CALL(mock_replayer_listener, handle_notification()) + .WillOnce(Invoke([this]() { + std::unique_lock locker{m_lock}; + m_notified = true; + m_cond.notify_all(); + })); + } + + int wait_for_notification() { + std::unique_lock locker{m_lock}; + while (!m_notified) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { + return -ETIMEDOUT; + } + } + m_notified = false; + return 0; + } + + int init_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + MockReplayerListener& mock_replayer_listener, + librbd::MockTestJournal& mock_local_journal, + ::journal::MockJournaler& mock_remote_journaler, + MockReplay& mock_local_journal_replay, + librbd::journal::Listener** local_journal_listener, + ::journal::ReplayHandler** remote_replay_handler, + ::journal::JournalMetadataListener** remote_journal_listener) { + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)) + .WillOnce(SaveArg<0>(remote_journal_listener)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)) + .WillOnce(SaveArg<0>(local_journal_listener)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, 0, false); + EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)) + .WillOnce(SaveArg<0>(remote_replay_handler)); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + int r = init_ctx.wait(); + if (r < 0) { + return r; + } + + return wait_for_notification(); + } + + int shut_down_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + librbd::MockTestJournal& mock_local_journal, + ::journal::MockJournaler& mock_remote_journaler, + MockReplay& mock_local_journal_replay) { + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + return shutdown_ctx.wait(); + } + + librbd::ImageCtx* m_local_image_ctx = nullptr; + + ceph::mutex m_lock = ceph::make_mutex( + "TestMockImageReplayerJournalReplayer"); + ceph::condition_variable m_cond; + bool m_notified = false; +}; + +TEST_F(TestMockImageReplayerJournalReplayer, InitShutDown) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + + mock_local_image_ctx.journal = nullptr; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitLocalJournalStartExternalReplayError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitIsPromoted) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, true); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnected) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "false"); + + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-ENOTCONN, init_ctx.wait()); + ASSERT_FALSE(mock_replayer.is_resync_requested()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnectedResync) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "true"); + + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-ENOTCONN, init_ctx.wait()); + ASSERT_TRUE(mock_replayer.is_resync_requested()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequested) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, 0, true); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequestedError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, -EINVAL, false); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ShutDownLocalJournalReplayError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, -EINVAL); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, -EPERM); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EINVAL, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, CloseLocalImageError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, -EINVAL); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, -EPERM); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EINVAL, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, StopRemoteJournalerError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, -EPERM); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EPERM, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ShutDownRemoteJournalerError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, -EPERM); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EPERM, shutdown_ctx.wait()); + ASSERT_EQ(nullptr, mock_local_image_ctx_ptr); +} + +TEST_F(TestMockImageReplayerJournalReplayer, Replay) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 2); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + + // the next event with preprocess + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, true, 0); + expect_process(mock_local_journal_replay, 0, 0); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + // fire + remote_replay_handler->handle_entries_available(); + ASSERT_EQ(0, replay_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, DecodeError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(Return(-EINVAL)); + expect_notification(mock_threads, mock_replayer_listener); + + // fire + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, DelayedReplay) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process with delay + EXPECT_CALL(mock_replay_entry, get_data()); + librbd::journal::EventEntry event_entry( + librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(DoAll(SetArgPointee<1>(event_entry), + Return(0))); + + Context* delayed_task_ctx = nullptr; + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillOnce( + DoAll(Invoke([this, &delayed_task_ctx](double seconds, Context *ctx) { + std::unique_lock locker{m_lock}; + delayed_task_ctx = ctx; + m_cond.notify_all(); + }), + ReturnArg<1>())); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + // fire + mock_local_image_ctx.mirroring_replay_delay = 600; + remote_replay_handler->handle_entries_available(); + { + std::unique_lock locker{m_lock}; + while (delayed_task_ctx == nullptr) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { + FAIL() << "timed out waiting for task"; + break; + } + } + } + { + std::unique_lock timer_locker{mock_threads.timer_lock}; + delayed_task_ctx->complete(0); + } + ASSERT_EQ(0, replay_ctx.wait()); + + // add a pending (delayed) entry before stop + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + C_SaferCond decode_ctx; + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it, + librbd::journal::EventEntry *e) { + decode_ctx.complete(0); + }), + Return(0))); + + remote_replay_handler->handle_entries_available(); + ASSERT_EQ(0, decode_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayNoMemoryError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_complete(-ENOMEM); + + wait_for_notification(); + ASSERT_EQ(false, mock_replayer.is_replaying()); + ASSERT_EQ(-ENOMEM, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalForcePromoted) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + local_journal_listener->handle_promoted(); + wait_for_notification(); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalResyncRequested) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + local_journal_listener->handle_resync(); + wait_for_notification(); + + ASSERT_TRUE(mock_replayer.is_resync_requested()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, RemoteJournalDisconnected) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "true"); + + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + expect_notification(mock_threads, mock_replayer_listener); + + remote_journaler_listener->handle_update(nullptr); + wait_for_notification(); + + ASSERT_EQ(-ENOTCONN, mock_replayer.get_error_code()); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_TRUE(mock_replayer.is_resync_requested()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, Flush) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, 0); + expect_flush_commit_position(mock_remote_journaler, 0); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, FlushError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, -EINVAL); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(-EINVAL, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, FlushCommitPositionError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, 0); + expect_flush_commit_position(mock_remote_journaler, -EINVAL); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(-EINVAL, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushShutDownError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + ::journal::MockReplayEntry mock_replay_entry; + expect_try_pop_front(mock_remote_journaler, 1, true); + expect_shut_down(mock_local_journal_replay, false, -EINVAL); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + EXPECT_CALL(mock_local_journal, remove_listener(_)); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushStartError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + ::journal::MockReplayEntry mock_replay_entry; + expect_try_pop_front(mock_remote_journaler, 1, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + EXPECT_CALL(mock_local_journal, remove_listener(_)); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + expect_shut_down(mock_remote_journaler, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, GetTagError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + ::journal::MockReplayEntry mock_replay_entry; + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagDemotion) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_notification(mock_threads, mock_replayer_listener); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::ORPHAN_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_get_tag_data(mock_local_journal, {}); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, PreprocessError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, true, -EINVAL); + + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ProcessError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, -EINVAL); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, replay_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ImageNameUpdated) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockReplayer mock_replayer{ + &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid", + "remote mirror uuid", &mock_replayer_listener, &mock_threads}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + mock_local_image_ctx.name = "NEW NAME"; + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + + auto image_spec = util::compute_image_spec(m_local_io_ctx, "NEW NAME"); + ASSERT_EQ(image_spec, mock_replayer.get_image_spec()); + + ASSERT_EQ(0, replay_ctx.wait()); + ASSERT_TRUE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index cdac9c614c42b..c642ae6afc4c8 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -42,6 +42,7 @@ set(rbd_mirror_internal image_replayer/PrepareRemoteImageRequest.cc image_replayer/Utils.cc image_replayer/journal/EventPreprocessor.cc + image_replayer/journal/Replayer.cc image_replayer/journal/ReplayStatusFormatter.cc image_sync/SyncPointCreateRequest.cc image_sync/SyncPointPruneRequest.cc diff --git a/src/tools/rbd_mirror/image_replayer/Replayer.h b/src/tools/rbd_mirror/image_replayer/Replayer.h new file mode 100644 index 0000000000000..3568614bf21d7 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/Replayer.h @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H +#define RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H + +#include + +struct Context; + +namespace rbd { +namespace mirror { +namespace image_replayer { + +struct Replayer { + virtual ~Replayer() {} + + virtual void init(Context* on_finish) = 0; + virtual void shut_down(Context* on_finish) = 0; + + virtual void flush(Context* on_finish) = 0; + + virtual bool get_replay_status(std::string* description, + Context* on_finish) = 0; + + virtual bool is_replaying() const = 0; + virtual bool is_resync_requested() const = 0; + + virtual int get_error_code() const = 0; + virtual std::string get_error_description() const = 0; +}; + +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H diff --git a/src/tools/rbd_mirror/image_replayer/ReplayerListener.h b/src/tools/rbd_mirror/image_replayer/ReplayerListener.h new file mode 100644 index 0000000000000..f17f401b1fd15 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/ReplayerListener.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H +#define RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H + +namespace rbd { +namespace mirror { +namespace image_replayer { + +struct ReplayerListener { + virtual ~ReplayerListener() {} + + virtual void handle_notification() = 0; +}; + +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc new file mode 100644 index 0000000000000..c63d2e824b32a --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc @@ -0,0 +1,1204 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Replayer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/journal/Replay.h" +#include "journal/Journaler.h" +#include "journal/JournalMetadataListener.h" +#include "journal/ReplayHandler.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" +#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "Replayer: " << this << " " << __func__ << ": " + +extern PerfCounters *g_perf_counters; + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +namespace { + +uint32_t calculate_replay_delay(const utime_t &event_time, + int mirroring_replay_delay) { + if (mirroring_replay_delay <= 0) { + return 0; + } + + utime_t now = ceph_clock_now(); + if (event_time + mirroring_replay_delay <= now) { + return 0; + } + + // ensure it is rounded up when converting to integer + return (event_time + mirroring_replay_delay - now) + 1; +} + +} // anonymous namespace + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template +struct Replayer::C_ReplayCommitted : public Context { + Replayer* replayer; + ReplayEntry replay_entry; + uint64_t replay_bytes; + utime_t replay_start_time; + + C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry, + uint64_t replay_bytes, const utime_t &replay_start_time) + : replayer(replayer), replay_entry(std::move(replay_entry)), + replay_bytes(replay_bytes), replay_start_time(replay_start_time) { + } + + void finish(int r) override { + replayer->handle_process_entry_safe(replay_entry, replay_bytes, + replay_start_time, r); + } +}; + +template +struct Replayer::C_TrackedOp : public Context { + Replayer *replayer; + Context* ctx; + + C_TrackedOp(Replayer* replayer, Context* ctx) + : replayer(replayer), ctx(ctx) { + replayer->m_in_flight_op_tracker.start_op(); + } + + void finish(int r) override { + ctx->complete(r); + replayer->m_in_flight_op_tracker.finish_op(); + } +}; + +template +struct Replayer::RemoteJournalerListener + : public ::journal::JournalMetadataListener { + Replayer* replayer; + + RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {} + + void handle_update(::journal::JournalMetadata*) override { + auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) { + replayer->handle_remote_journal_metadata_updated(); + })); + replayer->m_threads->work_queue->queue(ctx, 0); + } +}; + +template +struct Replayer::RemoteReplayHandler : public ::journal::ReplayHandler { + Replayer* replayer; + + RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {} + ~RemoteReplayHandler() override {}; + + void handle_entries_available() override { + replayer->handle_replay_ready(); + } + + void handle_complete(int r) override { + std::string error; + if (r == -ENOMEM) { + error = "not enough memory in autotune cache"; + } else if (r < 0) { + error = "replay completed with error: " + cpp_strerror(r); + } + replayer->handle_replay_complete(r, error); + } +}; + +template +struct Replayer::LocalJournalListener + : public librbd::journal::Listener { + Replayer* replayer; + + LocalJournalListener(Replayer* replayer) : replayer(replayer) { + } + + void handle_close() override { + replayer->handle_replay_complete(0, ""); + } + + void handle_promoted() override { + replayer->handle_replay_complete(0, "force promoted"); + } + + void handle_resync() override { + replayer->handle_resync_image(); + } +}; + +template +Replayer::Replayer( + I** local_image_ctx, Journaler* remote_journaler, + std::string local_mirror_uuid, std::string remote_mirror_uuid, + ReplayerListener* replayer_listener, Threads* threads) + : m_local_image_ctx(local_image_ctx), + m_remote_journaler(remote_journaler), + m_local_mirror_uuid(local_mirror_uuid), + m_remote_mirror_uuid(remote_mirror_uuid), + m_replayer_listener(replayer_listener), + m_threads(threads), + m_lock(ceph::make_mutex(librbd::util::unique_lock_name( + "rbd::mirror::image_replayer::journal::Replayer", this))) { + dout(10) << dendl; +} + +template +Replayer::~Replayer() { + dout(10) << dendl; + ceph_assert(m_remote_listener == nullptr); + ceph_assert(m_local_journal_listener == nullptr); + ceph_assert(m_local_journal_replay == nullptr); + ceph_assert(m_remote_replay_handler == nullptr); + ceph_assert(m_event_preprocessor == nullptr); + ceph_assert(m_replay_status_formatter == nullptr); + ceph_assert(m_delayed_preprocess_task == nullptr); + ceph_assert(m_flush_local_replay_task == nullptr); + ceph_assert(*m_local_image_ctx == nullptr); +} + +template +void Replayer::init(Context* on_finish) { + dout(10) << dendl; + + ceph_assert(m_local_journal == nullptr); + { + auto local_image_ctx = *m_local_image_ctx; + std::shared_lock image_locker{local_image_ctx->image_lock}; + m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx, + local_image_ctx->name); + m_local_journal = local_image_ctx->journal; + } + + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + + if (m_local_journal == nullptr) { + std::unique_lock locker{m_lock}; + m_state = STATE_COMPLETE; + m_remote_journaler = nullptr; + + handle_replay_complete(locker, -EINVAL, "error accessing local journal"); + close_local_image(); + return; + } + + init_remote_journaler(); +} + +template +void Replayer::shut_down(Context* on_finish) { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + + if (m_state == STATE_INIT) { + // raced with the last piece of the init state machine + return; + } else if (m_state == STATE_REPLAYING) { + m_state = STATE_COMPLETE; + } + + // if shutting down due to an error notification, we don't + // need to propagate the same error again + m_error_code = 0; + m_error_description = ""; + + cancel_delayed_preprocess_task(); + cancel_flush_local_replay_task(); + shut_down_local_journal_replay(); +} + +template +void Replayer::flush(Context* on_finish) { + dout(10) << dendl; + + flush_local_replay(new C_TrackedOp(this, on_finish)); +} + +template +bool Replayer::get_replay_status(std::string* description, + Context* on_finish) { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + if (m_replay_status_formatter == nullptr) { + derr << "replay not running" << dendl; + locker.unlock(); + + on_finish->complete(-EAGAIN); + return false; + } + + on_finish = new C_TrackedOp(this, on_finish); + return m_replay_status_formatter->get_or_send_update(description, + on_finish); +} + +template +void Replayer::init_remote_journaler() { + dout(10) << dendl; + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_init_remote_journaler>(this); + m_remote_journaler->init(ctx); +} + +template +void Replayer::handle_init_remote_journaler(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; + handle_replay_complete(locker, r, "error initializing remote journal"); + close_local_image(); + return; + } + + // listen for metadata updates to check for disconnect events + ceph_assert(m_remote_listener == nullptr); + m_remote_listener = new RemoteJournalerListener(this); + m_remote_journaler->add_listener(m_remote_listener); + + cls::journal::Client remote_client; + r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, + &remote_client); + if (r < 0) { + derr << "error retrieving remote journal client: " << cpp_strerror(r) + << dendl; + handle_replay_complete(locker, r, "error retrieving remote journal client"); + close_local_image(); + return; + } + + std::string error; + r = validate_remote_client_state(remote_client, &m_remote_client_meta, + &m_resync_requested, &error); + if (r < 0) { + handle_replay_complete(locker, r, error); + close_local_image(); + return; + } + + start_external_replay(); +} + +template +void Replayer::start_external_replay() { + dout(10) << dendl; + + Context *start_ctx = create_context_callback< + Replayer, &Replayer::handle_start_external_replay>(this); + m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx); +} + +template +void Replayer::handle_start_external_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + ceph_assert(m_local_journal_replay == nullptr); + derr << "error starting external replay on local image " + << (*m_local_image_ctx)->id << ": " << cpp_strerror(r) << dendl; + + handle_replay_complete(locker, r, "error starting replay on local image"); + close_local_image(); + return; + } + + if (!notify_init_complete(locker)) { + return; + } + + m_state = STATE_REPLAYING; + + // listen for promotion and resync requests against local journal + m_local_journal_listener = new LocalJournalListener(this); + m_local_journal->add_listener(m_local_journal_listener); + + // verify that the local image wasn't force-promoted and that a resync hasn't + // been requested now that we are listening for events + if (m_local_journal->is_tag_owner()) { + dout(10) << "local image force-promoted" << dendl; + handle_replay_complete(locker, 0, "force promoted"); + return; + } + + bool resync_requested = false; + r = m_local_journal->is_resync_requested(&resync_requested); + if (r < 0) { + dout(10) << "failed to determine resync state: " << cpp_strerror(r) + << dendl; + handle_replay_complete(locker, r, "error parsing resync state"); + return; + } else if (resync_requested) { + dout(10) << "local image resync requested" << dendl; + handle_replay_complete(locker, 0, "resync requested"); + return; + } + + // start remote journal replay + m_event_preprocessor = EventPreprocessor::create( + **m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid, + &m_remote_client_meta, m_threads->work_queue); + m_replay_status_formatter = ReplayStatusFormatter::create( + m_remote_journaler, m_local_mirror_uuid); + + auto cct = static_cast((*m_local_image_ctx)->cct); + double poll_seconds = cct->_conf.get_val( + "rbd_mirror_journal_poll_age"); + m_remote_replay_handler = new RemoteReplayHandler(this); + m_remote_journaler->start_live_replay(m_remote_replay_handler, poll_seconds); + + notify_status_updated(); +} + +template +bool Replayer::notify_init_complete(std::unique_lock& locker) { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + ceph_assert(m_state == STATE_INIT); + + // notify that init has completed + Context *on_finish = nullptr; + std::swap(m_on_init_shutdown, on_finish); + + locker.unlock(); + on_finish->complete(0); + locker.lock(); + + if (m_on_init_shutdown != nullptr) { + // shut down requested after we notified init complete but before we + // grabbed the lock + close_local_image(); + return false; + } + + return true; +} + +template +void Replayer::shut_down_local_journal_replay() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (m_local_journal_replay == nullptr) { + wait_for_event_replay(); + return; + } + + dout(10) << dendl; + auto ctx = create_context_callback< + Replayer, &Replayer::handle_shut_down_local_journal_replay>(this); + m_local_journal_replay->shut_down(true, ctx); +} + +template +void Replayer::handle_shut_down_local_journal_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl; + handle_replay_error(r, "failed to shut down local journal replay"); + } + + wait_for_event_replay(); +} + +template +void Replayer::wait_for_event_replay() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_event_replay>(this)); + m_event_replay_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_event_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + close_local_image(); +} + +template +void Replayer::close_local_image() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + if (m_local_journal_listener != nullptr) { + // blocks if listener notification is in-progress + m_local_journal->remove_listener(m_local_journal_listener); + delete m_local_journal_listener; + m_local_journal_listener = nullptr; + } + + if (m_local_journal_replay != nullptr) { + m_local_journal->stop_external_replay(); + m_local_journal_replay = nullptr; + } + + if (m_event_preprocessor != nullptr) { + image_replayer::journal::EventPreprocessor::destroy( + m_event_preprocessor); + m_event_preprocessor = nullptr; + } + + m_local_journal.reset(); + + // NOTE: it's important to ensure that the local image is fully + // closed before attempting to close the remote journal in + // case the remote cluster is unreachable + auto ctx = create_context_callback< + Replayer, &Replayer::handle_close_local_image>(this); + auto request = image_replayer::CloseImageRequest::create( + m_local_image_ctx, ctx); + request->send(); +} + + +template +void Replayer::handle_close_local_image(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "error closing local iamge: " << cpp_strerror(r) << dendl; + handle_replay_error(r, "failed to close local image"); + } + + ceph_assert(*m_local_image_ctx == nullptr); + stop_remote_journaler_replay(); +} + +template +void Replayer::stop_remote_journaler_replay() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (m_remote_journaler == nullptr) { + wait_for_in_flight_ops(); + return; + } else if (m_remote_replay_handler == nullptr) { + shut_down_remote_journaler(); + return; + } + + dout(10) << dendl; + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_stop_remote_journaler_replay>(this)); + m_remote_journaler->stop_replay(ctx); +} + +template +void Replayer::handle_stop_remote_journaler_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "failed to stop remote journaler replay : " << cpp_strerror(r) + << dendl; + handle_replay_error(r, "failed to stop remote journaler replay"); + } + + delete m_remote_replay_handler; + m_remote_replay_handler = nullptr; + + shut_down_remote_journaler(); +} + +template +void Replayer::shut_down_remote_journaler() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + if (m_remote_listener != nullptr) { + m_remote_journaler->remove_listener(m_remote_listener); + delete m_remote_listener; + m_remote_listener = nullptr; + } + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_shut_down_remote_journaler>(this); + m_remote_journaler->shut_down(ctx); +} + +template +void Replayer::handle_shut_down_remote_journaler(int r) { + dout(10) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to shut down remote journaler: " << cpp_strerror(r) + << dendl; + + std::unique_lock locker{m_lock}; + handle_replay_error(r, "failed to shut down remote journaler"); + } + + m_remote_journaler = nullptr; + + wait_for_in_flight_ops(); +} + +template +void Replayer::wait_for_in_flight_ops() { + dout(10) << dendl; + + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_in_flight_ops>(this)); + m_in_flight_op_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_in_flight_ops(int r) { + dout(10) << "r=" << r << dendl; + + ReplayStatusFormatter::destroy(m_replay_status_formatter); + m_replay_status_formatter = nullptr; + + ceph_assert(m_on_init_shutdown != nullptr); + m_on_init_shutdown->complete(m_error_code); +} + +template +void Replayer::handle_remote_journal_metadata_updated() { + dout(20) << dendl; + + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + return; + } + + cls::journal::Client remote_client; + int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, + &remote_client); + if (r < 0) { + derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; + return; + } + + librbd::journal::MirrorPeerClientMeta remote_client_meta; + std::string error; + r = validate_remote_client_state(remote_client, &remote_client_meta, + &m_resync_requested, &error); + if (r < 0) { + dout(0) << "client flagged disconnected, stopping image replay" << dendl; + handle_replay_complete(locker, r, error); + } +} + +template +void Replayer::schedule_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) { + return; + } + + dout(15) << dendl; + m_flush_local_replay_task = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_flush_local_replay_task>(this)); + m_threads->timer->add_event_after(30, m_flush_local_replay_task); +} + +template +void Replayer::cancel_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_flush_local_replay_task != nullptr) { + dout(10) << dendl; + m_threads->timer->cancel_event(m_flush_local_replay_task); + m_flush_local_replay_task = nullptr; + } +} + +template +void Replayer::handle_flush_local_replay_task(int) { + dout(15) << dendl; + + m_in_flight_op_tracker.start_op(); + auto on_finish = new LambdaContext([this](int) { + std::unique_lock locker{m_lock}; + + { + std::unique_lock timer_locker{m_threads->timer_lock}; + m_flush_local_replay_task = nullptr; + } + + notify_status_updated(); + m_in_flight_op_tracker.finish_op(); + }); + flush_local_replay(on_finish); +} + +template +void Replayer::flush_local_replay(Context* on_flush) { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + locker.unlock(); + on_flush->complete(0); + return; + } else if (m_local_journal_replay == nullptr) { + // raced w/ a tag creation stop/start, which implies that + // the replay is flushed + locker.unlock(); + flush_commit_position(on_flush); + return; + } + + dout(15) << dendl; + auto ctx = new LambdaContext( + [this, on_flush](int r) { + handle_flush_local_replay(on_flush, r); + }); + m_local_journal_replay->flush(ctx); +} + +template +void Replayer::handle_flush_local_replay(Context* on_flush, int r) { + dout(15) << "r=" << r << dendl; + if (r < 0) { + derr << "error flushing local replay: " << cpp_strerror(r) << dendl; + on_flush->complete(r); + return; + } + + flush_commit_position(on_flush); +} + +template +void Replayer::flush_commit_position(Context* on_flush) { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + locker.unlock(); + on_flush->complete(0); + return; + } + + dout(15) << dendl; + auto ctx = new LambdaContext( + [this, on_flush](int r) { + handle_flush_commit_position(on_flush, r); + }); + m_remote_journaler->flush_commit_position(ctx); +} + +template +void Replayer::handle_flush_commit_position(Context* on_flush, int r) { + dout(15) << "r=" << r << dendl; + if (r < 0) { + derr << "error flushing remote journal commit position: " + << cpp_strerror(r) << dendl; + } + + on_flush->complete(r); +} + +template +void Replayer::handle_replay_error(int r, const std::string &error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (m_error_code == 0) { + m_error_code = r; + m_error_description = error; + } +} + +template +bool Replayer::is_replay_complete() const { + std::unique_lock locker{m_lock}; + return is_replay_complete(locker); +} + +template +bool Replayer::is_replay_complete( + const std::unique_lock&) const { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + return (m_state == STATE_COMPLETE); +} + +template +void Replayer::handle_replay_complete(int r, const std::string &error) { + std::unique_lock locker{m_lock}; + handle_replay_complete(locker, r, error); +} + +template +void Replayer::handle_replay_complete( + const std::unique_lock&, int r, const std::string &error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << "r=" << r << ", error=" << error << dendl; + if (r < 0) { + derr << "replay encountered an error: " << cpp_strerror(r) << dendl; + handle_replay_error(r, error); + } + + if (m_state != STATE_REPLAYING) { + return; + } + + m_state = STATE_COMPLETE; + notify_status_updated(); +} + +template +void Replayer::handle_replay_ready() { + std::unique_lock locker{m_lock}; + handle_replay_ready(locker); +} + +template +void Replayer::handle_replay_ready( + std::unique_lock& locker) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(20) << dendl; + if (is_replay_complete(locker)) { + return; + } + + if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) { + dout(20) << "no entries ready for replay" << dendl; + return; + } + + // can safely drop lock once the entry is tracked + m_event_replay_tracker.start_op(); + locker.unlock(); + + dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() + << "tag_tid=" << m_replay_tag_tid << dendl; + if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) { + // must allocate a new local journal tag prior to processing + replay_flush(); + return; + } + + preprocess_entry(); +} + +template +void Replayer::replay_flush() { + dout(10) << dendl; + + // shut down the replay to flush all IO and ops and create a new + // replayer to handle the new tag epoch + auto ctx = create_context_callback< + Replayer, &Replayer::handle_replay_flush_shut_down>(this); + ceph_assert(m_local_journal_replay != nullptr); + m_local_journal_replay->shut_down(false, ctx); +} + +template +void Replayer::handle_replay_flush_shut_down(int r) { + { + std::unique_lock locker{m_lock}; + ceph_assert(m_local_journal != nullptr); + m_local_journal->stop_external_replay(); + m_local_journal_replay = nullptr; + } + + dout(10) << "r=" << r << dendl; + if (r < 0) { + handle_replay_flush(r); + return; + } + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_replay_flush>(this); + m_local_journal->start_external_replay(&m_local_journal_replay, ctx); +} + +template +void Replayer::handle_replay_flush(int r) { + dout(10) << "r=" << r << dendl; + if (r < 0) { + derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "replay flush encountered an error"); + m_event_replay_tracker.finish_op(); + return; + } else if (is_replay_complete()) { + m_event_replay_tracker.finish_op(); + return; + } + + get_remote_tag(); +} + +template +void Replayer::get_remote_tag() { + dout(15) << "tag_tid: " << m_replay_tag_tid << dendl; + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_get_remote_tag>(this); + m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx); +} + +template +void Replayer::handle_get_remote_tag(int r) { + dout(15) << "r=" << r << dendl; + + if (r == 0) { + try { + auto it = m_replay_tag.data.cbegin(); + decode(m_replay_tag_data, it); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + + if (r < 0) { + derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " + << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to retrieve remote tag"); + m_event_replay_tracker.finish_op(); + return; + } + + m_replay_tag_valid = true; + dout(15) << "decoded remote tag " << m_replay_tag_tid << ": " + << m_replay_tag_data << dendl; + + allocate_local_tag(); +} + +template +void Replayer::allocate_local_tag() { + dout(15) << dendl; + + std::string mirror_uuid = m_replay_tag_data.mirror_uuid; + if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { + mirror_uuid = m_remote_mirror_uuid; + } else if (mirror_uuid == m_local_mirror_uuid) { + mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; + } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { + // handle possible edge condition where daemon can failover and + // the local image has already been promoted/demoted + auto local_tag_data = m_local_journal->get_tag_data(); + if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID && + (local_tag_data.predecessor.commit_valid && + local_tag_data.predecessor.mirror_uuid == + librbd::Journal<>::LOCAL_MIRROR_UUID)) { + dout(15) << "skipping stale demotion event" << dendl; + handle_process_entry_safe(m_replay_entry, m_replay_bytes, + m_replay_start_time, 0); + handle_replay_ready(); + return; + } else { + dout(5) << "encountered image demotion: stopping" << dendl; + handle_replay_complete(0, ""); + } + } + + librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor); + if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { + predecessor.mirror_uuid = m_remote_mirror_uuid; + } else if (predecessor.mirror_uuid == m_local_mirror_uuid) { + predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; + } + + dout(15) << "mirror_uuid=" << mirror_uuid << ", " + << "predecessor=" << predecessor << ", " + << "replay_tag_tid=" << m_replay_tag_tid << dendl; + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_allocate_local_tag>(this); + m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx); +} + +template +void Replayer::handle_allocate_local_tag(int r) { + dout(15) << "r=" << r << ", " + << "tag_tid=" << m_local_journal->get_tag_tid() << dendl; + if (r < 0) { + derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to allocate journal tag"); + m_event_replay_tracker.finish_op(); + return; + } + + preprocess_entry(); +} + +template +void Replayer::preprocess_entry() { + dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + + bufferlist data = m_replay_entry.get_data(); + auto it = data.cbegin(); + int r = m_local_journal_replay->decode(&it, &m_event_entry); + if (r < 0) { + derr << "failed to decode journal event" << dendl; + handle_replay_complete(r, "failed to decode journal event"); + m_event_replay_tracker.finish_op(); + return; + } + + m_replay_bytes = data.length(); + uint32_t delay = calculate_replay_delay( + m_event_entry.timestamp, (*m_local_image_ctx)->mirroring_replay_delay); + if (delay == 0) { + handle_preprocess_entry_ready(0); + return; + } + + std::unique_lock locker{m_lock}; + if (is_replay_complete(locker)) { + // don't schedule a delayed replay task if a shut-down is in-progress + m_event_replay_tracker.finish_op(); + return; + } + + dout(20) << "delaying replay by " << delay << " sec" << dendl; + std::unique_lock timer_locker{m_threads->timer_lock}; + ceph_assert(m_delayed_preprocess_task == nullptr); + m_delayed_preprocess_task = create_context_callback< + Replayer, &Replayer::handle_delayed_preprocess_task>(this); + m_threads->timer->add_event_after(delay, m_delayed_preprocess_task); +} + +template +void Replayer::handle_delayed_preprocess_task(int r) { + dout(20) << "r=" << r << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); + m_delayed_preprocess_task = nullptr; + + m_threads->work_queue->queue(create_context_callback< + Replayer, &Replayer::handle_preprocess_entry_ready>(this), 0); +} + +template +void Replayer::handle_preprocess_entry_ready(int r) { + dout(20) << "r=" << r << dendl; + ceph_assert(r == 0); + + m_replay_start_time = ceph_clock_now(); + if (!m_event_preprocessor->is_required(m_event_entry)) { + process_entry(); + return; + } + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_preprocess_entry_safe>(this); + m_event_preprocessor->preprocess(&m_event_entry, ctx); +} + +template +void Replayer::handle_preprocess_entry_safe(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + if (r == -ECANCELED) { + handle_replay_complete(0, "lost exclusive lock"); + } else { + derr << "failed to preprocess journal event" << dendl; + handle_replay_complete(r, "failed to preprocess journal event"); + } + + m_event_replay_tracker.finish_op(); + return; + } + + process_entry(); +} + +template +void Replayer::process_entry() { + dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + + Context *on_ready = create_context_callback< + Replayer, &Replayer::handle_process_entry_ready>(this); + Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry), + m_replay_bytes, + m_replay_start_time); + + m_local_journal_replay->process(m_event_entry, on_ready, on_commit); +} + +template +void Replayer::handle_process_entry_ready(int r) { + std::unique_lock locker{m_lock}; + + dout(20) << dendl; + ceph_assert(r == 0); + + bool update_status = false; + { + auto local_image_ctx = *m_local_image_ctx; + std::shared_lock image_locker{local_image_ctx->image_lock}; + auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx, + local_image_ctx->name); + if (m_image_spec != image_spec) { + m_image_spec = image_spec; + update_status = true; + } + } + + if (update_status) { + notify_status_updated(); + } + + // attempt to process the next event + handle_replay_ready(locker); +} + +template +void Replayer::handle_process_entry_safe( + const ReplayEntry &replay_entry, uint64_t replay_bytes, + const utime_t &replay_start_time, int r) { + dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r + << dendl; + + if (r < 0) { + derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to commit journal event"); + } else { + ceph_assert(m_remote_journaler != nullptr); + m_remote_journaler->committed(replay_entry); + } + + auto latency = ceph_clock_now() - replay_start_time; + if (g_perf_counters) { + g_perf_counters->inc(l_rbd_mirror_replay); + g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes); + g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); + } + + auto ctx = new LambdaContext( + [this, replay_bytes, latency](int r) { + std::unique_lock locker{m_lock}; + schedule_flush_local_replay_task(); + + if (m_perf_counters) { + m_perf_counters->inc(l_rbd_mirror_replay); + m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes); + m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); + } + + m_event_replay_tracker.finish_op(); + }); + m_threads->work_queue->queue(ctx, 0); +} + +template +void Replayer::handle_resync_image() { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + m_resync_requested = true; + handle_replay_complete(locker, 0, "resync requested"); +} + +template +void Replayer::notify_status_updated() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + + auto ctx = new C_TrackedOp(this, new LambdaContext( + [this](int) { + m_replayer_listener->handle_notification(); + })); + m_threads->work_queue->queue(ctx, 0); +} + +template +void Replayer::cancel_delayed_preprocess_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + bool canceled_delayed_preprocess_task = false; + { + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_delayed_preprocess_task != nullptr) { + dout(10) << dendl; + canceled_delayed_preprocess_task = m_threads->timer->cancel_event( + m_delayed_preprocess_task); + ceph_assert(canceled_delayed_preprocess_task); + m_delayed_preprocess_task = nullptr; + } + } + + if (canceled_delayed_preprocess_task) { + // wake up sleeping replay + m_event_replay_tracker.finish_op(); + } +} + +template +int Replayer::validate_remote_client_state( + const cls::journal::Client& remote_client, + librbd::journal::MirrorPeerClientMeta* remote_client_meta, + bool* resync_requested, std::string* error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (!util::decode_client_meta(remote_client, remote_client_meta)) { + // require operator intervention since the data is corrupt + *error = "error retrieving remote journal client"; + return -EBADMSG; + } + + auto local_image_ctx = *m_local_image_ctx; + dout(5) << "image_id=" << local_image_ctx->id << ", " + << "remote_client_meta.image_id=" + << remote_client_meta->image_id << ", " + << "remote_client.state=" << remote_client.state << dendl; + if (remote_client_meta->image_id == local_image_ctx->id && + remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) { + dout(5) << "client flagged disconnected, stopping image replay" << dendl; + if (local_image_ctx->config.template get_val( + "rbd_mirroring_resync_after_disconnect")) { + dout(10) << "disconnected: automatic resync" << dendl; + *resync_requested = true; + *error = "disconnected: automatic resync"; + return -ENOTCONN; + } else { + dout(10) << "disconnected" << dendl; + *error = "disconnected"; + return -ENOTCONN; + } + } + + return 0; +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::Replayer; diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h new file mode 100644 index 0000000000000..3b464c9065d30 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h @@ -0,0 +1,305 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H + +#include "tools/rbd_mirror/image_replayer/Replayer.h" +#include "include/utime.h" +#include "common/AsyncOpTracker.h" +#include "common/ceph_mutex.h" +#include "common/RefCountedObj.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/ReplayEntry.h" +#include "librbd/ImageCtx.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include +#include + +namespace journal { class Journaler; } +namespace librbd { + +struct ImageCtx; +namespace journal { template class Replay; } + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template struct Threads; + +namespace image_replayer { + +struct ReplayerListener; + +namespace journal { + +template class EventPreprocessor; +template class ReplayStatusFormatter; + +template +class Replayer : public image_replayer::Replayer { +public: + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + + Replayer( + ImageCtxT** local_image_ctx, Journaler* remote_journaler, + std::string m_local_mirror_uuid, std::string m_remote_mirror_uuid, + ReplayerListener* replayer_listener, Threads* threads); + ~Replayer(); + + void init(Context* on_finish) override; + void shut_down(Context* on_finish) override; + + void flush(Context* on_finish) override; + + bool get_replay_status(std::string* description, Context* on_finish) override; + + bool is_replaying() const override { + std::unique_lock locker{m_lock}; + return (m_state == STATE_REPLAYING); + } + + bool is_resync_requested() const override { + std::unique_lock locker(m_lock); + return m_resync_requested; + } + + int get_error_code() const override { + std::unique_lock locker(m_lock); + return m_error_code; + } + + std::string get_error_description() const override { + std::unique_lock locker(m_lock); + return m_error_description; + } + + std::string get_image_spec() const { + std::unique_lock locker(m_lock); + return m_image_spec; + } + +private: + /** + * @verbatim + * + * + * | + * v (error) + * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * * + * | * + * | /--------------------------------------------\ * + * | | | * + * v v (asok flush) | * + * REPLAYING -------------> LOCAL_REPLAY_FLUSH | * + * | \ | | * + * | | v | * + * | | FLUSH_COMMIT_POSITION | * + * | | | | * + * | | \--------------------/| * + * | | | * + * | | (entries available) | * + * | \-----------> REPLAY_READY | * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | REPLAY_FLUSH * * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | GET_REMOTE_TAG * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | ALLOCATE_LOCAL_TAG * * * * * * * + * | | | * * + * | v (error) * * + * | PREPROCESS_ENTRY * * * * * * * * + * | | | * * + * | v (error) * * + * | PROCESS_ENTRY * * * * * * * * * * + * | | | * * + * | \---------------------/ * * + * v (shutdown) * * + * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * + * | * + * v * + * SHUT_DOWN_LOCAL_JOURNAL_REPLAY * + * | * + * v * + * WAIT_FOR_REPLAY * + * | * + * v * + * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * * + * | + * v (skip if not started) + * STOP_REMOTE_JOURNALER_REPLAY + * | + * v (skip if not initialized) + * SHUT_DOWN_REMOTE_JOURNALER + * | + * v + * WAIT_FOR_IN_FLIGHT_OPS + * | + * v + * + * + * @endverbatim + */ + + typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; + + enum State { + STATE_INIT, + STATE_REPLAYING, + STATE_COMPLETE + }; + + struct C_ReplayCommitted; + struct C_TrackedOp; + struct RemoteJournalerListener; + struct RemoteReplayHandler; + struct LocalJournalListener; + + ImageCtxT** m_local_image_ctx; + Journaler* m_remote_journaler; + std::string m_local_mirror_uuid; + std::string m_remote_mirror_uuid; + ReplayerListener* m_replayer_listener; + Threads* m_threads; + + mutable ceph::mutex m_lock; + + std::string m_image_spec; + Context* m_on_init_shutdown = nullptr; + + State m_state = STATE_INIT; + int m_error_code = 0; + std::string m_error_description; + bool m_resync_requested = false; + + ceph::ref_t::type> + m_local_journal; + RemoteJournalerListener* m_remote_listener = nullptr; + librbd::journal::MirrorPeerClientMeta m_remote_client_meta; + + librbd::journal::Replay* m_local_journal_replay = nullptr; + EventPreprocessor* m_event_preprocessor = nullptr; + ReplayStatusFormatter* m_replay_status_formatter = nullptr; + RemoteReplayHandler* m_remote_replay_handler = nullptr; + LocalJournalListener* m_local_journal_listener = nullptr; + + PerfCounters *m_perf_counters = nullptr; + + ReplayEntry m_replay_entry; + uint64_t m_replay_bytes = 0; + utime_t m_replay_start_time; + bool m_replay_tag_valid = false; + uint64_t m_replay_tag_tid = 0; + cls::journal::Tag m_replay_tag; + librbd::journal::TagData m_replay_tag_data; + librbd::journal::EventEntry m_event_entry; + + AsyncOpTracker m_event_replay_tracker; + Context *m_delayed_preprocess_task = nullptr; + + AsyncOpTracker m_in_flight_op_tracker; + Context *m_flush_local_replay_task = nullptr; + + void handle_remote_journal_metadata_updated(); + + void schedule_flush_local_replay_task(); + void cancel_flush_local_replay_task(); + void handle_flush_local_replay_task(int r); + + void flush_local_replay(Context* on_flush); + void handle_flush_local_replay(Context* on_flush, int r); + + void flush_commit_position(Context* on_flush); + void handle_flush_commit_position(Context* on_flush, int r); + + void init_remote_journaler(); + void handle_init_remote_journaler(int r); + + void start_external_replay(); + void handle_start_external_replay(int r); + + bool notify_init_complete(std::unique_lock& locker); + + void shut_down_local_journal_replay(); + void handle_shut_down_local_journal_replay(int r); + + void wait_for_event_replay(); + void handle_wait_for_event_replay(int r); + + void close_local_image(); + void handle_close_local_image(int r); + + void stop_remote_journaler_replay(); + void handle_stop_remote_journaler_replay(int r); + + void shut_down_remote_journaler(); + void handle_shut_down_remote_journaler(int r); + + void wait_for_in_flight_ops(); + void handle_wait_for_in_flight_ops(int r); + + void replay_flush(); + void handle_replay_flush_shut_down(int r); + void handle_replay_flush(int r); + + void get_remote_tag(); + void handle_get_remote_tag(int r); + + void allocate_local_tag(); + void handle_allocate_local_tag(int r); + + void handle_replay_error(int r, const std::string &error); + + bool is_replay_complete() const; + bool is_replay_complete(const std::unique_lock& locker) const; + + void handle_replay_complete(int r, const std::string &error_desc); + void handle_replay_complete(const std::unique_lock&, + int r, const std::string &error_desc); + void handle_replay_ready(); + void handle_replay_ready(std::unique_lock& locker); + + void preprocess_entry(); + void handle_delayed_preprocess_task(int r); + void handle_preprocess_entry_ready(int r); + void handle_preprocess_entry_safe(int r); + + void process_entry(); + void handle_process_entry_ready(int r); + void handle_process_entry_safe(const ReplayEntry& replay_entry, + uint64_t relay_bytes, + const utime_t &replay_start_time, int r); + + void handle_resync_image(); + + void notify_status_updated(); + + void cancel_delayed_preprocess_task(); + + int validate_remote_client_state( + const cls::journal::Client& remote_client, + librbd::journal::MirrorPeerClientMeta* remote_client_meta, + bool* resync_requested, std::string* error); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::Replayer; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H -- 2.39.5