]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rbd-mirror: extract journal replay logic to its own class
authorJason Dillaman <dillaman@redhat.com>
Sat, 7 Dec 2019 21:52:51 +0000 (16:52 -0500)
committerJason Dillaman <dillaman@redhat.com>
Mon, 16 Dec 2019 01:08:10 +0000 (20:08 -0500)
This will help to greatly reduce the journal-specific code in the
current image replayer.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/image_replayer/Replayer.h [new file with mode: 0644]
src/tools/rbd_mirror/image_replayer/ReplayerListener.h [new file with mode: 0644]
src/tools/rbd_mirror/image_replayer/journal/Replayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/image_replayer/journal/Replayer.h [new file with mode: 0644]

index c769426ba52b2c43803e80fba50100b3e11c0192..b70bb20ac43ed277c8e29fd25aa62873b58adddd 100644 (file)
@@ -39,6 +39,7 @@ add_executable(unittest_rbd_mirror
   image_replayer/test_mock_PrepareLocalImageRequest.cc
   image_replayer/test_mock_PrepareRemoteImageRequest.cc
   image_replayer/journal/test_mock_EventPreprocessor.cc
+  image_replayer/journal/test_mock_Replayer.cc
   image_sync/test_mock_SyncPointCreateRequest.cc
   image_sync/test_mock_SyncPointPruneRequest.cc
   pool_watcher/test_mock_RefreshImagesRequest.cc
diff --git a/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc b/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc
new file mode 100644 (file)
index 0000000..f8df90a
--- /dev/null
@@ -0,0 +1,2107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "test/journal/mock/MockJournaler.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace librbd {
+
+namespace {
+
+struct MockTestJournal;
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+  explicit MockTestImageCtx(librbd::ImageCtx &image_ctx,
+                            MockTestJournal& mock_test_journal)
+    : librbd::MockImageCtx(image_ctx), journal(&mock_test_journal) {
+  }
+
+  MockTestJournal* journal = nullptr;
+};
+
+struct MockTestJournal : public MockJournal {
+  MOCK_METHOD2(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
+                                           Context *on_start));
+  MOCK_METHOD0(stop_external_replay, void());
+};
+
+} // anonymous namespace
+
+namespace journal {
+
+template <>
+struct TypeTraits<librbd::MockTestImageCtx> {
+  typedef ::journal::MockJournaler Journaler;
+  typedef ::journal::MockReplayEntryProxy ReplayEntry;
+};
+
+template<>
+struct Replay<MockTestImageCtx> {
+  MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *));
+  MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *));
+  MOCK_METHOD1(flush, void(Context*));
+  MOCK_METHOD2(shut_down, void(bool, Context*));
+};
+
+} // namespace journal
+} // namespace librbd
+
+namespace boost {
+
+template<>
+struct intrusive_ptr<librbd::MockTestJournal> {
+  intrusive_ptr() {
+  }
+  intrusive_ptr(librbd::MockTestJournal* mock_test_journal)
+    : mock_test_journal(mock_test_journal) {
+  }
+
+  librbd::MockTestJournal* operator->() {
+    return mock_test_journal;
+  }
+
+  void reset() {
+    mock_test_journal = nullptr;
+  }
+
+  const librbd::MockTestJournal* get() const {
+    return mock_test_journal;
+  }
+
+  template<typename T>
+  bool operator==(T* t) const {
+    return (mock_test_journal == t);
+  }
+
+  librbd::MockTestJournal* mock_test_journal = nullptr;
+};
+
+} // namespace boost
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  MockSafeTimer *timer;
+  ceph::mutex &timer_lock;
+
+  MockContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx>* threads)
+    : timer(new MockSafeTimer()),
+      timer_lock(threads->timer_lock),
+      work_queue(new MockContextWQ()) {
+  }
+  ~Threads() {
+    delete timer;
+    delete work_queue;
+  }
+};
+
+namespace {
+
+struct MockReplayerListener : public image_replayer::ReplayerListener {
+  MOCK_METHOD0(handle_notification, void());
+};
+
+} // anonymous namespace
+
+namespace image_replayer {
+
+template<>
+struct CloseImageRequest<librbd::MockTestImageCtx> {
+  static CloseImageRequest* s_instance;
+  librbd::MockTestImageCtx **image_ctx = nullptr;
+  Context *on_finish = nullptr;
+
+  static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx,
+                                   Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->image_ctx = image_ctx;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  CloseImageRequest() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~CloseImageRequest() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+namespace journal {
+
+template <>
+struct EventPreprocessor<librbd::MockTestImageCtx> {
+  static EventPreprocessor *s_instance;
+
+  static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
+                                   ::journal::MockJournaler &remote_journaler,
+                                   const std::string &local_mirror_uuid,
+                                   librbd::journal::MirrorPeerClientMeta *client_meta,
+                                   MockContextWQ *work_queue) {
+    ceph_assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  static void destroy(EventPreprocessor* processor) {
+  }
+
+  EventPreprocessor() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~EventPreprocessor() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
+  MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
+};
+
+template<>
+struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
+  static ReplayStatusFormatter* s_instance;
+
+  static ReplayStatusFormatter* create(::journal::MockJournaler *journaler,
+                                       const std::string &mirror_uuid) {
+    ceph_assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  static void destroy(ReplayStatusFormatter* formatter) {
+  }
+
+  ReplayStatusFormatter() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~ReplayStatusFormatter() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+};
+
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
+ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.cc"
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::MatcherCast;
+using ::testing::Return;
+using ::testing::ReturnArg;
+using ::testing::SaveArg;
+using ::testing::SetArgPointee;
+using ::testing::WithArg;
+
+class TestMockImageReplayerJournalReplayer : public TestMockFixture {
+public:
+  typedef Replayer<librbd::MockTestImageCtx> MockReplayer;
+  typedef EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
+  typedef ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
+  typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
+  typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    librbd::RBD rbd;
+    ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
+    ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
+  }
+
+  bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
+    bufferlist bl;
+    encode(tag_data, bl);
+    return bl;
+  }
+
+  void expect_work_queue_repeatedly(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+      .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+          m_threads->work_queue->queue(ctx, r);
+        }));
+  }
+
+  void expect_add_event_after_repeatedly(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+      .WillRepeatedly(
+        DoAll(Invoke([this](double seconds, Context *ctx) {
+                       m_threads->timer->add_event_after(seconds, ctx);
+                     }),
+          ReturnArg<1>()));
+    EXPECT_CALL(*mock_threads.timer, cancel_event(_))
+      .WillRepeatedly(
+        Invoke([this](Context *ctx) {
+          return m_threads->timer->cancel_event(ctx);
+        }));
+  }
+
+  void expect_init(::journal::MockJournaler &mock_journaler, int r) {
+    EXPECT_CALL(mock_journaler, init(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) {
+    EXPECT_CALL(mock_journaler, stop_replay(_))
+      .WillOnce(CompleteContext(r));
+  }
+
+  void expect_shut_down(::journal::MockJournaler &mock_journaler, int r) {
+    EXPECT_CALL(mock_journaler, shut_down(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
+    EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
+      .WillOnce(WithArg<1>(CompleteContext(m_threads->work_queue, r)));
+  }
+
+  void expect_get_cached_client(::journal::MockJournaler &mock_journaler,
+                                const std::string& client_id,
+                                const cls::journal::Client& client,
+                                const librbd::journal::ClientMeta& client_meta,
+                                int r) {
+    librbd::journal::ClientData client_data;
+    client_data.client_meta = client_meta;
+
+    cls::journal::Client client_copy{client};
+    encode(client_data, client_copy.data);
+
+    EXPECT_CALL(mock_journaler, get_cached_client(client_id, _))
+      .WillOnce(DoAll(SetArgPointee<1>(client_copy),
+                      Return(r)));
+  }
+
+  void expect_start_external_replay(librbd::MockTestJournal &mock_journal,
+                                    MockReplay *mock_replay, int r) {
+    EXPECT_CALL(mock_journal, start_external_replay(_, _))
+      .WillOnce(DoAll(SetArgPointee<0>(mock_replay),
+                      WithArg<1>(CompleteContext(m_threads->work_queue, r))));
+  }
+
+  void expect_is_tag_owner(librbd::MockTestJournal &mock_journal,
+                           bool is_owner) {
+    EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(is_owner));
+  }
+
+  void expect_is_resync_requested(librbd::MockTestJournal &mock_journal,
+                                  int r, bool resync_requested) {
+    EXPECT_CALL(mock_journal, is_resync_requested(_)).WillOnce(
+      DoAll(SetArgPointee<0>(resync_requested),
+            Return(r)));
+  }
+
+  void expect_get_commit_tid_in_debug(
+    ::journal::MockReplayEntry &mock_replay_entry) {
+    // It is used in debug messages and depends on debug level
+    EXPECT_CALL(mock_replay_entry, get_commit_tid())
+    .Times(AtLeast(0))
+      .WillRepeatedly(Return(0));
+  }
+
+  void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) {
+    // It is used in debug messages and depends on debug level
+    EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0))
+      .WillRepeatedly(Return(0));
+  }
+
+  void expect_committed(::journal::MockReplayEntry &mock_replay_entry,
+                        ::journal::MockJournaler &mock_journaler, int times) {
+    EXPECT_CALL(mock_replay_entry, get_data()).Times(times);
+    EXPECT_CALL(mock_journaler, committed(
+                  MatcherCast<const ::journal::MockReplayEntryProxy&>(_)))
+      .Times(times);
+  }
+
+  void expect_try_pop_front(::journal::MockJournaler &mock_journaler,
+                            uint64_t replay_tag_tid, bool entries_available) {
+    EXPECT_CALL(mock_journaler, try_pop_front(_, _))
+      .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()),
+                      SetArgPointee<1>(replay_tag_tid),
+                      Return(entries_available)));
+  }
+
+  void expect_try_pop_front_return_no_entries(
+    ::journal::MockJournaler &mock_journaler, Context *on_finish) {
+    EXPECT_CALL(mock_journaler, try_pop_front(_, _))
+      .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e,
+                                         uint64_t *t) {
+                               on_finish->complete(0);
+                             }),
+                      Return(false)));
+  }
+
+  void expect_get_tag(::journal::MockJournaler &mock_journaler,
+                      const cls::journal::Tag &tag, int r) {
+    EXPECT_CALL(mock_journaler, get_tag(_, _, _))
+      .WillOnce(DoAll(SetArgPointee<1>(tag),
+                      WithArg<2>(CompleteContext(r))));
+  }
+
+  void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) {
+    EXPECT_CALL(mock_journal, allocate_tag(_, _, _))
+      .WillOnce(WithArg<2>(CompleteContext(r)));
+  }
+
+  void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor,
+                         bool required, int r) {
+    EXPECT_CALL(mock_event_preprocessor, is_required(_))
+      .WillOnce(Return(required));
+    if (required) {
+      EXPECT_CALL(mock_event_preprocessor, preprocess(_, _))
+        .WillOnce(WithArg<1>(CompleteContext(r)));
+    }
+  }
+
+  void expect_process(MockReplay &mock_replay,
+                      int on_ready_r, int on_commit_r) {
+    EXPECT_CALL(mock_replay, process(_, _, _))
+      .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)),
+                      WithArg<2>(CompleteContext(on_commit_r))));
+  }
+
+  void expect_flush(MockReplay& mock_replay, int r) {
+    EXPECT_CALL(mock_replay, flush(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_flush_commit_position(::journal::MockJournaler& mock_journal,
+                                    int r) {
+    EXPECT_CALL(mock_journal, flush_commit_position(_))
+      .WillOnce(CompleteContext(m_threads->work_queue, r));
+  }
+
+  void expect_get_tag_data(librbd::MockTestJournal& mock_local_journal,
+                           const librbd::journal::TagData& tag_data) {
+    EXPECT_CALL(mock_local_journal, get_tag_data())
+      .WillOnce(Return(tag_data));
+  }
+
+  void expect_send(MockCloseImageRequest &mock_close_image_request, int r) {
+    EXPECT_CALL(mock_close_image_request, send())
+      .WillOnce(Invoke([this, &mock_close_image_request, r]() {
+            *mock_close_image_request.image_ctx = nullptr;
+            m_threads->work_queue->queue(mock_close_image_request.on_finish, r);
+          }));
+  }
+
+  void expect_notification(MockThreads& mock_threads,
+                           MockReplayerListener& mock_replayer_listener) {
+    EXPECT_CALL(mock_replayer_listener, handle_notification())
+      .WillOnce(Invoke([this]() {
+          std::unique_lock locker{m_lock};
+          m_notified = true;
+          m_cond.notify_all();
+        }));
+  }
+
+  int wait_for_notification() {
+    std::unique_lock locker{m_lock};
+    while (!m_notified) {
+      if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+        return -ETIMEDOUT;
+      }
+    }
+    m_notified = false;
+    return 0;
+  }
+
+  int init_entry_replayer(MockReplayer& mock_replayer,
+                          MockThreads& mock_threads,
+                          MockReplayerListener& mock_replayer_listener,
+                          librbd::MockTestJournal& mock_local_journal,
+                          ::journal::MockJournaler& mock_remote_journaler,
+                          MockReplay& mock_local_journal_replay,
+                          librbd::journal::Listener** local_journal_listener,
+                          ::journal::ReplayHandler** remote_replay_handler,
+                          ::journal::JournalMetadataListener** remote_journal_listener) {
+    expect_init(mock_remote_journaler, 0);
+    EXPECT_CALL(mock_remote_journaler, add_listener(_))
+      .WillOnce(SaveArg<0>(remote_journal_listener));
+    expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                             {librbd::journal::MirrorPeerClientMeta{}}, 0);
+    expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                                 0);
+    EXPECT_CALL(mock_local_journal, add_listener(_))
+      .WillOnce(SaveArg<0>(local_journal_listener));
+    expect_is_tag_owner(mock_local_journal, false);
+    expect_is_resync_requested(mock_local_journal, 0, false);
+    EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _))
+      .WillOnce(SaveArg<0>(remote_replay_handler));
+    expect_notification(mock_threads, mock_replayer_listener);
+
+    C_SaferCond init_ctx;
+    mock_replayer.init(&init_ctx);
+    int r = init_ctx.wait();
+    if (r < 0) {
+      return r;
+    }
+
+    return wait_for_notification();
+  }
+
+  int shut_down_entry_replayer(MockReplayer& mock_replayer,
+                               MockThreads& mock_threads,
+                               librbd::MockTestJournal& mock_local_journal,
+                               ::journal::MockJournaler& mock_remote_journaler,
+                               MockReplay& mock_local_journal_replay) {
+    expect_shut_down(mock_local_journal_replay, true, 0);
+    EXPECT_CALL(mock_local_journal, remove_listener(_));
+    EXPECT_CALL(mock_local_journal, stop_external_replay());
+    MockCloseImageRequest mock_close_image_request;
+    expect_send(mock_close_image_request, 0);
+    expect_stop_replay(mock_remote_journaler, 0);
+    EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+    expect_shut_down(mock_remote_journaler, 0);
+
+    C_SaferCond shutdown_ctx;
+    mock_replayer.shut_down(&shutdown_ctx);
+    return shutdown_ctx.wait();
+  }
+
+  librbd::ImageCtx* m_local_image_ctx = nullptr;
+
+  ceph::mutex m_lock = ceph::make_mutex(
+    "TestMockImageReplayerJournalReplayer");
+  ceph::condition_variable m_cond;
+  bool m_notified = false;
+};
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitShutDown) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+
+  mock_local_image_ctx.journal = nullptr;
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-EINVAL, init_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, -EINVAL);
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-EINVAL, init_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                           {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL);
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-EINVAL, init_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitLocalJournalStartExternalReplayError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                           {librbd::journal::MirrorPeerClientMeta{}}, 0);
+  expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-EINVAL, init_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitIsPromoted) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                           {librbd::journal::MirrorPeerClientMeta{}}, 0);
+  MockReplay mock_local_journal_replay;
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  EXPECT_CALL(mock_local_journal, add_listener(_));
+  expect_is_tag_owner(mock_local_journal, true);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(0, init_ctx.wait());
+  ASSERT_EQ(0, wait_for_notification());
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(0, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnected) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+                                      "false");
+
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+                           {{}, {}, {},
+                            cls::journal::CLIENT_STATE_DISCONNECTED},
+                           {librbd::journal::MirrorPeerClientMeta{
+                              mock_local_image_ctx.id}}, 0);
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-ENOTCONN, init_ctx.wait());
+  ASSERT_FALSE(mock_replayer.is_resync_requested());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnectedResync) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+                                      "true");
+
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+                           {{}, {}, {},
+                            cls::journal::CLIENT_STATE_DISCONNECTED},
+                           {librbd::journal::MirrorPeerClientMeta{
+                              mock_local_image_ctx.id}}, 0);
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-ENOTCONN, init_ctx.wait());
+  ASSERT_TRUE(mock_replayer.is_resync_requested());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequested) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                           {librbd::journal::MirrorPeerClientMeta{}}, 0);
+  MockReplay mock_local_journal_replay;
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  EXPECT_CALL(mock_local_journal, add_listener(_));
+  expect_is_tag_owner(mock_local_journal, false);
+  expect_is_resync_requested(mock_local_journal, 0, true);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(0, init_ctx.wait());
+  ASSERT_EQ(0, wait_for_notification());
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(0, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequestedError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  expect_init(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, add_listener(_));
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {},
+                           {librbd::journal::MirrorPeerClientMeta{}}, 0);
+  MockReplay mock_local_journal_replay;
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  EXPECT_CALL(mock_local_journal, add_listener(_));
+  expect_is_tag_owner(mock_local_journal, false);
+  expect_is_resync_requested(mock_local_journal, -EINVAL, false);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(0, init_ctx.wait());
+  ASSERT_EQ(0, wait_for_notification());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(0, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ShutDownLocalJournalReplayError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_shut_down(mock_local_journal_replay, true, -EINVAL);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, -EPERM);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, CloseLocalImageError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, -EINVAL);
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, -EPERM);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, StopRemoteJournalerError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_stop_replay(mock_remote_journaler, -EPERM);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(-EPERM, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ShutDownRemoteJournalerError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_shut_down(mock_local_journal_replay, true, 0);
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, -EPERM);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(-EPERM, shutdown_ctx.wait());
+  ASSERT_EQ(nullptr, mock_local_image_ctx_ptr);
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, Replay) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_add_event_after_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+  expect_committed(mock_replay_entry, mock_remote_journaler, 2);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+  // replay_flush
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+
+  // process
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_journal_replay, 0, 0);
+
+  // the next event with preprocess
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, true, 0);
+  expect_process(mock_local_journal_replay, 0, 0);
+
+  // attempt to process the next event
+  C_SaferCond replay_ctx;
+  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+  // fire
+  remote_replay_handler->handle_entries_available();
+  ASSERT_EQ(0, replay_ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, DecodeError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_add_event_after_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+  // replay_flush
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+
+  // process
+  EXPECT_CALL(mock_replay_entry, get_data());
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+    .WillOnce(Return(-EINVAL));
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  // fire
+  remote_replay_handler->handle_entries_available();
+  wait_for_notification();
+
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, DelayedReplay) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_add_event_after_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+  expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+  // replay_flush
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+
+  // process with delay
+  EXPECT_CALL(mock_replay_entry, get_data());
+  librbd::journal::EventEntry event_entry(
+    librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now());
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+    .WillOnce(DoAll(SetArgPointee<1>(event_entry),
+                    Return(0)));
+
+  Context* delayed_task_ctx = nullptr;
+  EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+    .WillOnce(
+      DoAll(Invoke([this, &delayed_task_ctx](double seconds, Context *ctx) {
+                std::unique_lock locker{m_lock};
+                delayed_task_ctx = ctx;
+                m_cond.notify_all();
+            }),
+            ReturnArg<1>()));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_journal_replay, 0, 0);
+
+  // attempt to process the next event
+  C_SaferCond replay_ctx;
+  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+  // fire
+  mock_local_image_ctx.mirroring_replay_delay = 600;
+  remote_replay_handler->handle_entries_available();
+  {
+    std::unique_lock locker{m_lock};
+    while (delayed_task_ctx == nullptr) {
+      if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+        FAIL() << "timed out waiting for task";
+        break;
+      }
+    }
+  }
+  {
+    std::unique_lock timer_locker{mock_threads.timer_lock};
+    delayed_task_ctx->complete(0);
+  }
+  ASSERT_EQ(0, replay_ctx.wait());
+
+  // add a pending (delayed) entry before stop
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  C_SaferCond decode_ctx;
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _))
+    .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it,
+                                         librbd::journal::EventEntry *e) {
+                             decode_ctx.complete(0);
+                           }),
+                    Return(0)));
+
+  remote_replay_handler->handle_entries_available();
+  ASSERT_EQ(0, decode_ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayNoMemoryError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_complete(-ENOMEM);
+
+  wait_for_notification();
+  ASSERT_EQ(false, mock_replayer.is_replaying());
+  ASSERT_EQ(-ENOMEM, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalForcePromoted) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_notification(mock_threads, mock_replayer_listener);
+  local_journal_listener->handle_promoted();
+  wait_for_notification();
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalResyncRequested) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_notification(mock_threads, mock_replayer_listener);
+  local_journal_listener->handle_resync();
+  wait_for_notification();
+
+  ASSERT_TRUE(mock_replayer.is_resync_requested());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, RemoteJournalDisconnected) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect",
+                                      "true");
+
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_get_cached_client(mock_remote_journaler, "local mirror uuid",
+                           {{}, {}, {},
+                            cls::journal::CLIENT_STATE_DISCONNECTED},
+                           {librbd::journal::MirrorPeerClientMeta{
+                              mock_local_image_ctx.id}}, 0);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  remote_journaler_listener->handle_update(nullptr);
+  wait_for_notification();
+
+  ASSERT_EQ(-ENOTCONN, mock_replayer.get_error_code());
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_TRUE(mock_replayer.is_resync_requested());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, Flush) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_flush(mock_local_journal_replay, 0);
+  expect_flush_commit_position(mock_remote_journaler, 0);
+
+  C_SaferCond ctx;
+  mock_replayer.flush(&ctx);
+  ASSERT_EQ(0, ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, FlushError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_flush(mock_local_journal_replay, -EINVAL);
+
+  C_SaferCond ctx;
+  mock_replayer.flush(&ctx);
+  ASSERT_EQ(-EINVAL, ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, FlushCommitPositionError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  expect_flush(mock_local_journal_replay, 0);
+  expect_flush_commit_position(mock_remote_journaler, -EINVAL);
+
+  C_SaferCond ctx;
+  mock_replayer.flush(&ctx);
+  ASSERT_EQ(-EINVAL, ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushShutDownError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_try_pop_front(mock_remote_journaler, 1, true);
+  expect_shut_down(mock_local_journal_replay, false, -EINVAL);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(0, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushStartError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_try_pop_front(mock_remote_journaler, 1, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  EXPECT_CALL(mock_local_journal, remove_listener(_));
+  MockCloseImageRequest mock_close_image_request;
+  expect_send(mock_close_image_request, 0);
+  expect_stop_replay(mock_remote_journaler, 0);
+  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+  expect_shut_down(mock_remote_journaler, 0);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(0, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, GetTagError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, -EINVAL);
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagDemotion) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_notification(mock_threads, mock_replayer_listener);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+  expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::ORPHAN_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_get_tag_data(mock_local_journal, {});
+  expect_allocate_tag(mock_local_journal, 0);
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_journal_replay, 0, 0);
+
+  remote_replay_handler->handle_entries_available();
+  wait_for_notification();
+  ASSERT_FALSE(mock_replayer.is_replaying());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, -EINVAL);
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, PreprocessError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+  EXPECT_CALL(mock_replay_entry, get_data());
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, true, -EINVAL);
+
+  expect_notification(mock_threads, mock_replayer_listener);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ProcessError) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+  EXPECT_CALL(mock_replay_entry, get_data());
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_journal_replay, 0, -EINVAL);
+
+  // attempt to process the next event
+  C_SaferCond replay_ctx;
+  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+  remote_replay_handler->handle_entries_available();
+
+  wait_for_notification();
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, replay_ctx.wait());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+TEST_F(TestMockImageReplayerJournalReplayer, ImageNameUpdated) {
+  librbd::MockTestJournal mock_local_journal;
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx,
+                                                mock_local_journal};
+  librbd::MockTestImageCtx* mock_local_image_ctx_ptr = &mock_local_image_ctx;
+  ::journal::MockJournaler mock_remote_journaler;
+  MockReplayerListener mock_replayer_listener;
+  MockThreads mock_threads{m_threads};
+  MockReplayer mock_replayer{
+    &mock_local_image_ctx_ptr, &mock_remote_journaler, "local mirror uuid",
+    "remote mirror uuid", &mock_replayer_listener, &mock_threads};
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  expect_work_queue_repeatedly(mock_threads);
+  expect_add_event_after_repeatedly(mock_threads);
+  expect_get_commit_tid_in_debug(mock_replay_entry);
+  expect_get_tag_tid_in_debug(mock_local_journal);
+  expect_committed(mock_replay_entry, mock_remote_journaler, 1);
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockReplay mock_local_journal_replay;
+  MockEventPreprocessor mock_event_preprocessor;
+  MockReplayStatusFormatter mock_replay_status_formatter;
+  librbd::journal::Listener* local_journal_listener = nullptr;
+  ::journal::ReplayHandler* remote_replay_handler = nullptr;
+  ::journal::JournalMetadataListener* remote_journaler_listener = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_replayer_listener, mock_local_journal,
+                                   mock_remote_journaler,
+                                   mock_local_journal_replay,
+                                   &local_journal_listener,
+                                   &remote_replay_handler,
+                                   &remote_journaler_listener));
+
+  mock_local_image_ctx.name = "NEW NAME";
+  cls::journal::Tag tag =
+    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            librbd::Journal<>::LOCAL_MIRROR_UUID,
+                            true, 0, 0})};
+
+  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+  expect_shut_down(mock_local_journal_replay, false, 0);
+  EXPECT_CALL(mock_local_journal, stop_external_replay());
+  expect_start_external_replay(mock_local_journal, &mock_local_journal_replay,
+                               0);
+  expect_get_tag(mock_remote_journaler, tag, 0);
+  expect_allocate_tag(mock_local_journal, 0);
+  EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
+  expect_preprocess(mock_event_preprocessor, false, 0);
+  expect_process(mock_local_journal_replay, 0, 0);
+
+  // attempt to process the next event
+  C_SaferCond replay_ctx;
+  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+  remote_replay_handler->handle_entries_available();
+  wait_for_notification();
+
+  auto image_spec = util::compute_image_spec(m_local_io_ctx, "NEW NAME");
+  ASSERT_EQ(image_spec, mock_replayer.get_image_spec());
+
+  ASSERT_EQ(0, replay_ctx.wait());
+  ASSERT_TRUE(mock_replayer.is_replaying());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_local_journal,
+                                        mock_remote_journaler,
+                                        mock_local_journal_replay));
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
index cdac9c614c42b029176b5c10aad703c9fcea72f1..c642ae6afc4c8a353c6ff570fbf5da400cb99f31 100644 (file)
@@ -42,6 +42,7 @@ set(rbd_mirror_internal
   image_replayer/PrepareRemoteImageRequest.cc
   image_replayer/Utils.cc
   image_replayer/journal/EventPreprocessor.cc
+  image_replayer/journal/Replayer.cc
   image_replayer/journal/ReplayStatusFormatter.cc
   image_sync/SyncPointCreateRequest.cc
   image_sync/SyncPointPruneRequest.cc
diff --git a/src/tools/rbd_mirror/image_replayer/Replayer.h b/src/tools/rbd_mirror/image_replayer/Replayer.h
new file mode 100644 (file)
index 0000000..3568614
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H
+
+#include <string>
+
+struct Context;
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+struct Replayer {
+  virtual ~Replayer() {}
+
+  virtual void init(Context* on_finish) = 0;
+  virtual void shut_down(Context* on_finish) = 0;
+
+  virtual void flush(Context* on_finish) = 0;
+
+  virtual bool get_replay_status(std::string* description,
+                                 Context* on_finish) = 0;
+
+  virtual bool is_replaying() const = 0;
+  virtual bool is_resync_requested() const = 0;
+
+  virtual int get_error_code() const = 0;
+  virtual std::string get_error_description() const = 0;
+};
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_H
diff --git a/src/tools/rbd_mirror/image_replayer/ReplayerListener.h b/src/tools/rbd_mirror/image_replayer/ReplayerListener.h
new file mode 100644 (file)
index 0000000..f17f401
--- /dev/null
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+struct ReplayerListener {
+  virtual ~ReplayerListener() {}
+
+  virtual void handle_notification() = 0;
+};
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAYER_LISTENER_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc
new file mode 100644 (file)
index 0000000..c63d2e8
--- /dev/null
@@ -0,0 +1,1204 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Replayer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/Replay.h"
+#include "journal/Journaler.h"
+#include "journal/JournalMetadataListener.h"
+#include "journal/ReplayHandler.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+                           << "Replayer: " << this << " " << __func__ << ": "
+
+extern PerfCounters *g_perf_counters;
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+namespace {
+
+uint32_t calculate_replay_delay(const utime_t &event_time,
+                                int mirroring_replay_delay) {
+  if (mirroring_replay_delay <= 0) {
+    return 0;
+  }
+
+  utime_t now = ceph_clock_now();
+  if (event_time + mirroring_replay_delay <= now) {
+    return 0;
+  }
+
+  // ensure it is rounded up when converting to integer
+  return (event_time + mirroring_replay_delay - now) + 1;
+}
+
+} // anonymous namespace
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+struct Replayer<I>::C_ReplayCommitted : public Context {
+  Replayer* replayer;
+  ReplayEntry replay_entry;
+  uint64_t replay_bytes;
+  utime_t replay_start_time;
+
+  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
+                    uint64_t replay_bytes, const utime_t &replay_start_time)
+    : replayer(replayer), replay_entry(std::move(replay_entry)),
+      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
+  }
+
+  void finish(int r) override {
+    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
+                                        replay_start_time, r);
+  }
+};
+
+template <typename I>
+struct Replayer<I>::C_TrackedOp : public Context {
+  Replayer *replayer;
+  Context* ctx;
+
+  C_TrackedOp(Replayer* replayer, Context* ctx)
+    : replayer(replayer), ctx(ctx) {
+    replayer->m_in_flight_op_tracker.start_op();
+  }
+
+  void finish(int r) override {
+    ctx->complete(r);
+    replayer->m_in_flight_op_tracker.finish_op();
+  }
+};
+
+template <typename I>
+struct Replayer<I>::RemoteJournalerListener
+  : public ::journal::JournalMetadataListener {
+  Replayer* replayer;
+
+  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}
+
+  void handle_update(::journal::JournalMetadata*) override {
+    auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) {
+      replayer->handle_remote_journal_metadata_updated();
+    }));
+    replayer->m_threads->work_queue->queue(ctx, 0);
+  }
+};
+
+template <typename I>
+struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
+  Replayer* replayer;
+
+  RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
+  ~RemoteReplayHandler() override {};
+
+  void handle_entries_available() override {
+    replayer->handle_replay_ready();
+  }
+
+  void handle_complete(int r) override {
+    std::string error;
+    if (r == -ENOMEM) {
+      error = "not enough memory in autotune cache";
+    } else if (r < 0) {
+      error = "replay completed with error: " + cpp_strerror(r);
+    }
+    replayer->handle_replay_complete(r, error);
+  }
+};
+
+template <typename I>
+struct Replayer<I>::LocalJournalListener
+  : public librbd::journal::Listener {
+  Replayer* replayer;
+
+  LocalJournalListener(Replayer* replayer) : replayer(replayer) {
+  }
+
+  void handle_close() override {
+    replayer->handle_replay_complete(0, "");
+  }
+
+  void handle_promoted() override {
+    replayer->handle_replay_complete(0, "force promoted");
+  }
+
+  void handle_resync() override {
+    replayer->handle_resync_image();
+  }
+};
+
+template <typename I>
+Replayer<I>::Replayer(
+    I** local_image_ctx, Journaler* remote_journaler,
+    std::string local_mirror_uuid, std::string remote_mirror_uuid,
+    ReplayerListener* replayer_listener, Threads<I>* threads)
+  : m_local_image_ctx(local_image_ctx),
+    m_remote_journaler(remote_journaler),
+    m_local_mirror_uuid(local_mirror_uuid),
+    m_remote_mirror_uuid(remote_mirror_uuid),
+    m_replayer_listener(replayer_listener),
+    m_threads(threads),
+    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
+      "rbd::mirror::image_replayer::journal::Replayer", this))) {
+  dout(10) << dendl;
+}
+
+template <typename I>
+Replayer<I>::~Replayer() {
+  dout(10) << dendl;
+  ceph_assert(m_remote_listener == nullptr);
+  ceph_assert(m_local_journal_listener == nullptr);
+  ceph_assert(m_local_journal_replay == nullptr);
+  ceph_assert(m_remote_replay_handler == nullptr);
+  ceph_assert(m_event_preprocessor == nullptr);
+  ceph_assert(m_replay_status_formatter == nullptr);
+  ceph_assert(m_delayed_preprocess_task == nullptr);
+  ceph_assert(m_flush_local_replay_task == nullptr);
+  ceph_assert(*m_local_image_ctx == nullptr);
+}
+
+template <typename I>
+void Replayer<I>::init(Context* on_finish) {
+  dout(10) << dendl;
+
+  ceph_assert(m_local_journal == nullptr);
+  {
+    auto local_image_ctx = *m_local_image_ctx;
+    std::shared_lock image_locker{local_image_ctx->image_lock};
+    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
+                                            local_image_ctx->name);
+    m_local_journal = local_image_ctx->journal;
+  }
+
+  ceph_assert(m_on_init_shutdown == nullptr);
+  m_on_init_shutdown = on_finish;
+
+  if (m_local_journal == nullptr) {
+    std::unique_lock locker{m_lock};
+    m_state = STATE_COMPLETE;
+    m_remote_journaler = nullptr;
+
+    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
+    close_local_image();
+    return;
+  }
+
+  init_remote_journaler();
+}
+
+template <typename I>
+void Replayer<I>::shut_down(Context* on_finish) {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  ceph_assert(m_on_init_shutdown == nullptr);
+  m_on_init_shutdown = on_finish;
+
+  if (m_state == STATE_INIT) {
+    // raced with the last piece of the init state machine
+    return;
+  } else if (m_state == STATE_REPLAYING) {
+    m_state = STATE_COMPLETE;
+  }
+
+  // if shutting down due to an error notification, we don't
+  // need to propagate the same error again
+  m_error_code = 0;
+  m_error_description = "";
+
+  cancel_delayed_preprocess_task();
+  cancel_flush_local_replay_task();
+  shut_down_local_journal_replay();
+}
+
+template <typename I>
+void Replayer<I>::flush(Context* on_finish) {
+  dout(10) << dendl;
+
+  flush_local_replay(new C_TrackedOp(this, on_finish));
+}
+
+template <typename I>
+bool Replayer<I>::get_replay_status(std::string* description,
+                                    Context* on_finish) {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (m_replay_status_formatter == nullptr) {
+    derr << "replay not running" << dendl;
+    locker.unlock();
+
+    on_finish->complete(-EAGAIN);
+    return false;
+  }
+
+  on_finish = new C_TrackedOp(this, on_finish);
+  return m_replay_status_formatter->get_or_send_update(description,
+                                                       on_finish);
+}
+
+template <typename I>
+void Replayer<I>::init_remote_journaler() {
+  dout(10) << dendl;
+
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
+  m_remote_journaler->init(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_init_remote_journaler(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (r < 0) {
+    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(locker, r, "error initializing remote journal");
+    close_local_image();
+    return;
+  }
+
+  // listen for metadata updates to check for disconnect events
+  ceph_assert(m_remote_listener == nullptr);
+  m_remote_listener = new RemoteJournalerListener(this);
+  m_remote_journaler->add_listener(m_remote_listener);
+
+  cls::journal::Client remote_client;
+  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid,
+                                            &remote_client);
+  if (r < 0) {
+    derr << "error retrieving remote journal client: " << cpp_strerror(r)
+         << dendl;
+    handle_replay_complete(locker, r, "error retrieving remote journal client");
+    close_local_image();
+    return;
+  }
+
+  std::string error;
+  r = validate_remote_client_state(remote_client, &m_remote_client_meta,
+                                   &m_resync_requested, &error);
+  if (r < 0) {
+    handle_replay_complete(locker, r, error);
+    close_local_image();
+    return;
+  }
+
+  start_external_replay();
+}
+
+template <typename I>
+void Replayer<I>::start_external_replay() {
+  dout(10) << dendl;
+
+  Context *start_ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_start_external_replay>(this);
+  m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_start_external_replay(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (r < 0) {
+    ceph_assert(m_local_journal_replay == nullptr);
+    derr << "error starting external replay on local image "
+         <<  (*m_local_image_ctx)->id << ": " << cpp_strerror(r) << dendl;
+
+    handle_replay_complete(locker, r, "error starting replay on local image");
+    close_local_image();
+    return;
+  }
+
+  if (!notify_init_complete(locker)) {
+    return;
+  }
+
+  m_state = STATE_REPLAYING;
+
+  // listen for promotion and resync requests against local journal
+  m_local_journal_listener = new LocalJournalListener(this);
+  m_local_journal->add_listener(m_local_journal_listener);
+
+  // verify that the local image wasn't force-promoted and that a resync hasn't
+  // been requested now that we are listening for events
+  if (m_local_journal->is_tag_owner()) {
+    dout(10) << "local image force-promoted" << dendl;
+    handle_replay_complete(locker, 0, "force promoted");
+    return;
+  }
+
+  bool resync_requested = false;
+  r = m_local_journal->is_resync_requested(&resync_requested);
+  if (r < 0) {
+    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
+             << dendl;
+    handle_replay_complete(locker, r, "error parsing resync state");
+    return;
+  } else if (resync_requested) {
+    dout(10) << "local image resync requested" << dendl;
+    handle_replay_complete(locker, 0, "resync requested");
+    return;
+  }
+
+  // start remote journal replay
+  m_event_preprocessor = EventPreprocessor<I>::create(
+    **m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
+    &m_remote_client_meta, m_threads->work_queue);
+  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
+    m_remote_journaler, m_local_mirror_uuid);
+
+  auto cct = static_cast<CephContext *>((*m_local_image_ctx)->cct);
+  double poll_seconds = cct->_conf.get_val<double>(
+    "rbd_mirror_journal_poll_age");
+  m_remote_replay_handler = new RemoteReplayHandler(this);
+  m_remote_journaler->start_live_replay(m_remote_replay_handler, poll_seconds);
+
+  notify_status_updated();
+}
+
+template <typename I>
+bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(m_state == STATE_INIT);
+
+  // notify that init has completed
+  Context *on_finish = nullptr;
+  std::swap(m_on_init_shutdown, on_finish);
+
+  locker.unlock();
+  on_finish->complete(0);
+  locker.lock();
+
+  if (m_on_init_shutdown != nullptr) {
+    // shut down requested after we notified init complete but before we
+    // grabbed the lock
+    close_local_image();
+    return false;
+  }
+
+  return true;
+}
+
+template <typename I>
+void Replayer<I>::shut_down_local_journal_replay() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (m_local_journal_replay == nullptr) {
+    wait_for_event_replay();
+    return;
+  }
+
+  dout(10) << dendl;
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
+  m_local_journal_replay->shut_down(true, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (r < 0) {
+    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
+    handle_replay_error(r, "failed to shut down local journal replay");
+  }
+
+  wait_for_event_replay();
+}
+
+template <typename I>
+void Replayer<I>::wait_for_event_replay() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+  auto ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
+  m_event_replay_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_event_replay(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  close_local_image();
+}
+
+template <typename I>
+void Replayer<I>::close_local_image() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+  if (m_local_journal_listener != nullptr) {
+    // blocks if listener notification is in-progress
+    m_local_journal->remove_listener(m_local_journal_listener);
+    delete m_local_journal_listener;
+    m_local_journal_listener = nullptr;
+  }
+
+  if (m_local_journal_replay != nullptr) {
+    m_local_journal->stop_external_replay();
+    m_local_journal_replay = nullptr;
+  }
+
+  if (m_event_preprocessor != nullptr) {
+    image_replayer::journal::EventPreprocessor<I>::destroy(
+      m_event_preprocessor);
+    m_event_preprocessor = nullptr;
+  }
+
+  m_local_journal.reset();
+
+  // NOTE: it's important to ensure that the local image is fully
+  // closed before attempting to close the remote journal in
+  // case the remote cluster is unreachable
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
+  auto request = image_replayer::CloseImageRequest<I>::create(
+    m_local_image_ctx, ctx);
+  request->send();
+}
+
+
+template <typename I>
+void Replayer<I>::handle_close_local_image(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (r < 0) {
+    derr << "error closing local iamge: " << cpp_strerror(r) << dendl;
+    handle_replay_error(r, "failed to close local image");
+  }
+
+  ceph_assert(*m_local_image_ctx == nullptr);
+  stop_remote_journaler_replay();
+}
+
+template <typename I>
+void Replayer<I>::stop_remote_journaler_replay() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (m_remote_journaler == nullptr) {
+    wait_for_in_flight_ops();
+    return;
+  } else if (m_remote_replay_handler == nullptr) {
+    shut_down_remote_journaler();
+    return;
+  }
+
+  dout(10) << dendl;
+  auto ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
+  m_remote_journaler->stop_replay(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (r < 0) {
+    derr << "failed to stop remote journaler replay : " << cpp_strerror(r)
+         << dendl;
+    handle_replay_error(r, "failed to stop remote journaler replay");
+  }
+
+  delete m_remote_replay_handler;
+  m_remote_replay_handler = nullptr;
+
+  shut_down_remote_journaler();
+}
+
+template <typename I>
+void Replayer<I>::shut_down_remote_journaler() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+  if (m_remote_listener != nullptr) {
+    m_remote_journaler->remove_listener(m_remote_listener);
+    delete m_remote_listener;
+    m_remote_listener = nullptr;
+  }
+
+  auto ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_shut_down_remote_journaler>(this);
+  m_remote_journaler->shut_down(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_shut_down_remote_journaler(int r) {
+  dout(10) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "failed to shut down remote journaler: " << cpp_strerror(r)
+         << dendl;
+
+    std::unique_lock locker{m_lock};
+    handle_replay_error(r, "failed to shut down remote journaler");
+  }
+
+  m_remote_journaler = nullptr;
+
+  wait_for_in_flight_ops();
+}
+
+template <typename I>
+void Replayer<I>::wait_for_in_flight_ops() {
+  dout(10) << dendl;
+
+  auto ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
+  m_in_flight_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
+  m_replay_status_formatter = nullptr;
+
+  ceph_assert(m_on_init_shutdown != nullptr);
+  m_on_init_shutdown->complete(m_error_code);
+}
+
+template <typename I>
+void Replayer<I>::handle_remote_journal_metadata_updated() {
+  dout(20) << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
+  cls::journal::Client remote_client;
+  int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid,
+                                                &remote_client);
+  if (r < 0) {
+    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+    return;
+  }
+
+  librbd::journal::MirrorPeerClientMeta remote_client_meta;
+  std::string error;
+  r = validate_remote_client_state(remote_client, &remote_client_meta,
+                                   &m_resync_requested, &error);
+  if (r < 0) {
+    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
+    handle_replay_complete(locker, r, error);
+  }
+}
+
+template <typename I>
+void Replayer<I>::schedule_flush_local_replay_task() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
+    return;
+  }
+
+  dout(15) << dendl;
+  m_flush_local_replay_task = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
+  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
+}
+
+template <typename I>
+void Replayer<I>::cancel_flush_local_replay_task() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  if (m_flush_local_replay_task != nullptr) {
+    dout(10) << dendl;
+    m_threads->timer->cancel_event(m_flush_local_replay_task);
+    m_flush_local_replay_task = nullptr;
+  }
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_local_replay_task(int) {
+  dout(15) << dendl;
+
+  m_in_flight_op_tracker.start_op();
+  auto on_finish = new LambdaContext([this](int) {
+      std::unique_lock locker{m_lock};
+
+      {
+        std::unique_lock timer_locker{m_threads->timer_lock};
+        m_flush_local_replay_task = nullptr;
+      }
+
+      notify_status_updated();
+      m_in_flight_op_tracker.finish_op();
+    });
+  flush_local_replay(on_finish);
+}
+
+template <typename I>
+void Replayer<I>::flush_local_replay(Context* on_flush) {
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    locker.unlock();
+    on_flush->complete(0);
+    return;
+  } else if (m_local_journal_replay == nullptr) {
+    // raced w/ a tag creation stop/start, which implies that
+    // the replay is flushed
+    locker.unlock();
+    flush_commit_position(on_flush);
+    return;
+  }
+
+  dout(15) << dendl;
+  auto ctx = new LambdaContext(
+    [this, on_flush](int r) {
+      handle_flush_local_replay(on_flush, r);
+    });
+  m_local_journal_replay->flush(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
+  dout(15) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
+    on_flush->complete(r);
+    return;
+  }
+
+  flush_commit_position(on_flush);
+}
+
+template <typename I>
+void Replayer<I>::flush_commit_position(Context* on_flush) {
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    locker.unlock();
+    on_flush->complete(0);
+    return;
+  }
+
+  dout(15) << dendl;
+  auto ctx = new LambdaContext(
+    [this, on_flush](int r) {
+      handle_flush_commit_position(on_flush, r);
+    });
+  m_remote_journaler->flush_commit_position(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
+  dout(15) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "error flushing remote journal commit position: "
+         << cpp_strerror(r) << dendl;
+  }
+
+  on_flush->complete(r);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_error(int r, const std::string &error) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (m_error_code == 0) {
+    m_error_code = r;
+    m_error_description = error;
+  }
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete() const {
+  std::unique_lock locker{m_lock};
+  return is_replay_complete(locker);
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete(
+    const std::unique_lock<ceph::mutex>&) const {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  return (m_state == STATE_COMPLETE);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
+  std::unique_lock locker{m_lock};
+  handle_replay_complete(locker, r, error);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(
+    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << "r=" << r << ", error=" << error << dendl;
+  if (r < 0) {
+    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
+    handle_replay_error(r, error);
+  }
+
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
+  m_state = STATE_COMPLETE;
+  notify_status_updated();
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready() {
+  std::unique_lock locker{m_lock};
+  handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready(
+    std::unique_lock<ceph::mutex>& locker) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(20) << dendl;
+  if (is_replay_complete(locker)) {
+    return;
+  }
+
+  if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
+    dout(20) << "no entries ready for replay" << dendl;
+    return;
+  }
+
+  // can safely drop lock once the entry is tracked
+  m_event_replay_tracker.start_op();
+  locker.unlock();
+
+  dout(20) << "entry tid=" << m_replay_entry.get_commit_tid()
+           << "tag_tid=" << m_replay_tag_tid << dendl;
+  if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
+    // must allocate a new local journal tag prior to processing
+    replay_flush();
+    return;
+  }
+
+  preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::replay_flush() {
+  dout(10) << dendl;
+
+  // shut down the replay to flush all IO and ops and create a new
+  // replayer to handle the new tag epoch
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
+  ceph_assert(m_local_journal_replay != nullptr);
+  m_local_journal_replay->shut_down(false, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush_shut_down(int r) {
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_local_journal != nullptr);
+    m_local_journal->stop_external_replay();
+    m_local_journal_replay = nullptr;
+  }
+
+  dout(10) << "r=" << r << dendl;
+  if (r < 0) {
+    handle_replay_flush(r);
+    return;
+  }
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
+  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush(int r) {
+  dout(10) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "replay flush encountered an error");
+    m_event_replay_tracker.finish_op();
+    return;
+  } else if (is_replay_complete()) {
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  get_remote_tag();
+}
+
+template <typename I>
+void Replayer<I>::get_remote_tag() {
+  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
+
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_get_remote_tag>(this);
+  m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_get_remote_tag(int r) {
+  dout(15) << "r=" << r << dendl;
+
+  if (r == 0) {
+    try {
+      auto it = m_replay_tag.data.cbegin();
+      decode(m_replay_tag_data, it);
+    } catch (const buffer::error &err) {
+      r = -EBADMSG;
+    }
+  }
+
+  if (r < 0) {
+    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
+         << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to retrieve remote tag");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  m_replay_tag_valid = true;
+  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
+           << m_replay_tag_data << dendl;
+
+  allocate_local_tag();
+}
+
+template <typename I>
+void Replayer<I>::allocate_local_tag() {
+  dout(15) << dendl;
+
+  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
+  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    mirror_uuid = m_remote_mirror_uuid;
+  } else if (mirror_uuid == m_local_mirror_uuid) {
+    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+    // handle possible edge condition where daemon can failover and
+    // the local image has already been promoted/demoted
+    auto local_tag_data = m_local_journal->get_tag_data();
+    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+        (local_tag_data.predecessor.commit_valid &&
+         local_tag_data.predecessor.mirror_uuid ==
+           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
+      dout(15) << "skipping stale demotion event" << dendl;
+      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
+                                m_replay_start_time, 0);
+      handle_replay_ready();
+      return;
+    } else {
+      dout(5) << "encountered image demotion: stopping" << dendl;
+      handle_replay_complete(0, "");
+    }
+  }
+
+  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
+  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    predecessor.mirror_uuid = m_remote_mirror_uuid;
+  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
+    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+  }
+
+  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
+           << "predecessor=" << predecessor << ", "
+           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
+  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_allocate_local_tag(int r) {
+  dout(15) << "r=" << r << ", "
+           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
+  if (r < 0) {
+    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to allocate journal tag");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::preprocess_entry() {
+  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
+  bufferlist data = m_replay_entry.get_data();
+  auto it = data.cbegin();
+  int r = m_local_journal_replay->decode(&it, &m_event_entry);
+  if (r < 0) {
+    derr << "failed to decode journal event" << dendl;
+    handle_replay_complete(r, "failed to decode journal event");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  m_replay_bytes = data.length();
+  uint32_t delay = calculate_replay_delay(
+    m_event_entry.timestamp, (*m_local_image_ctx)->mirroring_replay_delay);
+  if (delay == 0) {
+    handle_preprocess_entry_ready(0);
+    return;
+  }
+
+  std::unique_lock locker{m_lock};
+  if (is_replay_complete(locker)) {
+    // don't schedule a delayed replay task if a shut-down is in-progress
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  dout(20) << "delaying replay by " << delay << " sec" << dendl;
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  ceph_assert(m_delayed_preprocess_task == nullptr);
+  m_delayed_preprocess_task = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
+  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
+}
+
+template <typename I>
+void Replayer<I>::handle_delayed_preprocess_task(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+  m_delayed_preprocess_task = nullptr;
+
+  m_threads->work_queue->queue(create_context_callback<
+    Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_ready(int r) {
+  dout(20) << "r=" << r << dendl;
+  ceph_assert(r == 0);
+
+  m_replay_start_time = ceph_clock_now();
+  if (!m_event_preprocessor->is_required(m_event_entry)) {
+    process_entry();
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
+  m_event_preprocessor->preprocess(&m_event_entry, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_safe(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    if (r == -ECANCELED) {
+      handle_replay_complete(0, "lost exclusive lock");
+    } else {
+      derr << "failed to preprocess journal event" << dendl;
+      handle_replay_complete(r, "failed to preprocess journal event");
+    }
+
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  process_entry();
+}
+
+template <typename I>
+void Replayer<I>::process_entry() {
+  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
+  Context *on_ready = create_context_callback<
+    Replayer, &Replayer<I>::handle_process_entry_ready>(this);
+  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
+                                             m_replay_bytes,
+                                             m_replay_start_time);
+
+  m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_ready(int r) {
+  std::unique_lock locker{m_lock};
+
+  dout(20) << dendl;
+  ceph_assert(r == 0);
+
+  bool update_status = false;
+  {
+    auto local_image_ctx = *m_local_image_ctx;
+    std::shared_lock image_locker{local_image_ctx->image_lock};
+    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
+                                               local_image_ctx->name);
+    if (m_image_spec != image_spec) {
+      m_image_spec = image_spec;
+      update_status = true;
+    }
+  }
+
+  if (update_status) {
+    notify_status_updated();
+  }
+
+  // attempt to process the next event
+  handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_safe(
+    const ReplayEntry &replay_entry, uint64_t replay_bytes,
+    const utime_t &replay_start_time, int r) {
+  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
+           << dendl;
+
+  if (r < 0) {
+    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to commit journal event");
+  } else {
+    ceph_assert(m_remote_journaler != nullptr);
+    m_remote_journaler->committed(replay_entry);
+  }
+
+  auto latency = ceph_clock_now() - replay_start_time;
+  if (g_perf_counters) {
+    g_perf_counters->inc(l_rbd_mirror_replay);
+    g_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+  }
+
+  auto ctx = new LambdaContext(
+    [this, replay_bytes, latency](int r) {
+      std::unique_lock locker{m_lock};
+      schedule_flush_local_replay_task();
+
+      if (m_perf_counters) {
+        m_perf_counters->inc(l_rbd_mirror_replay);
+        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+      }
+
+      m_event_replay_tracker.finish_op();
+    });
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_resync_image() {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  m_resync_requested = true;
+  handle_replay_complete(locker, 0, "resync requested");
+}
+
+template <typename I>
+void Replayer<I>::notify_status_updated() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+
+  auto ctx = new C_TrackedOp(this, new LambdaContext(
+    [this](int) {
+      m_replayer_listener->handle_notification();
+    }));
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::cancel_delayed_preprocess_task() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  bool canceled_delayed_preprocess_task = false;
+  {
+    std::unique_lock timer_locker{m_threads->timer_lock};
+    if (m_delayed_preprocess_task != nullptr) {
+      dout(10) << dendl;
+      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
+        m_delayed_preprocess_task);
+      ceph_assert(canceled_delayed_preprocess_task);
+      m_delayed_preprocess_task = nullptr;
+    }
+  }
+
+  if (canceled_delayed_preprocess_task) {
+    // wake up sleeping replay
+    m_event_replay_tracker.finish_op();
+  }
+}
+
+template <typename I>
+int Replayer<I>::validate_remote_client_state(
+    const cls::journal::Client& remote_client,
+    librbd::journal::MirrorPeerClientMeta* remote_client_meta,
+    bool* resync_requested, std::string* error) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!util::decode_client_meta(remote_client, remote_client_meta)) {
+    // require operator intervention since the data is corrupt
+    *error = "error retrieving remote journal client";
+    return -EBADMSG;
+  }
+
+  auto local_image_ctx = *m_local_image_ctx;
+  dout(5) << "image_id=" << local_image_ctx->id << ", "
+          << "remote_client_meta.image_id="
+          << remote_client_meta->image_id << ", "
+          << "remote_client.state=" << remote_client.state << dendl;
+  if (remote_client_meta->image_id == local_image_ctx->id &&
+      remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
+    if (local_image_ctx->config.template get_val<bool>(
+          "rbd_mirroring_resync_after_disconnect")) {
+      dout(10) << "disconnected: automatic resync" << dendl;
+      *resync_requested = true;
+      *error = "disconnected: automatic resync";
+      return -ENOTCONN;
+    } else {
+      dout(10) << "disconnected" << dendl;
+      *error = "disconnected";
+      return -ENOTCONN;
+    }
+  }
+
+  return 0;
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h
new file mode 100644 (file)
index 0000000..3b464c9
--- /dev/null
@@ -0,0 +1,305 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
+
+#include "tools/rbd_mirror/image_replayer/Replayer.h"
+#include "include/utime.h"
+#include "common/AsyncOpTracker.h"
+#include "common/ceph_mutex.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/ReplayEntry.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include <string>
+#include <type_traits>
+
+namespace journal { class Journaler; }
+namespace librbd {
+
+struct ImageCtx;
+namespace journal { template <typename I> class Replay; }
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct Threads;
+
+namespace image_replayer {
+
+struct ReplayerListener;
+
+namespace journal {
+
+template <typename I> class EventPreprocessor;
+template <typename I> class ReplayStatusFormatter;
+
+template <typename ImageCtxT>
+class Replayer : public image_replayer::Replayer {
+public:
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+  Replayer(
+      ImageCtxT** local_image_ctx, Journaler* remote_journaler,
+      std::string m_local_mirror_uuid, std::string m_remote_mirror_uuid,
+      ReplayerListener* replayer_listener, Threads<ImageCtxT>* threads);
+  ~Replayer();
+
+  void init(Context* on_finish) override;
+  void shut_down(Context* on_finish) override;
+
+  void flush(Context* on_finish) override;
+
+  bool get_replay_status(std::string* description, Context* on_finish) override;
+
+  bool is_replaying() const override {
+    std::unique_lock locker{m_lock};
+    return (m_state == STATE_REPLAYING);
+  }
+
+  bool is_resync_requested() const override {
+    std::unique_lock locker(m_lock);
+    return m_resync_requested;
+  }
+
+  int get_error_code() const override {
+    std::unique_lock locker(m_lock);
+    return m_error_code;
+  }
+
+  std::string get_error_description() const override {
+    std::unique_lock locker(m_lock);
+    return m_error_description;
+  }
+
+  std::string get_image_spec() const {
+    std::unique_lock locker(m_lock);
+    return m_image_spec;
+  }
+
+private:
+  /**
+   * @verbatim
+   *
+   *  <init>
+   *    |
+   *    v                     (error)
+   * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * * * *
+   *    |                                                       *
+   *    v                     (error)                           *
+   * START_EXTERNAL_REPLAY  * * * * * * * * * * * * * * * * * * *
+   *    |                                                       *
+   *    |  /--------------------------------------------\       *
+   *    |  |                                            |       *
+   *    v  v   (asok flush)                             |       *
+   * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |       *
+   *    |       \                 |                     |       *
+   *    |       |                 v                     |       *
+   *    |       |             FLUSH_COMMIT_POSITION     |       *
+   *    |       |                 |                     |       *
+   *    |       |                 \--------------------/|       *
+   *    |       |                                       |       *
+   *    |       | (entries available)                   |       *
+   *    |       \-----------> REPLAY_READY              |       *
+   *    |                         |                     |       *
+   *    |                         | (skip if not        |       *
+   *    |                         v  needed)        (error)     *
+   *    |                     REPLAY_FLUSH  * * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         | (skip if not        |   *   *
+   *    |                         v  needed)        (error) *   *
+   *    |                     GET_REMOTE_TAG  * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         | (skip if not        |   *   *
+   *    |                         v  needed)        (error) *   *
+   *    |                     ALLOCATE_LOCAL_TAG  * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         v                 (error) *   *
+   *    |                     PREPROCESS_ENTRY  * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         v                 (error) *   *
+   *    |                     PROCESS_ENTRY * * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         \---------------------/   *   *
+   *    v (shutdown)                                        *   *
+   * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *   *
+   *    |                                                       *
+   *    v                                                       *
+   * SHUT_DOWN_LOCAL_JOURNAL_REPLAY                             *
+   *    |                                                       *
+   *    v                                                       *
+   * WAIT_FOR_REPLAY                                            *
+   *    |                                                       *
+   *    v                                                       *
+   * CLOSE_LOCAL_IMAGE  < * * * * * * * * * * * * * * * * * * * *
+   *    |
+   *    v (skip if not started)
+   * STOP_REMOTE_JOURNALER_REPLAY
+   *    |
+   *    v (skip if not initialized)
+   * SHUT_DOWN_REMOTE_JOURNALER
+   *    |
+   *    v
+   * WAIT_FOR_IN_FLIGHT_OPS
+   *    |
+   *    v
+   * <shutdown>
+   *
+   * @endverbatim
+   */
+
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
+
+  enum State {
+    STATE_INIT,
+    STATE_REPLAYING,
+    STATE_COMPLETE
+  };
+
+  struct C_ReplayCommitted;
+  struct C_TrackedOp;
+  struct RemoteJournalerListener;
+  struct RemoteReplayHandler;
+  struct LocalJournalListener;
+
+  ImageCtxT** m_local_image_ctx;
+  Journaler* m_remote_journaler;
+  std::string m_local_mirror_uuid;
+  std::string m_remote_mirror_uuid;
+  ReplayerListener* m_replayer_listener;
+  Threads<ImageCtxT>* m_threads;
+
+  mutable ceph::mutex m_lock;
+
+  std::string m_image_spec;
+  Context* m_on_init_shutdown = nullptr;
+
+  State m_state = STATE_INIT;
+  int m_error_code = 0;
+  std::string m_error_description;
+  bool m_resync_requested = false;
+
+  ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type>
+    m_local_journal;
+  RemoteJournalerListener* m_remote_listener = nullptr;
+  librbd::journal::MirrorPeerClientMeta m_remote_client_meta;
+
+  librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr;
+  EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr;
+  ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr;
+  RemoteReplayHandler* m_remote_replay_handler = nullptr;
+  LocalJournalListener* m_local_journal_listener = nullptr;
+
+  PerfCounters *m_perf_counters = nullptr;
+
+  ReplayEntry m_replay_entry;
+  uint64_t m_replay_bytes = 0;
+  utime_t m_replay_start_time;
+  bool m_replay_tag_valid = false;
+  uint64_t m_replay_tag_tid = 0;
+  cls::journal::Tag m_replay_tag;
+  librbd::journal::TagData m_replay_tag_data;
+  librbd::journal::EventEntry m_event_entry;
+
+  AsyncOpTracker m_event_replay_tracker;
+  Context *m_delayed_preprocess_task = nullptr;
+
+  AsyncOpTracker m_in_flight_op_tracker;
+  Context *m_flush_local_replay_task = nullptr;
+
+  void handle_remote_journal_metadata_updated();
+
+  void schedule_flush_local_replay_task();
+  void cancel_flush_local_replay_task();
+  void handle_flush_local_replay_task(int r);
+
+  void flush_local_replay(Context* on_flush);
+  void handle_flush_local_replay(Context* on_flush, int r);
+
+  void flush_commit_position(Context* on_flush);
+  void handle_flush_commit_position(Context* on_flush, int r);
+
+  void init_remote_journaler();
+  void handle_init_remote_journaler(int r);
+
+  void start_external_replay();
+  void handle_start_external_replay(int r);
+
+  bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
+
+  void shut_down_local_journal_replay();
+  void handle_shut_down_local_journal_replay(int r);
+
+  void wait_for_event_replay();
+  void handle_wait_for_event_replay(int r);
+
+  void close_local_image();
+  void handle_close_local_image(int r);
+
+  void stop_remote_journaler_replay();
+  void handle_stop_remote_journaler_replay(int r);
+
+  void shut_down_remote_journaler();
+  void handle_shut_down_remote_journaler(int r);
+
+  void wait_for_in_flight_ops();
+  void handle_wait_for_in_flight_ops(int r);
+
+  void replay_flush();
+  void handle_replay_flush_shut_down(int r);
+  void handle_replay_flush(int r);
+
+  void get_remote_tag();
+  void handle_get_remote_tag(int r);
+
+  void allocate_local_tag();
+  void handle_allocate_local_tag(int r);
+
+  void handle_replay_error(int r, const std::string &error);
+
+  bool is_replay_complete() const;
+  bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const;
+
+  void handle_replay_complete(int r, const std::string &error_desc);
+  void handle_replay_complete(const std::unique_lock<ceph::mutex>&,
+                              int r, const std::string &error_desc);
+  void handle_replay_ready();
+  void handle_replay_ready(std::unique_lock<ceph::mutex>& locker);
+
+  void preprocess_entry();
+  void handle_delayed_preprocess_task(int r);
+  void handle_preprocess_entry_ready(int r);
+  void handle_preprocess_entry_safe(int r);
+
+  void process_entry();
+  void handle_process_entry_ready(int r);
+  void handle_process_entry_safe(const ReplayEntry& replay_entry,
+                                 uint64_t relay_bytes,
+                                 const utime_t &replay_start_time, int r);
+
+  void handle_resync_image();
+
+  void notify_status_updated();
+
+  void cancel_delayed_preprocess_task();
+
+  int validate_remote_client_state(
+      const cls::journal::Client& remote_client,
+      librbd::journal::MirrorPeerClientMeta* remote_client_meta,
+      bool* resync_requested, std::string* error);
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H