--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "test/journal/mock/MockJournaler.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace librbd {
+
+namespace {
+
+struct MockTestJournal;
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+ explicit MockTestImageCtx(librbd::ImageCtx &image_ctx,
+ MockTestJournal& mock_test_journal)
+ : librbd::MockImageCtx(image_ctx), journal(&mock_test_journal) {
+ }
+
+ MockTestJournal* journal = nullptr;
+};
+
+struct MockTestJournal : public MockJournal {
+ MOCK_METHOD2(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
+ Context *on_start));
+ MOCK_METHOD0(stop_external_replay, void());
+};
+
+} // anonymous namespace
+
+namespace journal {
+
+template <>
+struct TypeTraits<librbd::MockTestImageCtx> {
+ typedef ::journal::MockJournaler Journaler;
+ typedef ::journal::MockReplayEntryProxy ReplayEntry;
+};
+
+template<>
+struct Replay<MockTestImageCtx> {
+ MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *));
+ MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *));
+ MOCK_METHOD1(flush, void(Context*));
+ MOCK_METHOD2(shut_down, void(bool, Context*));
+};
+
+} // namespace journal
+} // namespace librbd
+
+namespace boost {
+
+template<>
+struct intrusive_ptr<librbd::MockTestJournal> {
+ intrusive_ptr() {
+ }
+ intrusive_ptr(librbd::MockTestJournal* mock_test_journal)
+ : mock_test_journal(mock_test_journal) {
+ }
+
+ librbd::MockTestJournal* operator->() {
+ return mock_test_journal;
+ }
+
+ void reset() {
+ mock_test_journal = nullptr;
+ }
+
+ const librbd::MockTestJournal* get() const {
+ return mock_test_journal;
+ }
+
+ template<typename T>
+ bool operator==(T* t) const {
+ return (mock_test_journal == t);
+ }
+
+ librbd::MockTestJournal* mock_test_journal = nullptr;
+};
+
+} // namespace boost
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+ MockSafeTimer *timer;
+ ceph::mutex &timer_lock;
+
+ MockContextWQ *work_queue;
+
+ Threads(Threads<librbd::ImageCtx>* threads)
+ : timer(new MockSafeTimer()),
+ timer_lock(threads->timer_lock),
+ work_queue(new MockContextWQ()) {
+ }
+ ~Threads() {
+ delete timer;
+ delete work_queue;
+ }
+};
+
+namespace {
+
+struct MockReplayerListener : public image_replayer::ReplayerListener {
+ MOCK_METHOD0(handle_notification, void());
+};
+
+} // anonymous namespace
+
+namespace image_replayer {
+
+template<>
+struct CloseImageRequest<librbd::MockTestImageCtx> {
+ static CloseImageRequest* s_instance;
+ librbd::MockTestImageCtx **image_ctx = nullptr;
+ Context *on_finish = nullptr;
+
+ static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx,
+ Context *on_finish) {
+ ceph_assert(s_instance != nullptr);
+ s_instance->image_ctx = image_ctx;
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ CloseImageRequest() {
+ ceph_assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ ~CloseImageRequest() {
+ ceph_assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(send, void());
+};
+
+CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+namespace journal {
+
+template <>
+struct EventPreprocessor<librbd::MockTestImageCtx> {
+ static EventPreprocessor *s_instance;
+
+ static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
+ ::journal::MockJournaler &remote_journaler,
+ const std::string &local_mirror_uuid,
+ librbd::journal::MirrorPeerClientMeta *client_meta,
+ MockContextWQ *work_queue) {
+ ceph_assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ static void destroy(EventPreprocessor* processor) {
+ }
+
+ EventPreprocessor() {
+ ceph_assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ ~EventPreprocessor() {
+ ceph_assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
+ MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
+};
+
+template<>
+struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
+ static ReplayStatusFormatter* s_instance;
+
+ static ReplayStatusFormatter* create(::journal::MockJournaler *journaler,
+ const std::string &mirror_uuid) {
+ ceph_assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ static void destroy(ReplayStatusFormatter* formatter) {
+ }
+
+ ReplayStatusFormatter() {
+ ceph_assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ ~ReplayStatusFormatter() {
+ ceph_assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+};
+
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
+ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.cc"
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::MatcherCast;
+using ::testing::Return;
+using ::testing::ReturnArg;
+using ::testing::SaveArg;
+using ::testing::SetArgPointee;
+using ::testing::WithArg;
+
+class TestMockImageReplayerJournalReplayer : public TestMockFixture {
+public:
+ typedef Replayer<librbd::MockTestImageCtx> MockReplayer;
+ typedef EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
+ typedef ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
+ typedef Threads<librbd::MockTestImageCtx> MockThreads;
+ typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
+ typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
+
+ void SetUp() override {
+ TestMockFixture::SetUp();
+
+ librbd::RBD rbd;
+ ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
+ ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
+ }
+
+ bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
+ bufferlist bl;
+ encode(tag_data, bl);
+ return bl;
+ }
+
+ void expect_work_queue_repeatedly(MockThreads &mock_threads) {
+ EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+ .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+ m_threads->work_queue->queue(ctx, r);
+ }));
+ }
+
+ void expect_add_event_after_repeatedly(MockThreads &mock_threads) {
+ EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+ .WillRepeatedly(
+ DoAll(Invoke([this](double seconds, Context *ctx) {
+ m_threads->timer->add_event_after(seconds, ctx);
+ }),
+ ReturnArg<1>()));
+ EXPECT_CALL(*mock_threads.timer, cancel_event(_))
+ .WillRepeatedly(
+ Invoke([this](Context *ctx) {
+ return m_threads->timer->cancel_event(ctx);
+ }));
+ }
+
+ void expect_init(::journal::MockJournaler &mock_journaler, int r) {
+ EXPECT_CALL(mock_journaler, init(_))
+ .WillOnce(CompleteContext(m_threads->work_queue, r));
+ }
+
+ void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) {
+ EXPECT_CALL(mock_journaler, stop_replay(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_shut_down(::journal::MockJournaler &mock_journaler, int r) {
+ EXPECT_CALL(mock_journaler, shut_down(_))
+ .WillOnce(CompleteContext(m_threads->work_queue, r));
+ }
+
+ void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
+ EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
+ .WillOnce(WithArg<1>(CompleteContext(m_threads->work_queue, r)));
+ }
+
+ void expect_get_cached_client(::journal::MockJournaler &mock_journaler,
+ const std::string& client_id,
+ const cls::journal::Client& client,
+ const librbd::journal::ClientMeta& client_meta,
+ int r) {
+ librbd::journal::ClientData client_data;
+ client_data.client_meta = client_meta;
+
+ cls::journal::Client client_copy{client};
+ encode(client_data, client_copy.data);
+
+ EXPECT_CALL(mock_journaler, get_cached_client(client_id, _))
+ .WillOnce(DoAll(SetArgPointee<1>(client_copy),
+ Return(r)));
+ }
+
+ void expect_start_external_replay(librbd::MockTestJournal &mock_journal,
+ MockReplay *mock_replay, int r) {
+ EXPECT_CALL(mock_journal, start_external_replay(_, _))
+ .WillOnce(DoAll(SetArgPointee<0>(mock_replay),
+ WithArg<1>(CompleteContext(m_threads->work_queue, r))));
+ }
+
+ void expect_is_tag_owner(librbd::MockTestJournal &mock_journal,
+ bool is_owner) {
+ EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(is_owner));
+ }
+
+ void expect_is_resync_requested(librbd::MockTestJournal &mock_journal,
+ int r, bool resync_requested) {
+ EXPECT_CALL(mock_journal, is_resync_requested(_)).WillOnce(
+ DoAll(SetArgPointee<0>(resync_requested),
+ Return(r)));
+ }
+
+ void expect_get_commit_tid_in_debug(
+ ::journal::MockReplayEntry &mock_replay_entry) {
+ // It is used in debug messages and depends on debug level
+ EXPECT_CALL(mock_replay_entry, get_commit_tid())
+ .Times(AtLeast(0))
+ .WillRepeatedly(Return(0));
+ }
+
+ void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) {
+ // It is used in debug messages and depends on debug level
+ EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0))
+ .WillRepeatedly(Return(0));
+ }
+
+ void expect_committed(::journal::MockReplayEntry &mock_replay_entry,
+ ::journal::MockJournaler &mock_journaler, int times) {
+ EXPECT_CALL(mock_replay_entry, get_data()).Times(times);
+ EXPECT_CALL(mock_journaler, committed(
+ MatcherCast<const ::journal::MockReplayEntryProxy&>(_)))
+ .Times(times);
+ }
+
+ void expect_try_pop_front(::journal::MockJournaler &mock_journaler,
+ uint64_t replay_tag_tid, bool entries_available) {
+ EXPECT_CALL(mock_journaler, try_pop_front(_, _))
+ .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()),
+ SetArgPointee<1>(replay_tag_tid),
+ Return(entries_available)));
+ }
+
+ void expect_try_pop_front_return_no_entries(
+ ::journal::MockJournaler &mock_journaler, Context *on_finish) {
+ EXPECT_CALL(mock_journaler, try_pop_front(_, _))
+ .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e,
+ uint64_t *t) {
+ on_finish->complete(0);
+ }),
+ Return(false)));
+ }
+
+ void expect_get_tag(::journal::MockJournaler &mock_journaler,
+ const cls::journal::Tag &tag, int r) {
+ EXPECT_CALL(mock_journaler, get_tag(_, _, _))
+ .WillOnce(DoAll(SetArgPointee<1>(tag),
+ WithArg<2>(CompleteContext(r))));
+ }
+
+ void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) {
+ EXPECT_CALL(mock_journal, allocate_tag(_, _, _))
+ .WillOnce(WithArg<2>(CompleteContext(r)));
+ }
+
+ void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor,
+ bool required, int r) {
+ EXPECT_CALL(mock_event_preprocessor, is_required(_))
+ .WillOnce(Return(required));
+ if (required) {
+ EXPECT_CALL(mock_event_preprocessor, preprocess(_, _))
+ .WillOnce(WithArg<1>(CompleteContext(r)));
+ }
+ }
+
+ void expect_process(MockReplay &mock_replay,
+ int on_ready_r, int on_commit_r) {
+ EXPECT_CALL(mock_replay, process(_, _, _))
+ .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)),
+ WithArg<2>(CompleteContext(on_commit_r))));
+ }
+
+ void expect_flush(MockReplay& mock_replay, int r) {
+ EXPECT_CALL(mock_replay, flush(_))
+ .WillOnce(CompleteContext(m_threads->work_queue, r));
+ }
+
+ void expect_flush_commit_position(::journal::MockJournaler& mock_journal,
+ int r) {
+ EXPECT_CALL(mock_journal, flush_commit_position(_))
+ .WillOnce(CompleteContext(m_threads->work_queue, r));
+ }
+
+ void expect_get_tag_data(librbd::MockTestJournal& mock_local_journal,
+ const librbd::journal::TagData& tag_data) {
+ EXPECT_CALL(mock_local_journal, get_tag_data())
+ .WillOnce(Return(tag_data));
+ }
+
+ void expect_send(MockCloseImageRequest &mock_close_image_request, int r) {
+ EXPECT_CALL(mock_close_image_request, send())
+ .WillOnce(Invoke([this, &mock_close_image_request, r]() {
+ *mock_close_image_request.image_ctx = nullptr;
+ m_threads->work_queue->queue(mock_close_image_request.on_finish, r);
+ }));
+ }
+
+ void expect_notification(MockThreads& mock_threads,
+ MockReplayerListener& mock_replayer_listener) {
+ EXPECT_CALL(mock_replayer_listener, handle_notification())
+ .WillOnce(Invoke([this]() {
+ std::unique_lock locker{m_lock};
+ m_notified = true;
+ m_cond.notify_all();
+ }));
+ }
+
+ int wait_for_notification() {
+ std::unique_lock locker{m_lock};
+ while (!m_notified) {
+ if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+ return -ETIMEDOUT;
+ }
+ }
+ m_notified = false;
+ return 0;
+ }
+
+ int init_entry_replayer(MockReplayer& mock_replayer,
+ MockThreads& mock_threads,
+ MockReplayerListener& mock_replayer_listener,
+ librbd::MockTestJournal& mock_local_journal,
+ ::journal::MockJournaler& mock_remote_journaler,
+ MockReplay& mock_local_journal_replay,
+ librbd::journal::Listener** local_journal_listener,
+ ::journal::ReplayHandler** remote_replay_handler,
+ ::journal::JournalMetadataListener** remote_journal_listener) {
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_))
+ .WillOnce(SaveArg<0>(remote_journal_listener));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ EXPECT_CALL(mock_local_journal, add_listener(_))
+ .WillOnce(SaveArg<0>(local_journal_listener));
+ expect_is_tag_owner(mock_local_journal, false);
+ expect_is_resync_requested(mock_local_journal, 0, false);
+ EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _))
+ .WillOnce(SaveArg<0>(remote_replay_handler));
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ int r = init_ctx.wait();
+ if (r < 0) {
+ return r;
+ }
+
+ return wait_for_notification();
+ }
+
+ int shut_down_entry_replayer(MockReplayer& mock_replayer,
+ MockThreads& mock_threads,
+ librbd::MockTestJournal& mock_local_journal,
+ ::journal::MockJournaler& mock_remote_journaler,
+ MockReplay& mock_local_journal_replay) {
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ return shutdown_ctx.wait();
+ }
+
+ librbd::ImageCtx* m_local_image_ctx = nullptr;
+
+ ceph::mutex m_lock = ceph::make_mutex(
+ "TestMockImageReplayerJournalReplayer");
+ ceph::condition_variable m_cond;
+ bool m_notified = false;
+};
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitShutDown) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+
+ mock_local_image_ctx.journal = nullptr;
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-EINVAL, init_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, -EINVAL);
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-EINVAL, init_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL);
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-EINVAL, init_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitLocalJournalStartExternalReplayError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+ expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-EINVAL, init_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitIsPromoted) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+ MockReplay mock_local_journal_replay;
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ EXPECT_CALL(mock_local_journal, add_listener(_));
+ expect_is_tag_owner(mock_local_journal, true);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(0, init_ctx.wait());
+ ASSERT_EQ(0, wait_for_notification());
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(0, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnected) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+ "false");
+
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+ {{}, {}, {},
+ cls::journal::CLIENT_STATE_DISCONNECTED},
+ {librbd::journal::MirrorPeerClientMeta{
+ mock_local_image_ctx.id}}, 0);
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-ENOTCONN, init_ctx.wait());
+ ASSERT_FALSE(mock_replayer.is_resync_requested());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnectedResync) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+ "true");
+
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+ {{}, {}, {},
+ cls::journal::CLIENT_STATE_DISCONNECTED},
+ {librbd::journal::MirrorPeerClientMeta{
+ mock_local_image_ctx.id}}, 0);
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(-ENOTCONN, init_ctx.wait());
+ ASSERT_TRUE(mock_replayer.is_resync_requested());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequested) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+ MockReplay mock_local_journal_replay;
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ EXPECT_CALL(mock_local_journal, add_listener(_));
+ expect_is_tag_owner(mock_local_journal, false);
+ expect_is_resync_requested(mock_local_journal, 0, true);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(0, init_ctx.wait());
+ ASSERT_EQ(0, wait_for_notification());
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(0, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequestedError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ expect_init(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+ {librbd::journal::MirrorPeerClientMeta{}}, 0);
+ MockReplay mock_local_journal_replay;
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ EXPECT_CALL(mock_local_journal, add_listener(_));
+ expect_is_tag_owner(mock_local_journal, false);
+ expect_is_resync_requested(mock_local_journal, -EINVAL, false);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ C_SaferCond init_ctx;
+ mock_replayer.init(&init_ctx);
+ ASSERT_EQ(0, init_ctx.wait());
+ ASSERT_EQ(0, wait_for_notification());
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(0, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ShutDownLocalJournalReplayError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_shut_down(mock_local_journal_replay, true, -EINVAL);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, -EPERM);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, CloseLocalImageError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, -EINVAL);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, -EPERM);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, StopRemoteJournalerError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, -EPERM);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(-EPERM, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ShutDownRemoteJournalerError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_shut_down(mock_local_journal_replay, true, 0);
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, -EPERM);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(-EPERM, shutdown_ctx.wait());
+ ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, Replay) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_add_event_after_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+ expect_committed(mock_replay_entry, mock_remote_journaler, 2);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+ // replay_flush
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+
+ // process
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_journal_replay, 0, 0);
+
+ // the next event with preprocess
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, true, 0);
+ expect_process(mock_local_journal_replay, 0, 0);
+
+ // attempt to process the next event
+ C_SaferCond replay_ctx;
+ expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+ // fire
+ remote_replay_handler->handle_entries_available();
+ ASSERT_EQ(0, replay_ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, DecodeError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_add_event_after_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+ // replay_flush
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+
+ // process
+ EXPECT_CALL(mock_replay_entry, get_data());
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+ .WillOnce(Return(-EINVAL));
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ // fire
+ remote_replay_handler->handle_entries_available();
+ wait_for_notification();
+
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, DelayedReplay) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_add_event_after_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+ expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+ // replay_flush
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+
+ // process with delay
+ EXPECT_CALL(mock_replay_entry, get_data());
+ librbd::journal::EventEntry event_entry(
+ librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now());
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+ .WillOnce(DoAll(SetArgPointee<1>(event_entry),
+ Return(0)));
+
+ Context* delayed_task_ctx = nullptr;
+ EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+ .WillOnce(
+ DoAll(Invoke([this, &delayed_task_ctx](double seconds, Context *ctx) {
+ std::unique_lock locker{m_lock};
+ delayed_task_ctx = ctx;
+ m_cond.notify_all();
+ }),
+ ReturnArg<1>()));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_journal_replay, 0, 0);
+
+ // attempt to process the next event
+ C_SaferCond replay_ctx;
+ expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+ // fire
+ mock_local_image_ctx.mirroring_replay_delay = 600;
+ remote_replay_handler->handle_entries_available();
+ {
+ std::unique_lock locker{m_lock};
+ while (delayed_task_ctx == nullptr) {
+ if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+ FAIL() << "timed out waiting for task";
+ break;
+ }
+ }
+ }
+ {
+ std::unique_lock timer_locker{mock_threads.timer_lock};
+ delayed_task_ctx->complete(0);
+ }
+ ASSERT_EQ(0, replay_ctx.wait());
+
+ // add a pending (delayed) entry before stop
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ C_SaferCond decode_ctx;
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+ .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it,
+ librbd::journal::EventEntry *e) {
+ decode_ctx.complete(0);
+ }),
+ Return(0)));
+
+ remote_replay_handler->handle_entries_available();
+ ASSERT_EQ(0, decode_ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayNoMemoryError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_complete(-ENOMEM);
+
+ wait_for_notification();
+ ASSERT_EQ(false, mock_replayer.is_replaying());
+ ASSERT_EQ(-ENOMEM, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalForcePromoted) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_notification(mock_threads, mock_replayer_listener);
+ local_journal_listener->handle_promoted();
+ wait_for_notification();
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalResyncRequested) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_notification(mock_threads, mock_replayer_listener);
+ local_journal_listener->handle_resync();
+ wait_for_notification();
+
+ ASSERT_TRUE(mock_replayer.is_resync_requested());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, RemoteJournalDisconnected) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+ "true");
+
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+ {{}, {}, {},
+ cls::journal::CLIENT_STATE_DISCONNECTED},
+ {librbd::journal::MirrorPeerClientMeta{
+ mock_local_image_ctx.id}}, 0);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ remote_journaler_listener->handle_update(nullptr);
+ wait_for_notification();
+
+ ASSERT_EQ(-ENOTCONN, mock_replayer.get_error_code());
+ ASSERT_FALSE(mock_replayer.is_replaying());
+ ASSERT_TRUE(mock_replayer.is_resync_requested());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, Flush) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_flush(mock_local_journal_replay, 0);
+ expect_flush_commit_position(mock_remote_journaler, 0);
+
+ C_SaferCond ctx;
+ mock_replayer.flush(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, FlushError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_flush(mock_local_journal_replay, -EINVAL);
+
+ C_SaferCond ctx;
+ mock_replayer.flush(&ctx);
+ ASSERT_EQ(-EINVAL, ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, FlushCommitPositionError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ expect_flush(mock_local_journal_replay, 0);
+ expect_flush_commit_position(mock_remote_journaler, -EINVAL);
+
+ C_SaferCond ctx;
+ mock_replayer.flush(&ctx);
+ ASSERT_EQ(-EINVAL, ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushShutDownError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_try_pop_front(mock_remote_journaler, 1, true);
+ expect_shut_down(mock_local_journal_replay, false, -EINVAL);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(0, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushStartError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_try_pop_front(mock_remote_journaler, 1, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ MockCloseImageRequest mock_close_image_request;
+ expect_send(mock_close_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ C_SaferCond shutdown_ctx;
+ mock_replayer.shut_down(&shutdown_ctx);
+ ASSERT_EQ(0, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, GetTagError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ expect_work_queue_repeatedly(mock_threads);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, -EINVAL);
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagDemotion) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_notification(mock_threads, mock_replayer_listener);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+ expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::ORPHAN_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_get_tag_data(mock_local_journal, {});
+ expect_allocate_tag(mock_local_journal, 0);
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_journal_replay, 0, 0);
+
+ remote_replay_handler->handle_entries_available();
+ wait_for_notification();
+ ASSERT_FALSE(mock_replayer.is_replaying());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, -EINVAL);
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_FALSE(mock_replayer.is_replaying());
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, PreprocessError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+ EXPECT_CALL(mock_replay_entry, get_data());
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, true, -EINVAL);
+
+ expect_notification(mock_threads, mock_replayer_listener);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_FALSE(mock_replayer.is_replaying());
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ProcessError) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+ EXPECT_CALL(mock_replay_entry, get_data());
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_journal_replay, 0, -EINVAL);
+
+ // attempt to process the next event
+ C_SaferCond replay_ctx;
+ expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+ remote_replay_handler->handle_entries_available();
+
+ wait_for_notification();
+ ASSERT_FALSE(mock_replayer.is_replaying());
+ ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+ ASSERT_EQ(0, replay_ctx.wait());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ImageNameUpdated) {
+ librbd::MockTestJournal mock_local_journal;
+ librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+ mock_local_journal};
+ librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+ ::journal::MockJournaler mock_remote_journaler;
+ MockReplayerListener mock_replayer_listener;
+ MockThreads mock_threads{m_threads};
+ MockReplayer mock_replayer{
+ &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+ "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ expect_work_queue_repeatedly(mock_threads);
+ expect_add_event_after_repeatedly(mock_threads);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_get_tag_tid_in_debug(mock_local_journal);
+ expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+ expect_notification(mock_threads, mock_replayer_listener);
+
+ InSequence seq;
+
+ MockReplay mock_local_journal_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ librbd::journal::Listener* local_journal_listener = nullptr;
+ ::journal::ReplayHandler* remote_replay_handler = nullptr;
+ ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+ ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+ mock_replayer_listener, mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay,
+ &local_journal_listener,
+ &remote_replay_handler,
+ &remote_journaler_listener));
+
+ mock_local_image_ctx.name = "NEW NAME";
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ expect_shut_down(mock_local_journal_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+ 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+ EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_journal_replay, 0, 0);
+
+ // attempt to process the next event
+ C_SaferCond replay_ctx;
+ expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+ remote_replay_handler->handle_entries_available();
+ wait_for_notification();
+
+ auto image_spec = util::compute_image_spec(m_local_io_ctx, "NEW NAME");
+ ASSERT_EQ(image_spec, mock_replayer.get_image_spec());
+
+ ASSERT_EQ(0, replay_ctx.wait());
+ ASSERT_TRUE(mock_replayer.is_replaying());
+
+ ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+ mock_local_journal,
+ mock_remote_journaler,
+ mock_local_journal_replay));
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Replayer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/Replay.h"
+#include "journal/Journaler.h"
+#include "journal/JournalMetadataListener.h"
+#include "journal/ReplayHandler.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "Replayer: " << this << " " << __func__ << ": "
+
+extern PerfCounters *g_perf_counters;
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+namespace {
+
+uint32_t calculate_replay_delay(const utime_t &event_time,
+ int mirroring_replay_delay) {
+ if (mirroring_replay_delay <= 0) {
+ return 0;
+ }
+
+ utime_t now = ceph_clock_now();
+ if (event_time + mirroring_replay_delay <= now) {
+ return 0;
+ }
+
+ // ensure it is rounded up when converting to integer
+ return (event_time + mirroring_replay_delay - now) + 1;
+}
+
+} // anonymous namespace
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+struct Replayer<I>::C_ReplayCommitted : public Context {
+ Replayer* replayer;
+ ReplayEntry replay_entry;
+ uint64_t replay_bytes;
+ utime_t replay_start_time;
+
+ C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
+ uint64_t replay_bytes, const utime_t &replay_start_time)
+ : replayer(replayer), replay_entry(std::move(replay_entry)),
+ replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
+ }
+
+ void finish(int r) override {
+ replayer->handle_process_entry_safe(replay_entry, replay_bytes,
+ replay_start_time, r);
+ }
+};
+
+template <typename I>
+struct Replayer<I>::C_TrackedOp : public Context {
+ Replayer *replayer;
+ Context* ctx;
+
+ C_TrackedOp(Replayer* replayer, Context* ctx)
+ : replayer(replayer), ctx(ctx) {
+ replayer->m_in_flight_op_tracker.start_op();
+ }
+
+ void finish(int r) override {
+ ctx->complete(r);
+ replayer->m_in_flight_op_tracker.finish_op();
+ }
+};
+
+template <typename I>
+struct Replayer<I>::RemoteJournalerListener
+ : public ::journal::JournalMetadataListener {
+ Replayer* replayer;
+
+ RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}
+
+ void handle_update(::journal::JournalMetadata*) override {
+ auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) {
+ replayer->handle_remote_journal_metadata_updated();
+ }));
+ replayer->m_threads->work_queue->queue(ctx, 0);
+ }
+};
+
+template <typename I>
+struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
+ Replayer* replayer;
+
+ RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
+ ~RemoteReplayHandler() override {};
+
+ void handle_entries_available() override {
+ replayer->handle_replay_ready();
+ }
+
+ void handle_complete(int r) override {
+ std::string error;
+ if (r == -ENOMEM) {
+ error = "not enough memory in autotune cache";
+ } else if (r < 0) {
+ error = "replay completed with error: " + cpp_strerror(r);
+ }
+ replayer->handle_replay_complete(r, error);
+ }
+};
+
+template <typename I>
+struct Replayer<I>::LocalJournalListener
+ : public librbd::journal::Listener {
+ Replayer* replayer;
+
+ LocalJournalListener(Replayer* replayer) : replayer(replayer) {
+ }
+
+ void handle_close() override {
+ replayer->handle_replay_complete(0, "");
+ }
+
+ void handle_promoted() override {
+ replayer->handle_replay_complete(0, "force promoted");
+ }
+
+ void handle_resync() override {
+ replayer->handle_resync_image();
+ }
+};
+
+template <typename I>
+Replayer<I>::Replayer(
+ I** local_image_ctx, Journaler* remote_journaler,
+ std::string local_mirror_uuid, std::string remote_mirror_uuid,
+ ReplayerListener* replayer_listener, Threads<I>* threads)
+ : m_local_image_ctx(local_image_ctx),
+ m_remote_journaler(remote_journaler),
+ m_local_mirror_uuid(local_mirror_uuid),
+ m_remote_mirror_uuid(remote_mirror_uuid),
+ m_replayer_listener(replayer_listener),
+ m_threads(threads),
+ m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
+ "rbd::mirror::image_replayer::journal::Replayer", this))) {
+ dout(10) << dendl;
+}
+
+template <typename I>
+Replayer<I>::~Replayer() {
+ dout(10) << dendl;
+ ceph_assert(m_remote_listener == nullptr);
+ ceph_assert(m_local_journal_listener == nullptr);
+ ceph_assert(m_local_journal_replay == nullptr);
+ ceph_assert(m_remote_replay_handler == nullptr);
+ ceph_assert(m_event_preprocessor == nullptr);
+ ceph_assert(m_replay_status_formatter == nullptr);
+ ceph_assert(m_delayed_preprocess_task == nullptr);
+ ceph_assert(m_flush_local_replay_task == nullptr);
+ ceph_assert(*m_local_image_ctx == nullptr);
+}
+
+template <typename I>
+void Replayer<I>::init(Context* on_finish) {
+ dout(10) << dendl;
+
+ ceph_assert(m_local_journal == nullptr);
+ {
+ auto local_image_ctx = *m_local_image_ctx;
+ std::shared_lock image_locker{local_image_ctx->image_lock};
+ m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
+ local_image_ctx->name);
+ m_local_journal = local_image_ctx->journal;
+ }
+
+ ceph_assert(m_on_init_shutdown == nullptr);
+ m_on_init_shutdown = on_finish;
+
+ if (m_local_journal == nullptr) {
+ std::unique_lock locker{m_lock};
+ m_state = STATE_COMPLETE;
+ m_remote_journaler = nullptr;
+
+ handle_replay_complete(locker, -EINVAL, "error accessing local journal");
+ close_local_image();
+ return;
+ }
+
+ init_remote_journaler();
+}
+
+template <typename I>
+void Replayer<I>::shut_down(Context* on_finish) {
+ dout(10) << dendl;
+
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_on_init_shutdown == nullptr);
+ m_on_init_shutdown = on_finish;
+
+ if (m_state == STATE_INIT) {
+ // raced with the last piece of the init state machine
+ return;
+ } else if (m_state == STATE_REPLAYING) {
+ m_state = STATE_COMPLETE;
+ }
+
+ // if shutting down due to an error notification, we don't
+ // need to propagate the same error again
+ m_error_code = 0;
+ m_error_description = "";
+
+ cancel_delayed_preprocess_task();
+ cancel_flush_local_replay_task();
+ shut_down_local_journal_replay();
+}
+
+template <typename I>
+void Replayer<I>::flush(Context* on_finish) {
+ dout(10) << dendl;
+
+ flush_local_replay(new C_TrackedOp(this, on_finish));
+}
+
+template <typename I>
+bool Replayer<I>::get_replay_status(std::string* description,
+ Context* on_finish) {
+ dout(10) << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (m_replay_status_formatter == nullptr) {
+ derr << "replay not running" << dendl;
+ locker.unlock();
+
+ on_finish->complete(-EAGAIN);
+ return false;
+ }
+
+ on_finish = new C_TrackedOp(this, on_finish);
+ return m_replay_status_formatter->get_or_send_update(description,
+ on_finish);
+}
+
+template <typename I>
+void Replayer<I>::init_remote_journaler() {
+ dout(10) << dendl;
+
+ Context *ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
+ m_remote_journaler->init(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_init_remote_journaler(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(locker, r, "error initializing remote journal");
+ close_local_image();
+ return;
+ }
+
+ // listen for metadata updates to check for disconnect events
+ ceph_assert(m_remote_listener == nullptr);
+ m_remote_listener = new RemoteJournalerListener(this);
+ m_remote_journaler->add_listener(m_remote_listener);
+
+ cls::journal::Client remote_client;
+ r = m_remote_journaler->get_cached_client(m_local_mirror_uuid,
+ &remote_client);
+ if (r < 0) {
+ derr << "error retrieving remote journal client: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(locker, r, "error retrieving remote journal client");
+ close_local_image();
+ return;
+ }
+
+ std::string error;
+ r = validate_remote_client_state(remote_client, &m_remote_client_meta,
+ &m_resync_requested, &error);
+ if (r < 0) {
+ handle_replay_complete(locker, r, error);
+ close_local_image();
+ return;
+ }
+
+ start_external_replay();
+}
+
+template <typename I>
+void Replayer<I>::start_external_replay() {
+ dout(10) << dendl;
+
+ Context *start_ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_start_external_replay>(this);
+ m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_start_external_replay(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ ceph_assert(m_local_journal_replay == nullptr);
+ derr << "error starting external replay on local image "
+ << (*m_local_image_ctx)->id << ": " << cpp_strerror(r) << dendl;
+
+ handle_replay_complete(locker, r, "error starting replay on local image");
+ close_local_image();
+ return;
+ }
+
+ if (!notify_init_complete(locker)) {
+ return;
+ }
+
+ m_state = STATE_REPLAYING;
+
+ // listen for promotion and resync requests against local journal
+ m_local_journal_listener = new LocalJournalListener(this);
+ m_local_journal->add_listener(m_local_journal_listener);
+
+ // verify that the local image wasn't force-promoted and that a resync hasn't
+ // been requested now that we are listening for events
+ if (m_local_journal->is_tag_owner()) {
+ dout(10) << "local image force-promoted" << dendl;
+ handle_replay_complete(locker, 0, "force promoted");
+ return;
+ }
+
+ bool resync_requested = false;
+ r = m_local_journal->is_resync_requested(&resync_requested);
+ if (r < 0) {
+ dout(10) << "failed to determine resync state: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(locker, r, "error parsing resync state");
+ return;
+ } else if (resync_requested) {
+ dout(10) << "local image resync requested" << dendl;
+ handle_replay_complete(locker, 0, "resync requested");
+ return;
+ }
+
+ // start remote journal replay
+ m_event_preprocessor = EventPreprocessor<I>::create(
+ **m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
+ &m_remote_client_meta, m_threads->work_queue);
+ m_replay_status_formatter = ReplayStatusFormatter<I>::create(
+ m_remote_journaler, m_local_mirror_uuid);
+
+ auto cct = static_cast<CephContext *>((*m_local_image_ctx)->cct);
+ double poll_seconds = cct->_conf.get_val<double>(
+ "rbd_mirror_journal_poll_age");
+ m_remote_replay_handler = new RemoteReplayHandler(this);
+ m_remote_journaler->start_live_replay(m_remote_replay_handler, poll_seconds);
+
+ notify_status_updated();
+}
+
+template <typename I>
+bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
+ dout(10) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(m_state == STATE_INIT);
+
+ // notify that init has completed
+ Context *on_finish = nullptr;
+ std::swap(m_on_init_shutdown, on_finish);
+
+ locker.unlock();
+ on_finish->complete(0);
+ locker.lock();
+
+ if (m_on_init_shutdown != nullptr) {
+ // shut down requested after we notified init complete but before we
+ // grabbed the lock
+ close_local_image();
+ return false;
+ }
+
+ return true;
+}
+
+template <typename I>
+void Replayer<I>::shut_down_local_journal_replay() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (m_local_journal_replay == nullptr) {
+ wait_for_event_replay();
+ return;
+ }
+
+ dout(10) << dendl;
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
+ m_local_journal_replay->shut_down(true, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
+ handle_replay_error(r, "failed to shut down local journal replay");
+ }
+
+ wait_for_event_replay();
+}
+
+template <typename I>
+void Replayer<I>::wait_for_event_replay() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(10) << dendl;
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
+ m_event_replay_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_event_replay(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ close_local_image();
+}
+
+template <typename I>
+void Replayer<I>::close_local_image() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(10) << dendl;
+ if (m_local_journal_listener != nullptr) {
+ // blocks if listener notification is in-progress
+ m_local_journal->remove_listener(m_local_journal_listener);
+ delete m_local_journal_listener;
+ m_local_journal_listener = nullptr;
+ }
+
+ if (m_local_journal_replay != nullptr) {
+ m_local_journal->stop_external_replay();
+ m_local_journal_replay = nullptr;
+ }
+
+ if (m_event_preprocessor != nullptr) {
+ image_replayer::journal::EventPreprocessor<I>::destroy(
+ m_event_preprocessor);
+ m_event_preprocessor = nullptr;
+ }
+
+ m_local_journal.reset();
+
+ // NOTE: it's important to ensure that the local image is fully
+ // closed before attempting to close the remote journal in
+ // case the remote cluster is unreachable
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
+ auto request = image_replayer::CloseImageRequest<I>::create(
+ m_local_image_ctx, ctx);
+ request->send();
+}
+
+
+template <typename I>
+void Replayer<I>::handle_close_local_image(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ derr << "error closing local iamge: " << cpp_strerror(r) << dendl;
+ handle_replay_error(r, "failed to close local image");
+ }
+
+ ceph_assert(*m_local_image_ctx == nullptr);
+ stop_remote_journaler_replay();
+}
+
+template <typename I>
+void Replayer<I>::stop_remote_journaler_replay() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (m_remote_journaler == nullptr) {
+ wait_for_in_flight_ops();
+ return;
+ } else if (m_remote_replay_handler == nullptr) {
+ shut_down_remote_journaler();
+ return;
+ }
+
+ dout(10) << dendl;
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
+ m_remote_journaler->stop_replay(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ derr << "failed to stop remote journaler replay : " << cpp_strerror(r)
+ << dendl;
+ handle_replay_error(r, "failed to stop remote journaler replay");
+ }
+
+ delete m_remote_replay_handler;
+ m_remote_replay_handler = nullptr;
+
+ shut_down_remote_journaler();
+}
+
+template <typename I>
+void Replayer<I>::shut_down_remote_journaler() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(10) << dendl;
+ if (m_remote_listener != nullptr) {
+ m_remote_journaler->remove_listener(m_remote_listener);
+ delete m_remote_listener;
+ m_remote_listener = nullptr;
+ }
+
+ auto ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_shut_down_remote_journaler>(this);
+ m_remote_journaler->shut_down(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_shut_down_remote_journaler(int r) {
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to shut down remote journaler: " << cpp_strerror(r)
+ << dendl;
+
+ std::unique_lock locker{m_lock};
+ handle_replay_error(r, "failed to shut down remote journaler");
+ }
+
+ m_remote_journaler = nullptr;
+
+ wait_for_in_flight_ops();
+}
+
+template <typename I>
+void Replayer<I>::wait_for_in_flight_ops() {
+ dout(10) << dendl;
+
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
+ m_in_flight_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
+ m_replay_status_formatter = nullptr;
+
+ ceph_assert(m_on_init_shutdown != nullptr);
+ m_on_init_shutdown->complete(m_error_code);
+}
+
+template <typename I>
+void Replayer<I>::handle_remote_journal_metadata_updated() {
+ dout(20) << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
+ cls::journal::Client remote_client;
+ int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid,
+ &remote_client);
+ if (r < 0) {
+ derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ librbd::journal::MirrorPeerClientMeta remote_client_meta;
+ std::string error;
+ r = validate_remote_client_state(remote_client, &remote_client_meta,
+ &m_resync_requested, &error);
+ if (r < 0) {
+ dout(0) << "client flagged disconnected, stopping image replay" << dendl;
+ handle_replay_complete(locker, r, error);
+ }
+}
+
+template <typename I>
+void Replayer<I>::schedule_flush_local_replay_task() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
+ return;
+ }
+
+ dout(15) << dendl;
+ m_flush_local_replay_task = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
+ m_threads->timer->add_event_after(30, m_flush_local_replay_task);
+}
+
+template <typename I>
+void Replayer<I>::cancel_flush_local_replay_task() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ if (m_flush_local_replay_task != nullptr) {
+ dout(10) << dendl;
+ m_threads->timer->cancel_event(m_flush_local_replay_task);
+ m_flush_local_replay_task = nullptr;
+ }
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_local_replay_task(int) {
+ dout(15) << dendl;
+
+ m_in_flight_op_tracker.start_op();
+ auto on_finish = new LambdaContext([this](int) {
+ std::unique_lock locker{m_lock};
+
+ {
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ m_flush_local_replay_task = nullptr;
+ }
+
+ notify_status_updated();
+ m_in_flight_op_tracker.finish_op();
+ });
+ flush_local_replay(on_finish);
+}
+
+template <typename I>
+void Replayer<I>::flush_local_replay(Context* on_flush) {
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ locker.unlock();
+ on_flush->complete(0);
+ return;
+ } else if (m_local_journal_replay == nullptr) {
+ // raced w/ a tag creation stop/start, which implies that
+ // the replay is flushed
+ locker.unlock();
+ flush_commit_position(on_flush);
+ return;
+ }
+
+ dout(15) << dendl;
+ auto ctx = new LambdaContext(
+ [this, on_flush](int r) {
+ handle_flush_local_replay(on_flush, r);
+ });
+ m_local_journal_replay->flush(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
+ dout(15) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
+ on_flush->complete(r);
+ return;
+ }
+
+ flush_commit_position(on_flush);
+}
+
+template <typename I>
+void Replayer<I>::flush_commit_position(Context* on_flush) {
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ locker.unlock();
+ on_flush->complete(0);
+ return;
+ }
+
+ dout(15) << dendl;
+ auto ctx = new LambdaContext(
+ [this, on_flush](int r) {
+ handle_flush_commit_position(on_flush, r);
+ });
+ m_remote_journaler->flush_commit_position(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
+ dout(15) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "error flushing remote journal commit position: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ on_flush->complete(r);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_error(int r, const std::string &error) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (m_error_code == 0) {
+ m_error_code = r;
+ m_error_description = error;
+ }
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete() const {
+ std::unique_lock locker{m_lock};
+ return is_replay_complete(locker);
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete(
+ const std::unique_lock<ceph::mutex>&) const {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ return (m_state == STATE_COMPLETE);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
+ std::unique_lock locker{m_lock};
+ handle_replay_complete(locker, r, error);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(
+ const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(10) << "r=" << r << ", error=" << error << dendl;
+ if (r < 0) {
+ derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
+ handle_replay_error(r, error);
+ }
+
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
+ m_state = STATE_COMPLETE;
+ notify_status_updated();
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready() {
+ std::unique_lock locker{m_lock};
+ handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready(
+ std::unique_lock<ceph::mutex>& locker) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(20) << dendl;
+ if (is_replay_complete(locker)) {
+ return;
+ }
+
+ if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
+ dout(20) << "no entries ready for replay" << dendl;
+ return;
+ }
+
+ // can safely drop lock once the entry is tracked
+ m_event_replay_tracker.start_op();
+ locker.unlock();
+
+ dout(20) << "entry tid=" << m_replay_entry.get_commit_tid()
+ << "tag_tid=" << m_replay_tag_tid << dendl;
+ if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
+ // must allocate a new local journal tag prior to processing
+ replay_flush();
+ return;
+ }
+
+ preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::replay_flush() {
+ dout(10) << dendl;
+
+ // shut down the replay to flush all IO and ops and create a new
+ // replayer to handle the new tag epoch
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
+ ceph_assert(m_local_journal_replay != nullptr);
+ m_local_journal_replay->shut_down(false, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush_shut_down(int r) {
+ {
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_local_journal != nullptr);
+ m_local_journal->stop_external_replay();
+ m_local_journal_replay = nullptr;
+ }
+
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ handle_replay_flush(r);
+ return;
+ }
+
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
+ m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush(int r) {
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(r, "replay flush encountered an error");
+ m_event_replay_tracker.finish_op();
+ return;
+ } else if (is_replay_complete()) {
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ get_remote_tag();
+}
+
+template <typename I>
+void Replayer<I>::get_remote_tag() {
+ dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
+
+ Context *ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_get_remote_tag>(this);
+ m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_get_remote_tag(int r) {
+ dout(15) << "r=" << r << dendl;
+
+ if (r == 0) {
+ try {
+ auto it = m_replay_tag.data.cbegin();
+ decode(m_replay_tag_data, it);
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+
+ if (r < 0) {
+ derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
+ << cpp_strerror(r) << dendl;
+ handle_replay_complete(r, "failed to retrieve remote tag");
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ m_replay_tag_valid = true;
+ dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
+ << m_replay_tag_data << dendl;
+
+ allocate_local_tag();
+}
+
+template <typename I>
+void Replayer<I>::allocate_local_tag() {
+ dout(15) << dendl;
+
+ std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
+ if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+ mirror_uuid = m_remote_mirror_uuid;
+ } else if (mirror_uuid == m_local_mirror_uuid) {
+ mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+ } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+ // handle possible edge condition where daemon can failover and
+ // the local image has already been promoted/demoted
+ auto local_tag_data = m_local_journal->get_tag_data();
+ if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+ (local_tag_data.predecessor.commit_valid &&
+ local_tag_data.predecessor.mirror_uuid ==
+ librbd::Journal<>::LOCAL_MIRROR_UUID)) {
+ dout(15) << "skipping stale demotion event" << dendl;
+ handle_process_entry_safe(m_replay_entry, m_replay_bytes,
+ m_replay_start_time, 0);
+ handle_replay_ready();
+ return;
+ } else {
+ dout(5) << "encountered image demotion: stopping" << dendl;
+ handle_replay_complete(0, "");
+ }
+ }
+
+ librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
+ if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+ predecessor.mirror_uuid = m_remote_mirror_uuid;
+ } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
+ predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+ }
+
+ dout(15) << "mirror_uuid=" << mirror_uuid << ", "
+ << "predecessor=" << predecessor << ", "
+ << "replay_tag_tid=" << m_replay_tag_tid << dendl;
+ Context *ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
+ m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_allocate_local_tag(int r) {
+ dout(15) << "r=" << r << ", "
+ << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
+ if (r < 0) {
+ derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(r, "failed to allocate journal tag");
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::preprocess_entry() {
+ dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
+ << dendl;
+
+ bufferlist data = m_replay_entry.get_data();
+ auto it = data.cbegin();
+ int r = m_local_journal_replay->decode(&it, &m_event_entry);
+ if (r < 0) {
+ derr << "failed to decode journal event" << dendl;
+ handle_replay_complete(r, "failed to decode journal event");
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ m_replay_bytes = data.length();
+ uint32_t delay = calculate_replay_delay(
+ m_event_entry.timestamp, (*m_local_image_ctx)->mirroring_replay_delay);
+ if (delay == 0) {
+ handle_preprocess_entry_ready(0);
+ return;
+ }
+
+ std::unique_lock locker{m_lock};
+ if (is_replay_complete(locker)) {
+ // don't schedule a delayed replay task if a shut-down is in-progress
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ dout(20) << "delaying replay by " << delay << " sec" << dendl;
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ ceph_assert(m_delayed_preprocess_task == nullptr);
+ m_delayed_preprocess_task = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
+ m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
+}
+
+template <typename I>
+void Replayer<I>::handle_delayed_preprocess_task(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+ m_delayed_preprocess_task = nullptr;
+
+ m_threads->work_queue->queue(create_context_callback<
+ Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_ready(int r) {
+ dout(20) << "r=" << r << dendl;
+ ceph_assert(r == 0);
+
+ m_replay_start_time = ceph_clock_now();
+ if (!m_event_preprocessor->is_required(m_event_entry)) {
+ process_entry();
+ return;
+ }
+
+ Context *ctx = create_context_callback<
+ Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
+ m_event_preprocessor->preprocess(&m_event_entry, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_safe(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ if (r == -ECANCELED) {
+ handle_replay_complete(0, "lost exclusive lock");
+ } else {
+ derr << "failed to preprocess journal event" << dendl;
+ handle_replay_complete(r, "failed to preprocess journal event");
+ }
+
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
+ process_entry();
+}
+
+template <typename I>
+void Replayer<I>::process_entry() {
+ dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+ << dendl;
+
+ Context *on_ready = create_context_callback<
+ Replayer, &Replayer<I>::handle_process_entry_ready>(this);
+ Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
+ m_replay_bytes,
+ m_replay_start_time);
+
+ m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_ready(int r) {
+ std::unique_lock locker{m_lock};
+
+ dout(20) << dendl;
+ ceph_assert(r == 0);
+
+ bool update_status = false;
+ {
+ auto local_image_ctx = *m_local_image_ctx;
+ std::shared_lock image_locker{local_image_ctx->image_lock};
+ auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
+ local_image_ctx->name);
+ if (m_image_spec != image_spec) {
+ m_image_spec = image_spec;
+ update_status = true;
+ }
+ }
+
+ if (update_status) {
+ notify_status_updated();
+ }
+
+ // attempt to process the next event
+ handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_safe(
+ const ReplayEntry &replay_entry, uint64_t replay_bytes,
+ const utime_t &replay_start_time, int r) {
+ dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
+ << dendl;
+
+ if (r < 0) {
+ derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
+ handle_replay_complete(r, "failed to commit journal event");
+ } else {
+ ceph_assert(m_remote_journaler != nullptr);
+ m_remote_journaler->committed(replay_entry);
+ }
+
+ auto latency = ceph_clock_now() - replay_start_time;
+ if (g_perf_counters) {
+ g_perf_counters->inc(l_rbd_mirror_replay);
+ g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+ g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+ }
+
+ auto ctx = new LambdaContext(
+ [this, replay_bytes, latency](int r) {
+ std::unique_lock locker{m_lock};
+ schedule_flush_local_replay_task();
+
+ if (m_perf_counters) {
+ m_perf_counters->inc(l_rbd_mirror_replay);
+ m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+ m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+ }
+
+ m_event_replay_tracker.finish_op();
+ });
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_resync_image() {
+ dout(10) << dendl;
+
+ std::unique_lock locker{m_lock};
+ m_resync_requested = true;
+ handle_replay_complete(locker, 0, "resync requested");
+}
+
+template <typename I>
+void Replayer<I>::notify_status_updated() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ dout(10) << dendl;
+
+ auto ctx = new C_TrackedOp(this, new LambdaContext(
+ [this](int) {
+ m_replayer_listener->handle_notification();
+ }));
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::cancel_delayed_preprocess_task() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ bool canceled_delayed_preprocess_task = false;
+ {
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ if (m_delayed_preprocess_task != nullptr) {
+ dout(10) << dendl;
+ canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
+ m_delayed_preprocess_task);
+ ceph_assert(canceled_delayed_preprocess_task);
+ m_delayed_preprocess_task = nullptr;
+ }
+ }
+
+ if (canceled_delayed_preprocess_task) {
+ // wake up sleeping replay
+ m_event_replay_tracker.finish_op();
+ }
+}
+
+template <typename I>
+int Replayer<I>::validate_remote_client_state(
+ const cls::journal::Client& remote_client,
+ librbd::journal::MirrorPeerClientMeta* remote_client_meta,
+ bool* resync_requested, std::string* error) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (!util::decode_client_meta(remote_client, remote_client_meta)) {
+ // require operator intervention since the data is corrupt
+ *error = "error retrieving remote journal client";
+ return -EBADMSG;
+ }
+
+ auto local_image_ctx = *m_local_image_ctx;
+ dout(5) << "image_id=" << local_image_ctx->id << ", "
+ << "remote_client_meta.image_id="
+ << remote_client_meta->image_id << ", "
+ << "remote_client.state=" << remote_client.state << dendl;
+ if (remote_client_meta->image_id == local_image_ctx->id &&
+ remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+ dout(5) << "client flagged disconnected, stopping image replay" << dendl;
+ if (local_image_ctx->config.template get_val<bool>(
+ "rbd_mirroring_resync_after_disconnect")) {
+ dout(10) << "disconnected: automatic resync" << dendl;
+ *resync_requested = true;
+ *error = "disconnected: automatic resync";
+ return -ENOTCONN;
+ } else {
+ dout(10) << "disconnected" << dendl;
+ *error = "disconnected";
+ return -ENOTCONN;
+ }
+ }
+
+ return 0;
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;