]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: switch image replayer to new standalone journal replayer 32257/head
authorJason Dillaman <dillaman@redhat.com>
Fri, 13 Dec 2019 15:54:58 +0000 (10:54 -0500)
committerJason Dillaman <dillaman@redhat.com>
Mon, 16 Dec 2019 01:15:26 +0000 (20:15 -0500)
Remove all the original journal replaying code embedded in the image
replayer and instead rely on the new journal replayer class.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_ImageReplayer.cc
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/image_replayer/Replayer.h
src/tools/rbd_mirror/image_replayer/journal/Replayer.cc
src/tools/rbd_mirror/image_replayer/journal/Replayer.h

index c56fffc1d18c55be1b427233c47ac5309b04b3e4..cb21434357255fb76e9f562280147b1660d8ac26 100644 (file)
@@ -668,9 +668,9 @@ TEST_F(TestImageReplayer, Resync)
   flush(ictx);
   close_image(ictx);
 
-  C_SaferCond ctx;
-  m_replayer->resync_image(&ctx);
-  ASSERT_EQ(0, ctx.wait());
+  open_local_image(&ictx);
+  librbd::Journal<>::request_resync(ictx);
+  close_image(ictx);
 
   wait_for_stopped();
 
index 3b4d596ed55b08f6350a1e33528b3f84e4f45eee..f627d752d2deaa7b4ac09189d1dc3ba31381a289 100644 (file)
@@ -2,8 +2,8 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "cls/journal/cls_journal_types.h"
-#include "librbd/journal/Replay.h"
 #include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
 #include "tools/rbd_mirror/ImageDeleter.h"
 #include "tools/rbd_mirror/ImageReplayer.h"
 #include "tools/rbd_mirror/InstanceWatcher.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
-#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/journal/mock/MockJournaler.h"
 #include "test/librbd/mock/MockImageCtx.h"
-#include "test/librbd/mock/MockJournal.h"
 #include "test/rbd_mirror/mock/MockContextWQ.h"
 #include "test/rbd_mirror/mock/MockSafeTimer.h"
 
@@ -26,37 +27,19 @@ namespace librbd {
 
 namespace {
 
-struct MockTestJournal;
-
 struct MockTestImageCtx : public MockImageCtx {
   MockTestImageCtx(librbd::ImageCtx &image_ctx)
     : librbd::MockImageCtx(image_ctx) {
   }
-  MockTestJournal *journal = nullptr;
-};
-
-struct MockTestJournal : public MockJournal {
-  MOCK_METHOD2(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
-                                           Context *on_start));
-  MOCK_METHOD0(stop_external_replay, void());
 };
 
 } // anonymous namespace
 
 namespace journal {
 
-template<>
-struct Replay<MockTestImageCtx> {
-  MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *));
-  MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *));
-  MOCK_METHOD1(flush, void(Context*));
-  MOCK_METHOD2(shut_down, void(bool, Context*));
-};
-
 template <>
 struct TypeTraits<MockTestImageCtx> {
   typedef ::journal::MockJournalerProxy Journaler;
-  typedef ::journal::MockReplayEntryProxy ReplayEntry;
 };
 
 struct MirrorPeerClientMeta;
@@ -277,64 +260,44 @@ PrepareRemoteImageRequest<librbd::MockTestImageCtx>* PrepareRemoteImageRequest<l
 
 namespace journal {
 
-template<>
-struct EventPreprocessor<librbd::MockTestImageCtx> {
-  static EventPreprocessor *s_instance;
-
-  static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
-                                   ::journal::MockJournalerProxy &remote_journaler,
-                                   const std::string &local_mirror_uuid,
-                                   librbd::journal::MirrorPeerClientMeta *client_meta,
-                                   MockContextWQ *work_queue) {
+template <>
+struct Replayer<librbd::MockTestImageCtx> : public image_replayer::Replayer {
+  static Replayer* s_instance;
+  librbd::MockTestImageCtx** local_image_ctx;
+  image_replayer::ReplayerListener* replayer_listener;
+
+  static Replayer* create(librbd::MockTestImageCtx** local_image_ctx,
+                          ::journal::MockJournalerProxy* remote_journaler,
+                          const std::string& local_mirror_uuid,
+                          const std::string& remote_mirror_uuid,
+                          image_replayer::ReplayerListener* replayer_listener,
+                          Threads<librbd::MockTestImageCtx>* threads) {
     ceph_assert(s_instance != nullptr);
+    ceph_assert(local_image_ctx != nullptr);
+    s_instance->local_image_ctx = local_image_ctx;
+    s_instance->replayer_listener = replayer_listener;
     return s_instance;
   }
 
-  static void destroy(EventPreprocessor* processor) {
-  }
-
-  EventPreprocessor() {
-    ceph_assert(s_instance == nullptr);
+  Replayer() {
     s_instance = this;
   }
 
-  ~EventPreprocessor() {
-    ceph_assert(s_instance == this);
-    s_instance = nullptr;
-  }
-
-  MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
-  MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
-};
+  MOCK_METHOD0(destroy, void());
 
-template<>
-struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
-  static ReplayStatusFormatter* s_instance;
-
-  static ReplayStatusFormatter* create(::journal::MockJournalerProxy *journaler,
-                                       const std::string &mirror_uuid) {
-    ceph_assert(s_instance != nullptr);
-    return s_instance;
-  }
-
-  static void destroy(ReplayStatusFormatter* formatter) {
-  }
-
-  ReplayStatusFormatter() {
-    ceph_assert(s_instance == nullptr);
-    s_instance = this;
-  }
+  MOCK_METHOD1(init, void(Context*));
+  MOCK_METHOD1(shut_down, void(Context*));
+  MOCK_METHOD1(flush, void(Context*));
 
-  ~ReplayStatusFormatter() {
-    ceph_assert(s_instance == this);
-    s_instance = nullptr;
-  }
+  MOCK_METHOD2(get_replay_status, bool(std::string*, Context*));
 
-  MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
+  MOCK_CONST_METHOD0(is_replaying, bool());
+  MOCK_CONST_METHOD0(is_resync_requested, bool());
+  MOCK_CONST_METHOD0(get_error_code, int());
+  MOCK_CONST_METHOD0(get_error_description, std::string());
 };
 
-EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
-ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
+Replayer<librbd::MockTestImageCtx>* Replayer<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 } // namespace journal
 } // namespace image_replayer
@@ -367,9 +330,7 @@ public:
   typedef image_replayer::CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
   typedef image_replayer::PrepareLocalImageRequest<librbd::MockTestImageCtx> MockPrepareLocalImageRequest;
   typedef image_replayer::PrepareRemoteImageRequest<librbd::MockTestImageCtx> MockPrepareRemoteImageRequest;
-  typedef image_replayer::journal::EventPreprocessor<librbd::MockTestImageCtx> MockEventPreprocessor;
-  typedef image_replayer::journal::ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
-  typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
+  typedef image_replayer::journal::Replayer<librbd::MockTestImageCtx> MockJournalReplayer;
   typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
 
@@ -414,18 +375,6 @@ public:
         }));
   }
 
-  void expect_flush_repeatedly(MockReplay& mock_replay,
-                               journal::MockJournaler& mock_journal) {
-    EXPECT_CALL(mock_replay, flush(_))
-      .WillRepeatedly(Invoke([this](Context* ctx) {
-                        m_threads->work_queue->queue(ctx, 0);
-                      }));
-    EXPECT_CALL(mock_journal, flush_commit_position(_))
-      .WillRepeatedly(Invoke([this](Context* ctx) {
-                        m_threads->work_queue->queue(ctx, 0);
-                      }));
-  }
-
   void expect_trash_move(MockImageDeleter& mock_image_deleter,
                          const std::string& global_image_id,
                          bool ignore_orphan, int r) {
@@ -442,13 +391,6 @@ public:
     return bl;
   }
 
-  void expect_get_or_send_update(
-    MockReplayStatusFormatter &mock_replay_status_formatter) {
-    EXPECT_CALL(mock_replay_status_formatter, get_or_send_update(_, _))
-      .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)),
-                            Return(true)));
-  }
-
   void expect_send(MockPrepareLocalImageRequest &mock_request,
                    const std::string &local_image_id,
                    const std::string &local_image_name,
@@ -494,51 +436,29 @@ public:
           }));
   }
 
-  void expect_start_external_replay(librbd::MockTestJournal &mock_journal,
-                                    MockReplay *mock_replay, int r) {
-    EXPECT_CALL(mock_journal, start_external_replay(_, _))
-      .WillOnce(DoAll(SetArgPointee<0>(mock_replay),
-                      WithArg<1>(CompleteContext(r))));
-  }
-
-  void expect_init(::journal::MockJournaler &mock_journaler, int r) {
-    EXPECT_CALL(mock_journaler, init(_))
-      .WillOnce(CompleteContext(r));
-  }
-
-  void expect_get_cached_client(::journal::MockJournaler &mock_journaler,
-                                int r) {
-    librbd::journal::ImageClientMeta image_client_meta;
-    image_client_meta.tag_class = 0;
-
-    librbd::journal::ClientData client_data;
-    client_data.client_meta = image_client_meta;
-
-    cls::journal::Client client;
-    encode(client_data, client.data);
-
-    EXPECT_CALL(mock_journaler, get_cached_client("local_mirror_uuid", _))
-      .WillOnce(DoAll(SetArgPointee<1>(client),
-                      Return(r)));
-  }
-
-  void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) {
-    EXPECT_CALL(mock_journaler, stop_replay(_))
-      .WillOnce(CompleteContext(r));
-  }
-
-  void expect_flush(MockReplay &mock_replay, int r) {
-    EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r));
+  void expect_init(MockJournalReplayer& mock_journal_replayer, int r) {
+    EXPECT_CALL(mock_journal_replayer, init(_))
+      .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) {
+                  if (r < 0) {
+                    *mock_journal_replayer.local_image_ctx = nullptr;
+                  }
+                  m_threads->work_queue->queue(ctx, r);
+                }));
   }
 
-  void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
-    EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
-      .WillOnce(WithArg<1>(CompleteContext(r)));
+  void expect_shut_down(MockJournalReplayer& mock_journal_replayer, int r) {
+    EXPECT_CALL(mock_journal_replayer, shut_down(_))
+      .WillOnce(Invoke([this, &mock_journal_replayer, r](Context* ctx) {
+                  *mock_journal_replayer.local_image_ctx = nullptr;
+                  m_threads->work_queue->queue(ctx, r);
+                }));
+    EXPECT_CALL(mock_journal_replayer, destroy());
   }
 
-  void expect_shut_down(journal::MockJournaler &mock_journaler, int r) {
-    EXPECT_CALL(mock_journaler, shut_down(_))
-      .WillOnce(CompleteContext(r));
+  void expect_get_replay_status(MockJournalReplayer& mock_journal_replayer) {
+    EXPECT_CALL(mock_journal_replayer, get_replay_status(_, _))
+      .WillRepeatedly(DoAll(WithArg<1>(CompleteContext(-EEXIST)),
+                            Return(true)));
   }
 
   void expect_send(MockCloseImageRequest &mock_close_image_request, int r) {
@@ -549,75 +469,6 @@ public:
           }));
   }
 
-  void expect_get_commit_tid_in_debug(
-    ::journal::MockReplayEntry &mock_replay_entry) {
-    // It is used in debug messages and depends on debug level
-    EXPECT_CALL(mock_replay_entry, get_commit_tid())
-    .Times(AtLeast(0))
-      .WillRepeatedly(Return(0));
-  }
-
-  void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) {
-    // It is used in debug messages and depends on debug level
-    EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0))
-      .WillRepeatedly(Return(0));
-  }
-
-  void expect_committed(::journal::MockReplayEntry &mock_replay_entry,
-                        ::journal::MockJournaler &mock_journaler, int times) {
-    EXPECT_CALL(mock_replay_entry, get_data()).Times(times);
-    EXPECT_CALL(mock_journaler, committed(
-                  MatcherCast<const ::journal::MockReplayEntryProxy&>(_)))
-      .Times(times);
-  }
-
-  void expect_try_pop_front(::journal::MockJournaler &mock_journaler,
-                            uint64_t replay_tag_tid, bool entries_available) {
-    EXPECT_CALL(mock_journaler, try_pop_front(_, _))
-      .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()),
-                      SetArgPointee<1>(replay_tag_tid),
-                      Return(entries_available)));
-  }
-
-  void expect_try_pop_front_return_no_entries(
-    ::journal::MockJournaler &mock_journaler, Context *on_finish) {
-    EXPECT_CALL(mock_journaler, try_pop_front(_, _))
-      .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e,
-                                        uint64_t *t) {
-                              on_finish->complete(0);
-                            }),
-                     Return(false)));
-  }
-
-  void expect_get_tag(::journal::MockJournaler &mock_journaler,
-                      const cls::journal::Tag &tag, int r) {
-    EXPECT_CALL(mock_journaler, get_tag(_, _, _))
-      .WillOnce(DoAll(SetArgPointee<1>(tag),
-                      WithArg<2>(CompleteContext(r))));
-  }
-
-  void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) {
-    EXPECT_CALL(mock_journal, allocate_tag(_, _, _))
-      .WillOnce(WithArg<2>(CompleteContext(r)));
-  }
-
-  void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor,
-                         bool required, int r) {
-    EXPECT_CALL(mock_event_preprocessor, is_required(_))
-      .WillOnce(Return(required));
-    if (required) {
-      EXPECT_CALL(mock_event_preprocessor, preprocess(_, _))
-        .WillOnce(WithArg<1>(CompleteContext(r)));
-    }
-  }
-
-  void expect_process(MockReplay &mock_replay,
-                      int on_ready_r, int on_commit_r) {
-    EXPECT_CALL(mock_replay, process(_, _, _))
-      .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)),
-                      WithArg<2>(CompleteContext(on_commit_r))));
-  }
-
   void expect_set_mirror_image_status_repeatedly() {
     EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _))
       .WillRepeatedly(Invoke([](auto, auto, auto){}));
@@ -640,6 +491,16 @@ public:
                                &m_remote_status_updater);
   }
 
+  void wait_for_stopped() {
+    for (int i = 0; i < 10000; i++) {
+      if (m_image_replayer->is_stopped()) {
+        break;
+      }
+      usleep(1000);
+    }
+    ASSERT_TRUE(m_image_replayer->is_stopped());
+  }
+
   librbd::ImageCtx *m_remote_image_ctx;
   librbd::ImageCtx *m_local_image_ctx = nullptr;
   MockInstanceWatcher m_instance_watcher;
@@ -654,9 +515,6 @@ TEST_F(TestMockImageReplayer, StartStop) {
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -666,13 +524,10 @@ TEST_F(TestMockImageReplayer, StartStop) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
+  MockJournalReplayer mock_journal_replayer;
 
+  expect_get_replay_status(mock_journal_replayer);
   expect_set_mirror_image_status_repeatedly();
-  expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -681,17 +536,7 @@ TEST_F(TestMockImageReplayer, StartStop) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
-
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
-  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+  expect_init(mock_journal_replayer, 0);
 
   create_image_replayer(mock_threads);
 
@@ -702,17 +547,7 @@ TEST_F(TestMockImageReplayer, StartStop) {
             m_image_replayer->get_health_state());
 
   // STOP
-
-  MockCloseImageRequest mock_close_local_image_request;
-
-  expect_shut_down(mock_local_replay, true, 0);
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_send(mock_close_local_image_request, 0);
-
-  expect_stop_replay(mock_remote_journaler, 0);
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
+  expect_shut_down(mock_journal_replayer, 0);
   expect_mirror_image_status_exists(false);
 
   C_SaferCond stop_ctx;
@@ -734,10 +569,8 @@ TEST_F(TestMockImageReplayer, LocalImagePrimary) {
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -745,8 +578,6 @@ TEST_F(TestMockImageReplayer, LocalImagePrimary) {
   expect_send(mock_prepare_remote_image_request, "remote mirror uuid",
               "remote image id", 0);
   EXPECT_CALL(mock_remote_journaler, construct());
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
   expect_mirror_image_status_exists(false);
 
   create_image_replayer(mock_threads);
@@ -769,10 +600,8 @@ TEST_F(TestMockImageReplayer, LocalImageDNE) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, "", "", "", -ENOENT);
@@ -781,8 +610,6 @@ TEST_F(TestMockImageReplayer, LocalImageDNE) {
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EREMOTEIO);
 
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
   expect_mirror_image_status_exists(false);
 
   create_image_replayer(mock_threads);
@@ -802,11 +629,9 @@ TEST_F(TestMockImageReplayer, PrepareLocalImageError) {
 
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   EXPECT_CALL(m_local_status_updater, set_mirror_image_status(_, _, _))
     .WillRepeatedly(Invoke([](auto, auto, auto){}));
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -832,10 +657,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdDNE) {
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -863,10 +686,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdNonLinkedDNE) {
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -893,10 +714,8 @@ TEST_F(TestMockImageReplayer, GetRemoteImageIdError) {
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -925,10 +744,8 @@ TEST_F(TestMockImageReplayer, BootstrapError) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -938,8 +755,6 @@ TEST_F(TestMockImageReplayer, BootstrapError) {
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, -EINVAL);
 
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
   expect_mirror_image_status_exists(false);
 
   create_image_replayer(mock_threads);
@@ -961,10 +776,8 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) {
   MockImageDeleter mock_image_deleter;
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
-  MockReplayStatusFormatter mock_replay_status_formatter;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -976,8 +789,6 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) {
                 m_image_replayer->stop(nullptr, true);
               }));
 
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
   expect_mirror_image_status_exists(false);
 
   create_image_replayer(mock_threads);
@@ -987,15 +798,12 @@ TEST_F(TestMockImageReplayer, StopBeforeBootstrap) {
   ASSERT_EQ(-ECANCELED, start_ctx.wait());
 }
 
-TEST_F(TestMockImageReplayer, StartExternalReplayError) {
+TEST_F(TestMockImageReplayer, StopError) {
   // START
 
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -1005,12 +813,10 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
+  MockJournalReplayer mock_journal_replayer;
 
+  expect_get_replay_status(mock_journal_replayer);
   expect_set_mirror_image_status_repeatedly();
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -1019,42 +825,28 @@ TEST_F(TestMockImageReplayer, StartExternalReplayError) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+  expect_init(mock_journal_replayer, 0);
 
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
+  create_image_replayer(mock_threads);
 
-  expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
+  C_SaferCond start_ctx;
+  m_image_replayer->start(&start_ctx);
+  ASSERT_EQ(0, start_ctx.wait());
 
-  MockCloseImageRequest mock_close_local_image_request;
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  expect_send(mock_close_local_image_request, 0);
+  // STOP (errors are ignored)
 
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
+  expect_shut_down(mock_journal_replayer, -EINVAL);
   expect_mirror_image_status_exists(false);
 
-  create_image_replayer(mock_threads);
-
-  C_SaferCond start_ctx;
-  m_image_replayer->start(&start_ctx);
-  ASSERT_EQ(-EINVAL, start_ctx.wait());
-  ASSERT_EQ(image_replayer::HEALTH_STATE_ERROR,
-            m_image_replayer->get_health_state());
+  C_SaferCond stop_ctx;
+  m_image_replayer->stop(&stop_ctx);
+  ASSERT_EQ(0, stop_ctx.wait());
 }
 
-TEST_F(TestMockImageReplayer, StopError) {
-  // START
-
+TEST_F(TestMockImageReplayer, ReplayerError) {
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -1064,13 +856,9 @@ TEST_F(TestMockImageReplayer, StopError) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
+  MockJournalReplayer mock_journal_replayer;
 
   expect_set_mirror_image_status_repeatedly();
-  expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
-  expect_get_or_send_update(mock_replay_status_formatter);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -1079,52 +867,24 @@ TEST_F(TestMockImageReplayer, StopError) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+  expect_init(mock_journal_replayer, -EINVAL);
+  EXPECT_CALL(mock_journal_replayer, get_error_description())
+    .WillOnce(Return("FAIL"));
+  EXPECT_CALL(mock_journal_replayer, destroy());
 
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
-
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
-  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
-
+  expect_mirror_image_status_exists(false);
   create_image_replayer(mock_threads);
 
   C_SaferCond start_ctx;
   m_image_replayer->start(&start_ctx);
-  ASSERT_EQ(0, start_ctx.wait());
-
-  // STOP (errors are ignored)
-
-  MockCloseImageRequest mock_close_local_image_request;
-
-  expect_shut_down(mock_local_replay, true, -EINVAL);
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_send(mock_close_local_image_request, -EINVAL);
-
-  expect_stop_replay(mock_remote_journaler, -EINVAL);
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, -EINVAL);
-  expect_mirror_image_status_exists(false);
-
-  C_SaferCond stop_ctx;
-  m_image_replayer->stop(&stop_ctx);
-  ASSERT_EQ(0, stop_ctx.wait());
+  ASSERT_EQ(-EINVAL, start_ctx.wait());
 }
 
-TEST_F(TestMockImageReplayer, Replay) {
+TEST_F(TestMockImageReplayer, ReplayerResync) {
   // START
-
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -1134,17 +894,10 @@ TEST_F(TestMockImageReplayer, Replay) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
-  ::journal::MockReplayEntry mock_replay_entry;
+  MockJournalReplayer mock_journal_replayer;
 
+  expect_get_replay_status(mock_journal_replayer);
   expect_set_mirror_image_status_repeatedly();
-  expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
-  expect_get_or_send_update(mock_replay_status_formatter);
-  expect_get_commit_tid_in_debug(mock_replay_entry);
-  expect_get_tag_tid_in_debug(mock_local_journal);
-  expect_committed(mock_replay_entry, mock_remote_journaler, 2);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -1153,17 +906,7 @@ TEST_F(TestMockImageReplayer, Replay) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
-
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
-  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+  expect_init(mock_journal_replayer, 0);
 
   create_image_replayer(mock_threads);
 
@@ -1171,72 +914,23 @@ TEST_F(TestMockImageReplayer, Replay) {
   m_image_replayer->start(&start_ctx);
   ASSERT_EQ(0, start_ctx.wait());
 
-  // REPLAY
-
-  cls::journal::Tag tag =
-    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            true, 0, 0})};
-
-  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
-  // replay_flush
-  expect_shut_down(mock_local_replay, false, 0);
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-  expect_get_tag(mock_remote_journaler, tag, 0);
-  expect_allocate_tag(mock_local_journal, 0);
-
-  // process
-  EXPECT_CALL(mock_replay_entry, get_data());
-  EXPECT_CALL(mock_local_replay, decode(_, _))
-    .WillOnce(Return(0));
-  expect_preprocess(mock_event_preprocessor, false, 0);
-  expect_process(mock_local_replay, 0, 0);
-
-  // the next event with preprocess
-  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-  EXPECT_CALL(mock_replay_entry, get_data());
-  EXPECT_CALL(mock_local_replay, decode(_, _))
-    .WillOnce(Return(0));
-  expect_preprocess(mock_event_preprocessor, true, 0);
-  expect_process(mock_local_replay, 0, 0);
-
-  // attempt to process the next event
-  C_SaferCond replay_ctx;
-  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
-
-  // fire
-  m_image_replayer->handle_replay_ready();
-  ASSERT_EQ(0, replay_ctx.wait());
-
-  // STOP
-
-  MockCloseImageRequest mock_close_local_image_request;
-  expect_shut_down(mock_local_replay, true, 0);
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_send(mock_close_local_image_request, 0);
-
-  expect_stop_replay(mock_remote_journaler, 0);
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
+  // NOTIFY
+  EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+    .WillOnce(Return(true));
+  expect_shut_down(mock_journal_replayer, 0);
+  expect_trash_move(mock_image_deleter, "global image id", true, 0);
   expect_mirror_image_status_exists(false);
+  mock_journal_replayer.replayer_listener->handle_notification();
+  ASSERT_FALSE(m_image_replayer->is_running());
 
-  C_SaferCond stop_ctx;
-  m_image_replayer->stop(&stop_ctx);
-  ASSERT_EQ(0, stop_ctx.wait());
+  wait_for_stopped();
 }
 
-TEST_F(TestMockImageReplayer, DecodeError) {
+TEST_F(TestMockImageReplayer, ReplayerInterrupted) {
   // START
-
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -1246,16 +940,10 @@ TEST_F(TestMockImageReplayer, DecodeError) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
-  ::journal::MockReplayEntry mock_replay_entry;
+  MockJournalReplayer mock_journal_replayer;
 
+  expect_get_replay_status(mock_journal_replayer);
   expect_set_mirror_image_status_repeatedly();
-  expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
-  expect_get_or_send_update(mock_replay_status_formatter);
-  expect_get_commit_tid_in_debug(mock_replay_entry);
-  expect_get_tag_tid_in_debug(mock_local_journal);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -1264,17 +952,7 @@ TEST_F(TestMockImageReplayer, DecodeError) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
-
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
-  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+  expect_init(mock_journal_replayer, 0);
 
   create_image_replayer(mock_threads);
 
@@ -1282,65 +960,28 @@ TEST_F(TestMockImageReplayer, DecodeError) {
   m_image_replayer->start(&start_ctx);
   ASSERT_EQ(0, start_ctx.wait());
 
-  // REPLAY
-
-  cls::journal::Tag tag =
-    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            true, 0, 0})};
-
-  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
-  // replay_flush
-  expect_shut_down(mock_local_replay, false, 0);
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-  expect_get_tag(mock_remote_journaler, tag, 0);
-  expect_allocate_tag(mock_local_journal, 0);
-
-  // process
-  EXPECT_CALL(mock_replay_entry, get_data());
-  EXPECT_CALL(mock_local_replay, decode(_, _))
+  // NOTIFY
+  EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_journal_replayer, is_replaying())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_journal_replayer, get_error_description())
+    .WillOnce(Return("INVALID"));
+  EXPECT_CALL(mock_journal_replayer, get_error_code())
     .WillOnce(Return(-EINVAL));
-
-  // stop on error
-  expect_shut_down(mock_local_replay, true, 0);
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-
-  MockCloseImageRequest mock_close_local_image_request;
-  C_SaferCond close_ctx;
-  EXPECT_CALL(mock_close_local_image_request, send())
-    .WillOnce(Invoke([&mock_close_local_image_request, &close_ctx]() {
-         *mock_close_local_image_request.image_ctx = nullptr;
-         mock_close_local_image_request.on_finish->complete(0);
-         close_ctx.complete(0);
-       }));
-
-  expect_stop_replay(mock_remote_journaler, 0);
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
+  expect_shut_down(mock_journal_replayer, 0);
   expect_mirror_image_status_exists(false);
+  mock_journal_replayer.replayer_listener->handle_notification();
+  ASSERT_FALSE(m_image_replayer->is_running());
 
-  // fire
-  m_image_replayer->handle_replay_ready();
-  ASSERT_EQ(0, close_ctx.wait());
-
-  while (!m_image_replayer->is_stopped()) {
-    usleep(1000);
-  }
+  wait_for_stopped();
 }
 
-TEST_F(TestMockImageReplayer, DelayedReplay) {
-
+TEST_F(TestMockImageReplayer, ReplayerRenamed) {
   // START
-
   create_local_image();
   librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
 
-  librbd::MockTestJournal mock_local_journal;
-  mock_local_image_ctx.journal = &mock_local_journal;
-
   journal::MockJournaler mock_remote_journaler;
   MockThreads mock_threads(m_threads);
   expect_work_queue_repeatedly(mock_threads);
@@ -1350,17 +991,10 @@ TEST_F(TestMockImageReplayer, DelayedReplay) {
   MockPrepareLocalImageRequest mock_prepare_local_image_request;
   MockPrepareRemoteImageRequest mock_prepare_remote_image_request;
   MockBootstrapRequest mock_bootstrap_request;
-  MockReplay mock_local_replay;
-  MockEventPreprocessor mock_event_preprocessor;
-  MockReplayStatusFormatter mock_replay_status_formatter;
-  ::journal::MockReplayEntry mock_replay_entry;
+  MockJournalReplayer mock_journal_replayer;
 
+  expect_get_replay_status(mock_journal_replayer);
   expect_set_mirror_image_status_repeatedly();
-  expect_flush_repeatedly(mock_local_replay, mock_remote_journaler);
-  expect_get_or_send_update(mock_replay_status_formatter);
-  expect_get_commit_tid_in_debug(mock_replay_entry);
-  expect_get_tag_tid_in_debug(mock_local_journal);
-  expect_committed(mock_replay_entry, mock_remote_journaler, 1);
 
   InSequence seq;
   expect_send(mock_prepare_local_image_request, mock_local_image_ctx.id,
@@ -1369,17 +1003,7 @@ TEST_F(TestMockImageReplayer, DelayedReplay) {
               m_remote_image_ctx->id, 0);
   EXPECT_CALL(mock_remote_journaler, construct());
   expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
-
-  EXPECT_CALL(mock_local_journal, add_listener(_));
-
-  expect_init(mock_remote_journaler, 0);
-
-  EXPECT_CALL(mock_remote_journaler, add_listener(_));
-  expect_get_cached_client(mock_remote_journaler, 0);
-
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-
-  EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+  expect_init(mock_journal_replayer, 0);
 
   create_image_replayer(mock_threads);
 
@@ -1387,75 +1011,26 @@ TEST_F(TestMockImageReplayer, DelayedReplay) {
   m_image_replayer->start(&start_ctx);
   ASSERT_EQ(0, start_ctx.wait());
 
-  // REPLAY
-
-  cls::journal::Tag tag =
-    {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            librbd::Journal<>::LOCAL_MIRROR_UUID,
-                            true, 0, 0})};
-
-  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-
-  // replay_flush
-  expect_shut_down(mock_local_replay, false, 0);
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
-  expect_get_tag(mock_remote_journaler, tag, 0);
-  expect_allocate_tag(mock_local_journal, 0);
-
-  // process with delay
-  EXPECT_CALL(mock_replay_entry, get_data());
-  librbd::journal::EventEntry event_entry(
-    librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now());
-  EXPECT_CALL(mock_local_replay, decode(_, _))
-    .WillOnce(DoAll(SetArgPointee<1>(event_entry),
-                    Return(0)));
-  expect_preprocess(mock_event_preprocessor, false, 0);
-  expect_process(mock_local_replay, 0, 0);
-
-  // attempt to process the next event
-  C_SaferCond replay_ctx;
-  expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
-
-  // fire
-  mock_local_image_ctx.mirroring_replay_delay = 2;
-  m_image_replayer->handle_replay_ready();
-  ASSERT_EQ(0, replay_ctx.wait());
-
-  // add a pending (delayed) entry before stop
-  expect_try_pop_front(mock_remote_journaler, tag.tid, true);
-  EXPECT_CALL(mock_replay_entry, get_data());
-  C_SaferCond decode_ctx;
-  EXPECT_CALL(mock_local_replay, decode(_, _))
-    .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it,
-                                         librbd::journal::EventEntry *e) {
-                             decode_ctx.complete(0);
-                           }),
-                    Return(0)));
-
-  mock_local_image_ctx.mirroring_replay_delay = 10;
-  m_image_replayer->handle_replay_ready();
-  ASSERT_EQ(0, decode_ctx.wait());
+  // NOTIFY
+  EXPECT_CALL(mock_journal_replayer, is_resync_requested())
+    .WillOnce(Return(false));
+  EXPECT_CALL(mock_journal_replayer, is_replaying())
+    .WillOnce(Return(true));
+  mock_local_image_ctx.name = "NEW NAME";
+  mock_journal_replayer.replayer_listener->handle_notification();
 
   // STOP
-
-  MockCloseImageRequest mock_close_local_image_request;
-
-  expect_shut_down(mock_local_replay, true, 0);
-  EXPECT_CALL(mock_local_journal, remove_listener(_));
-  EXPECT_CALL(mock_local_journal, stop_external_replay());
-  expect_send(mock_close_local_image_request, 0);
-
-  expect_stop_replay(mock_remote_journaler, 0);
-  EXPECT_CALL(mock_remote_journaler, remove_listener(_));
-  expect_shut_down(mock_remote_journaler, 0);
+  expect_shut_down(mock_journal_replayer, 0);
   expect_mirror_image_status_exists(false);
 
   C_SaferCond stop_ctx;
   m_image_replayer->stop(&stop_ctx);
   ASSERT_EQ(0, stop_ctx.wait());
-}
 
+  auto image_spec = image_replayer::util::compute_image_spec(
+    m_local_io_ctx, "NEW NAME");
+  ASSERT_EQ(image_spec, m_image_replayer->get_name());
+}
 
 } // namespace mirror
 } // namespace rbd
index 8bed3fc28be48ea41955a0c351efafc6e6e3501f..efe2ba4e2372a9be70f2ae75e5c6a11c9e6691fa 100644 (file)
@@ -12,7 +12,6 @@
 #include "common/WorkQueue.h"
 #include "global/global_context.h"
 #include "journal/Journaler.h"
-#include "journal/ReplayHandler.h"
 #include "journal/Settings.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
@@ -20,7 +19,6 @@
 #include "librbd/Journal.h"
 #include "librbd/Operations.h"
 #include "librbd/Utils.h"
-#include "librbd/journal/Replay.h"
 #include "ImageDeleter.h"
 #include "ImageReplayer.h"
 #include "MirrorStatusUpdater.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
 #include "tools/rbd_mirror/image_replayer/Utils.h"
-#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
-#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include <map>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
                            << __func__ << ": "
 
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::shared_ptr;
-using std::vector;
-
 extern PerfCounters *g_perf_counters;
 
 namespace rbd {
 namespace mirror {
 
-using librbd::util::create_async_context_callback;
 using librbd::util::create_context_callback;
-using librbd::util::create_rados_callback;
 
 template <typename I>
 std::ostream &operator<<(std::ostream &os,
@@ -60,25 +51,6 @@ std::ostream &operator<<(std::ostream &os,
 
 namespace {
 
-template <typename I>
-struct ReplayHandler : public ::journal::ReplayHandler {
-  ImageReplayer<I> *replayer;
-  ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
-  void handle_entries_available() override {
-    replayer->handle_replay_ready();
-  }
-  void handle_complete(int r) override {
-    std::stringstream ss;
-    if (r == -ENOMEM) {
-      ss << "not enough memory in autotune cache";
-    } else if (r < 0) {
-      ss << "replay completed with error: " << cpp_strerror(r);
-    }
-    replayer->handle_replay_complete(r, ss.str());
-  }
-};
-
 template <typename I>
 class ImageReplayerAdminSocketCommand {
 public:
@@ -214,21 +186,6 @@ private:
   Commands commands;
 };
 
-uint32_t calculate_replay_delay(const utime_t &event_time,
-                                int mirroring_replay_delay) {
-    if (mirroring_replay_delay <= 0) {
-      return 0;
-    }
-
-    utime_t now = ceph_clock_now();
-    if (event_time + mirroring_replay_delay <= now) {
-      return 0;
-    }
-
-    // ensure it is rounded up when converting to integer
-    return (event_time + mirroring_replay_delay - now) + 1;
-}
-
 } // anonymous namespace
 
 template <typename I>
@@ -243,13 +200,18 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
 }
 
 template <typename I>
-void ImageReplayer<I>::RemoteJournalerListener::handle_update(
-  ::journal::JournalMetadata *) {
-  auto ctx = new LambdaContext([this](int r) {
-      replayer->handle_remote_journal_metadata_updated();
-    });
-  replayer->m_threads->work_queue->queue(ctx, 0);
-}
+struct ImageReplayer<I>::ReplayerListener
+  : public image_replayer::ReplayerListener {
+  ImageReplayer<I>* image_replayer;
+
+  ReplayerListener(ImageReplayer<I>* image_replayer)
+    : image_replayer(image_replayer) {
+  }
+
+  void handle_notification() override {
+    image_replayer->handle_replayer_notification();
+  }
+};
 
 template <typename I>
 ImageReplayer<I>::ImageReplayer(
@@ -267,8 +229,7 @@ ImageReplayer<I>::ImageReplayer(
   m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
       stringify(local_io_ctx.get_id()) + " " + global_image_id)),
   m_progress_cxt(this),
-  m_journal_listener(new JournalListener(this)),
-  m_remote_listener(this)
+  m_replayer_listener(new ReplayerListener(this))
 {
   // Register asok commands using a temporary "remote_pool_name/global_image_id"
   // name.  When the image name becomes known on start the asok commands will be
@@ -283,18 +244,11 @@ template <typename I>
 ImageReplayer<I>::~ImageReplayer()
 {
   unregister_admin_socket_hook();
-  ceph_assert(m_event_preprocessor == nullptr);
-  ceph_assert(m_replay_status_formatter == nullptr);
   ceph_assert(m_local_image_ctx == nullptr);
-  ceph_assert(m_local_replay == nullptr);
-  ceph_assert(m_remote_journaler == nullptr);
-  ceph_assert(m_replay_handler == nullptr);
   ceph_assert(m_on_start_finish == nullptr);
   ceph_assert(m_on_stop_finish == nullptr);
   ceph_assert(m_bootstrap_request == nullptr);
-  ceph_assert(m_flush_local_replay_task == nullptr);
-
-  delete m_journal_listener;
+  delete m_replayer_listener;
 }
 
 template <typename I>
@@ -328,7 +282,7 @@ void ImageReplayer<I>::add_peer(
 
 template <typename I>
 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
-  dout(10) << r << " " << desc << dendl;
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
 
   std::lock_guard l{m_lock};
   m_last_r = r;
@@ -398,10 +352,10 @@ void ImageReplayer<I>::handle_prepare_local_image(int r) {
     on_start_fail(r, "error preparing local image for replay");
     return;
   } else {
+    // have the correct local image name now
     reregister_admin_socket_hook();
   }
 
-  // local image doesn't exist or is non-primary
   prepare_remote_image();
 }
 
@@ -423,6 +377,8 @@ void ImageReplayer<I>::prepare_remote_image() {
   journal_settings.commit_interval = cct->_conf.get_val<double>(
     "rbd_mirror_journal_commit_age");
 
+  ceph_assert(m_remote_journaler == nullptr);
+
   Context *ctx = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
   auto req = image_replayer::PrepareRemoteImageRequest<I>::create(
@@ -437,7 +393,8 @@ template <typename I>
 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
   dout(10) << "r=" << r << dendl;
 
-  ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
+  ceph_assert(r < 0 ? m_remote_journaler == nullptr :
+                      m_remote_journaler != nullptr);
   if (r < 0 && !m_local_image_id.empty() &&
       m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
     // local image is primary -- fall-through
@@ -534,113 +491,57 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     return;
   }
 
-  ceph_assert(m_local_journal == nullptr);
-  {
-    std::shared_lock image_locker{m_local_image_ctx->image_lock};
-    if (m_local_image_ctx->journal != nullptr) {
-      m_local_journal = m_local_image_ctx->journal;
-      m_local_journal->add_listener(m_journal_listener);
-    }
-  }
-
-  if (m_local_journal == nullptr) {
-    on_start_fail(-EINVAL, "error accessing local journal");
-    return;
-  }
-
-  update_mirror_image_status(false, boost::none);
-  init_remote_journaler();
+  start_replay();
 }
 
 template <typename I>
-void ImageReplayer<I>::init_remote_journaler() {
+void ImageReplayer<I>::start_replay() {
   dout(10) << dendl;
 
-  Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
-  m_remote_journaler->init(ctx);
+  // TODO support journal + snapshot replay
+  std::unique_lock locker{m_lock};
+  ceph_assert(m_replayer == nullptr);
+  m_replayer = image_replayer::journal::Replayer<I>::create(
+    &m_local_image_ctx, m_remote_journaler, m_local_mirror_uuid,
+    m_remote_image.mirror_uuid, m_replayer_listener, m_threads);
+
+  auto ctx = create_context_callback<
+    ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
+  m_replayer->init(ctx);
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_init_remote_journaler(int r) {
+void ImageReplayer<I>::handle_start_replay(int r) {
   dout(10) << "r=" << r << dendl;
 
   if (on_start_interrupted()) {
     return;
   } else if (r < 0) {
-    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
-    on_start_fail(r, "error initializing remote journal");
-    return;
-  }
-
-  m_remote_journaler->add_listener(&m_remote_listener);
-
-  cls::journal::Client client;
-  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
-  if (r < 0) {
-    derr << "error retrieving remote journal client: " << cpp_strerror(r)
-        << dendl;
-    on_start_fail(r, "error retrieving remote journal client");
-    return;
-  }
-
-  dout(5) << "image_id=" << m_local_image_id << ", "
-          << "client_meta.image_id=" << m_client_meta.image_id << ", "
-          << "client.state=" << client.state << dendl;
-  if (m_client_meta.image_id == m_local_image_id &&
-      client.state != cls::journal::CLIENT_STATE_CONNECTED) {
-    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
-    if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
+    std::string error_description = m_replayer->get_error_description();
+    if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
+      std::unique_lock locker{m_lock};
       m_resync_requested = true;
-      on_start_fail(-ENOTCONN, "disconnected: automatic resync");
-    } else {
-      on_start_fail(-ENOTCONN, "disconnected");
     }
-    return;
-  }
-
-  start_replay();
-}
-
-template <typename I>
-void ImageReplayer<I>::start_replay() {
-  dout(10) << dendl;
 
-  Context *start_ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
-  m_local_journal->start_external_replay(&m_local_replay, start_ctx);
-}
+    // shut down not required if init failed
+    m_replayer->destroy();
+    m_replayer = nullptr;
 
-template <typename I>
-void ImageReplayer<I>::handle_start_replay(int r) {
-  dout(10) << "r=" << r << dendl;
-
-  if (r < 0) {
-    ceph_assert(m_local_replay == nullptr);
     derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
-    on_start_fail(r, "error starting replay on local image");
+    on_start_fail(r, error_description);
     return;
   }
 
-  m_replay_status_formatter =
-    image_replayer::journal::ReplayStatusFormatter<I>::create(
-      m_remote_journaler, m_local_mirror_uuid);
-
-  Context *on_finish(nullptr);
+  Context *on_finish = nullptr;
   {
-    std::lock_guard locker{m_lock};
+    std::unique_lock locker{m_lock};
     ceph_assert(m_state == STATE_STARTING);
     m_state = STATE_REPLAYING;
     std::swap(m_on_start_finish, on_finish);
   }
 
-  m_event_preprocessor = image_replayer::journal::EventPreprocessor<I>::create(
-    *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
-    &m_client_meta, m_threads->work_queue);
-
   update_mirror_image_status(true, boost::none);
-
   if (on_replay_interrupted()) {
     if (on_finish != nullptr) {
       on_finish->complete(r);
@@ -648,18 +549,6 @@ void ImageReplayer<I>::handle_start_replay(int r) {
     return;
   }
 
-  {
-    CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
-    double poll_seconds = cct->_conf.get_val<double>(
-      "rbd_mirror_journal_poll_age");
-
-    std::lock_guard locker{m_lock};
-    m_replay_handler = new ReplayHandler<I>(this);
-    m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
-
-    dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
-  }
-
   dout(10) << "start succeeded" << dendl;
   if (on_finish != nullptr) {
     dout(10) << "on finish complete, r=" << r << dendl;
@@ -670,7 +559,7 @@ void ImageReplayer<I>::handle_start_replay(int r) {
 template <typename I>
 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
 {
-  dout(10) << "r=" << r << dendl;
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
   Context *ctx = new LambdaContext([this, r, desc](int _r) {
       {
        std::lock_guard locker{m_lock};
@@ -780,7 +669,6 @@ void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
 
     m_stop_requested = true;
     m_state = STATE_STOPPING;
-    cancel_flush_local_replay_task();
   }
 
   set_state_description(r, desc);
@@ -788,38 +676,6 @@ void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
   shut_down(0);
 }
 
-template <typename I>
-void ImageReplayer<I>::handle_replay_ready()
-{
-  dout(20) << dendl;
-  if (on_replay_interrupted()) {
-    return;
-  }
-
-  if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
-    return;
-  }
-
-  m_event_replay_tracker.start_op();
-
-  m_lock.lock();
-  bool stopping = (m_state == STATE_STOPPING);
-  m_lock.unlock();
-
-  if (stopping) {
-    dout(10) << "stopping event replay" << dendl;
-    m_event_replay_tracker.finish_op();
-    return;
-  }
-
-  if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
-    preprocess_entry();
-    return;
-  }
-
-  replay_flush();
-}
-
 template <typename I>
 void ImageReplayer<I>::restart(Context *on_finish)
 {
@@ -836,121 +692,23 @@ void ImageReplayer<I>::restart(Context *on_finish)
 template <typename I>
 void ImageReplayer<I>::flush()
 {
-  dout(10) << dendl;
   C_SaferCond ctx;
-  flush_local_replay(&ctx);
-  ctx.wait();
-
-  update_mirror_image_status(false, boost::none);
-}
-
-
-template <typename I>
-void ImageReplayer<I>::schedule_flush_local_replay_task() {
-  ceph_assert(ceph_mutex_is_locked(m_lock));
-
-  std::lock_guard timer_locker{m_threads->timer_lock};
-  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
-    return;
-  }
-
-  dout(15) << dendl;
-  m_flush_local_replay_task = create_async_context_callback(
-    m_threads->work_queue, create_context_callback<
-      ImageReplayer<I>,
-      &ImageReplayer<I>::handle_flush_local_replay_task>(this));
-  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
-}
-
-template <typename I>
-void ImageReplayer<I>::cancel_flush_local_replay_task() {
-  ceph_assert(ceph_mutex_is_locked(m_lock));
-  std::lock_guard timer_locker{m_threads->timer_lock};
-  if (m_flush_local_replay_task != nullptr) {
-    auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task);
-    m_flush_local_replay_task = nullptr;
-    ceph_assert(canceled);
-  }
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_local_replay_task(int) {
-  dout(15) << dendl;
-
-  m_in_flight_op_tracker.start_op();
-  auto on_finish = new LambdaContext([this](int) {
-      {
-        std::lock_guard timer_locker{m_threads->timer_lock};
-        m_flush_local_replay_task = nullptr;
-      }
-
-      update_mirror_image_status(false, boost::none);
-      m_in_flight_op_tracker.finish_op();
-    });
-  flush_local_replay(on_finish);
-}
-
-template <typename I>
-void ImageReplayer<I>::flush_local_replay(Context* on_flush)
-{
-  m_lock.lock();
-  if (m_state != STATE_REPLAYING) {
-    m_lock.unlock();
-    on_flush->complete(0);
-    return;
-  }
-
-  dout(15) << dendl;
-  auto ctx = new LambdaContext(
-    [this, on_flush](int r) {
-      handle_flush_local_replay(on_flush, r);
-    });
-  m_local_replay->flush(ctx);
-  m_lock.unlock();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
-{
-  dout(15) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
-    on_flush->complete(r);
-    return;
-  }
 
-  flush_commit_position(on_flush);
-}
+  {
+    std::unique_lock locker{m_lock};
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
 
-template <typename I>
-void ImageReplayer<I>::flush_commit_position(Context* on_flush)
-{
-  m_lock.lock();
-  if (m_state != STATE_REPLAYING) {
-    m_lock.unlock();
-    on_flush->complete(0);
-    return;
+    dout(10) << dendl;
+    ceph_assert(m_replayer != nullptr);
+    m_replayer->flush(&ctx);
   }
 
-  dout(15) << dendl;
-  auto ctx = new LambdaContext(
-    [this, on_flush](int r) {
-      handle_flush_commit_position(on_flush, r);
-    });
-  m_remote_journaler->flush_commit_position(ctx);
-  m_lock.unlock();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
-{
-  dout(15) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "error flushing remote journal commit position: "
-        << cpp_strerror(r) << dendl;
+  int r = ctx.wait();
+  if (r >= 0) {
+    update_mirror_image_status(false, boost::none);
   }
-
-  on_flush->complete(r);
 }
 
 template <typename I>
@@ -981,330 +739,6 @@ void ImageReplayer<I>::print_status(Formatter *f)
   f->close_section();
 }
 
-template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
-{
-  dout(10) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
-  }
-
-  {
-    std::lock_guard locker{m_lock};
-    m_stop_requested = true;
-  }
-  on_stop_journal_replay(r, error_desc);
-}
-
-template <typename I>
-void ImageReplayer<I>::replay_flush() {
-  dout(10) << dendl;
-
-  bool interrupted = false;
-  {
-    std::lock_guard locker{m_lock};
-    if (m_state != STATE_REPLAYING) {
-      dout(10) << "replay interrupted" << dendl;
-      interrupted = true;
-    } else {
-      m_state = STATE_REPLAY_FLUSHING;
-    }
-  }
-
-  if (interrupted) {
-    m_event_replay_tracker.finish_op();
-    return;
-  }
-
-  // shut down the replay to flush all IO and ops and create a new
-  // replayer to handle the new tag epoch
-  Context *ctx = create_context_callback<
-    ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
-  ctx = new LambdaContext([this, ctx](int r) {
-      m_local_image_ctx->journal->stop_external_replay();
-      m_local_replay = nullptr;
-
-      if (r < 0) {
-        ctx->complete(r);
-        return;
-      }
-
-      m_local_journal->start_external_replay(&m_local_replay, ctx);
-    });
-  m_local_replay->shut_down(false, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_replay_flush(int r) {
-  dout(10) << "r=" << r << dendl;
-
-  {
-    std::lock_guard locker{m_lock};
-    ceph_assert(m_state == STATE_REPLAY_FLUSHING);
-    m_state = STATE_REPLAYING;
-  }
-
-  if (r < 0) {
-    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
-    m_event_replay_tracker.finish_op();
-    handle_replay_complete(r, "replay flush encountered an error");
-    return;
-  } else if (on_replay_interrupted()) {
-    m_event_replay_tracker.finish_op();
-    return;
-  }
-
-  get_remote_tag();
-}
-
-template <typename I>
-void ImageReplayer<I>::get_remote_tag() {
-  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
-
-  Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
-  m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_get_remote_tag(int r) {
-  dout(15) << "r=" << r << dendl;
-
-  if (r == 0) {
-    try {
-      auto it = m_replay_tag.data.cbegin();
-      decode(m_replay_tag_data, it);
-    } catch (const buffer::error &err) {
-      r = -EBADMSG;
-    }
-  }
-
-  if (r < 0) {
-    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
-         << cpp_strerror(r) << dendl;
-    m_event_replay_tracker.finish_op();
-    handle_replay_complete(r, "failed to retrieve remote tag");
-    return;
-  }
-
-  m_replay_tag_valid = true;
-  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
-           << m_replay_tag_data << dendl;
-
-  allocate_local_tag();
-}
-
-template <typename I>
-void ImageReplayer<I>::allocate_local_tag() {
-  dout(15) << dendl;
-
-  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
-  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
-    mirror_uuid = m_remote_image.mirror_uuid;
-  } else if (mirror_uuid == m_local_mirror_uuid) {
-    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
-  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
-    // handle possible edge condition where daemon can failover and
-    // the local image has already been promoted/demoted
-    auto local_tag_data = m_local_journal->get_tag_data();
-    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
-        (local_tag_data.predecessor.commit_valid &&
-         local_tag_data.predecessor.mirror_uuid ==
-           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
-      dout(15) << "skipping stale demotion event" << dendl;
-      handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
-      handle_replay_ready();
-      return;
-    } else {
-      dout(5) << "encountered image demotion: stopping" << dendl;
-      std::lock_guard locker{m_lock};
-      m_stop_requested = true;
-    }
-  }
-
-  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
-  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
-    predecessor.mirror_uuid = m_remote_image.mirror_uuid;
-  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
-    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
-  }
-
-  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
-           << "predecessor=" << predecessor << ", "
-           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
-  Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
-  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_allocate_local_tag(int r) {
-  dout(15) << "r=" << r << ", "
-           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
-
-  if (r < 0) {
-    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
-    m_event_replay_tracker.finish_op();
-    handle_replay_complete(r, "failed to allocate journal tag");
-    return;
-  }
-
-  preprocess_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::preprocess_entry() {
-  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
-           << dendl;
-
-  bufferlist data = m_replay_entry.get_data();
-  auto it = data.cbegin();
-  int r = m_local_replay->decode(&it, &m_event_entry);
-  if (r < 0) {
-    derr << "failed to decode journal event" << dendl;
-    m_event_replay_tracker.finish_op();
-    handle_replay_complete(r, "failed to decode journal event");
-    return;
-  }
-
-  uint32_t delay = calculate_replay_delay(
-    m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
-  if (delay == 0) {
-    handle_preprocess_entry_ready(0);
-    return;
-  }
-
-  dout(20) << "delaying replay by " << delay << " sec" << dendl;
-
-  std::lock_guard timer_locker{m_threads->timer_lock};
-  ceph_assert(m_delayed_preprocess_task == nullptr);
-  m_delayed_preprocess_task = new LambdaContext(
-    [this](int r) {
-      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
-      m_delayed_preprocess_task = nullptr;
-      m_threads->work_queue->queue(
-        create_context_callback<ImageReplayer,
-        &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
-    });
-  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
-  dout(20) << "r=" << r << dendl;
-  ceph_assert(r == 0);
-
-  m_replay_start_time = ceph_clock_now();
-  if (!m_event_preprocessor->is_required(m_event_entry)) {
-    process_entry();
-    return;
-  }
-
-  Context *ctx = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
-  m_event_preprocessor->preprocess(&m_event_entry, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
-  dout(20) << "r=" << r << dendl;
-
-  if (r < 0) {
-    m_event_replay_tracker.finish_op();
-
-    if (r == -ECANCELED) {
-      handle_replay_complete(0, "lost exclusive lock");
-    } else {
-      derr << "failed to preprocess journal event" << dendl;
-      handle_replay_complete(r, "failed to preprocess journal event");
-    }
-    return;
-  }
-
-  process_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::process_entry() {
-  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
-           << dendl;
-
-  // stop replaying events if stop has been requested
-  if (on_replay_interrupted()) {
-    m_event_replay_tracker.finish_op();
-    return;
-  }
-
-  Context *on_ready = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
-  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
-                                             m_replay_start_time);
-
-  m_local_replay->process(m_event_entry, on_ready, on_commit);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_process_entry_ready(int r) {
-  dout(20) << dendl;
-  ceph_assert(r == 0);
-
-  bool update_status = false;
-  {
-    std::shared_lock image_locker{m_local_image_ctx->image_lock};
-    if (m_local_image_name != m_local_image_ctx->name) {
-      m_local_image_name = m_local_image_ctx->name;
-      update_status = true;
-    }
-  }
-
-  if (update_status) {
-    update_mirror_image_status(false, {});
-  }
-
-  // attempt to process the next event
-  handle_replay_ready();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
-                                                 const utime_t &replay_start_time,
-                                                 int r) {
-  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
-          << dendl;
-
-  if (r < 0) {
-    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
-    handle_replay_complete(r, "failed to commit journal event");
-  } else {
-    ceph_assert(m_remote_journaler != nullptr);
-    m_remote_journaler->committed(replay_entry);
-  }
-
-  auto bytes = replay_entry.get_data().length();
-  auto latency = ceph_clock_now() - replay_start_time;
-
-  if (g_perf_counters) {
-    g_perf_counters->inc(l_rbd_mirror_replay);
-    g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
-    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
-  }
-
-  auto ctx = new LambdaContext(
-    [this, bytes, latency](int r) {
-      std::lock_guard locker{m_lock};
-      schedule_flush_local_replay_task();
-
-      if (m_perf_counters) {
-        m_perf_counters->inc(l_rbd_mirror_replay);
-        m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
-        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
-      }
-
-      m_event_replay_tracker.finish_op();
-    });
-  m_threads->work_queue->queue(ctx, 0);
-}
-
 template <typename I>
 void ImageReplayer<I>::update_mirror_image_status(
     bool force, const OptionalState &opt_state) {
@@ -1382,9 +816,9 @@ void ImageReplayer<I>::set_mirror_image_status_update(
     }
     break;
   case STATE_REPLAYING:
-  case STATE_REPLAY_FLUSHING:
     status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
     {
+      std::string desc;
       auto on_req_finish = new LambdaContext(
         [this, force](int r) {
           dout(15) << "replay status ready: r=" << r << dendl;
@@ -1395,10 +829,8 @@ void ImageReplayer<I>::set_mirror_image_status_update(
           }
         });
 
-      std::string desc;
-      ceph_assert(m_replay_status_formatter != nullptr);
-      if (!m_replay_status_formatter->get_or_send_update(&desc,
-                                                         on_req_finish)) {
+      ceph_assert(m_replayer != nullptr);
+      if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
         dout(15) << "waiting for replay status" << dendl;
         return;
       }
@@ -1460,21 +892,6 @@ template <typename I>
 void ImageReplayer<I>::shut_down(int r) {
   dout(10) << "r=" << r << dendl;
 
-  bool canceled_delayed_preprocess_task = false;
-  {
-    std::lock_guard timer_locker{m_threads->timer_lock};
-    if (m_delayed_preprocess_task != nullptr) {
-      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
-        m_delayed_preprocess_task);
-      ceph_assert(canceled_delayed_preprocess_task);
-      m_delayed_preprocess_task = nullptr;
-    }
-  }
-  if (canceled_delayed_preprocess_task) {
-    // wake up sleeping replay
-    m_event_replay_tracker.finish_op();
-  }
-
   {
     std::lock_guard locker{m_lock};
     ceph_assert(m_state == STATE_STOPPING);
@@ -1488,10 +905,6 @@ void ImageReplayer<I>::shut_down(int r) {
     return;
   }
 
-  // NOTE: it's important to ensure that the local image is fully
-  // closed before attempting to close the remote journal in
-  // case the remote cluster is unreachable
-
   // chain the shut down sequence (reverse order)
   Context *ctx = new LambdaContext(
     [this, r](int _r) {
@@ -1499,80 +912,44 @@ void ImageReplayer<I>::shut_down(int r) {
       handle_shut_down(r);
     });
 
-  // close the remote journal
+  // destruct the remote journaler created in prepare remote
   if (m_remote_journaler != nullptr) {
     ctx = new LambdaContext([this, ctx](int r) {
-        delete m_remote_journaler;
-        m_remote_journaler = nullptr;
-        ctx->complete(0);
-      });
-    ctx = new LambdaContext([this, ctx](int r) {
-       m_remote_journaler->remove_listener(&m_remote_listener);
-        m_remote_journaler->shut_down(ctx);
-      });
+      delete m_remote_journaler;
+      m_remote_journaler = nullptr;
+      ctx->complete(0);
+    });
   }
 
-  // stop the replay of remote journal events
-  if (m_replay_handler != nullptr) {
+  // close the local image (if we aborted after a successful bootstrap)
+  if (m_local_image_ctx != nullptr) {
     ctx = new LambdaContext([this, ctx](int r) {
-        delete m_replay_handler;
-        m_replay_handler = nullptr;
-
-        m_event_replay_tracker.wait_for_ops(ctx);
-      });
+      ceph_assert(m_local_image_ctx == nullptr);
+      ctx->complete(0);
+    });
     ctx = new LambdaContext([this, ctx](int r) {
-        m_remote_journaler->stop_replay(ctx);
-      });
-  }
+      if (m_local_image_ctx == nullptr) {
+        // never opened or closed via the replayer shutdown
+        ctx->complete(0);
+        return;
+      }
 
-  // close the local image (release exclusive lock)
-  if (m_local_image_ctx) {
-    ctx = new LambdaContext([this, ctx](int r) {
       auto request = image_replayer::CloseImageRequest<I>::create(
         &m_local_image_ctx, ctx);
       request->send();
     });
   }
 
-  // shut down event replay into the local image
-  if (m_local_journal != nullptr) {
-    ctx = new LambdaContext([this, ctx](int r) {
-        m_local_journal = nullptr;
-        ctx->complete(0);
-      });
-    if (m_local_replay != nullptr) {
-      ctx = new LambdaContext([this, ctx](int r) {
-          m_local_journal->stop_external_replay();
-          m_local_replay = nullptr;
-
-          image_replayer::journal::EventPreprocessor<I>::destroy(
-            m_event_preprocessor);
-          m_event_preprocessor = nullptr;
-          ctx->complete(0);
-        });
-    }
+  // close the replayer
+  if (m_replayer != nullptr) {
     ctx = new LambdaContext([this, ctx](int r) {
-        // blocks if listener notification is in-progress
-        m_local_journal->remove_listener(m_journal_listener);
-        ctx->complete(0);
-      });
-  }
-
-  // wait for all local in-flight replay events to complete
-  ctx = new LambdaContext([this, ctx](int r) {
-      if (r < 0) {
-        derr << "error shutting down journal replay: " << cpp_strerror(r)
-             << dendl;
-      }
-
-      m_event_replay_tracker.wait_for_ops(ctx);
+      m_replayer->destroy();
+      m_replayer = nullptr;
+      ctx->complete(0);
     });
-
-  // flush any local in-flight replay events
-  if (m_local_replay != nullptr) {
     ctx = new LambdaContext([this, ctx](int r) {
-        m_local_replay->shut_down(true, ctx);
-      });
+      m_replayer->shut_down(ctx);
+    });
   }
 
   m_threads->work_queue->queue(ctx, 0);
@@ -1647,10 +1024,6 @@ void ImageReplayer<I>::handle_shut_down(int r) {
   }
 
   dout(10) << "stop complete" << dendl;
-  image_replayer::journal::ReplayStatusFormatter<I>::destroy(
-    m_replay_status_formatter);
-  m_replay_status_formatter = nullptr;
-
   Context *on_start = nullptr;
   Context *on_stop = nullptr;
   {
@@ -1658,7 +1031,6 @@ void ImageReplayer<I>::handle_shut_down(int r) {
     std::swap(on_start, m_on_start_finish);
     std::swap(on_stop, m_on_stop_finish);
     m_stop_requested = false;
-    ceph_assert(m_delayed_preprocess_task == nullptr);
     ceph_assert(m_state == STATE_STOPPING);
     m_state = STATE_STOPPED;
   }
@@ -1675,27 +1047,44 @@ void ImageReplayer<I>::handle_shut_down(int r) {
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
-  dout(20) << dendl;
+void ImageReplayer<I>::handle_replayer_notification() {
+  dout(10) << dendl;
 
-  cls::journal::Client client;
+  // detect a rename of the local image
+  std::string local_image_name;
   {
-    std::lock_guard locker{m_lock};
-    if (!is_running_()) {
-      return;
-    }
+    std::shared_lock image_locker{m_local_image_ctx->image_lock};
+    local_image_name = m_local_image_ctx->name;
+  }
 
-    int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
-    if (r < 0) {
-      derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
-      return;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_state == STATE_REPLAYING);
+    ceph_assert(m_replayer != nullptr);
+
+    if (m_local_image_name != local_image_name) {
+      // will re-register with new name after next status update
+      dout(10) << "image renamed" << dendl;
+      m_local_image_name = local_image_name;
     }
   }
 
-  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
-    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
-    stop(nullptr, false, -ENOTCONN, "disconnected");
+  // replayer cannot be shut down while notification is in-flight
+  if (m_replayer->is_resync_requested()) {
+    dout(10) << "resync requested" << dendl;
+    m_resync_requested = true;
+    on_stop_journal_replay(0, "resync requested");
+    return;
   }
+
+  if (!m_replayer->is_replaying()) {
+    dout(10) << "replay interrupted" << dendl;
+    on_stop_journal_replay(m_replayer->get_error_code(),
+                           m_replayer->get_error_description());
+    return;
+  }
+
+  update_mirror_image_status(false, {});
 }
 
 template <typename I>
@@ -1705,8 +1094,6 @@ std::string ImageReplayer<I>::to_string(const State state) {
     return "Starting";
   case ImageReplayer<I>::STATE_REPLAYING:
     return "Replaying";
-  case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
-    return "ReplayFlushing";
   case ImageReplayer<I>::STATE_STOPPING:
     return "Stopping";
   case ImageReplayer<I>::STATE_STOPPED:
@@ -1717,14 +1104,6 @@ std::string ImageReplayer<I>::to_string(const State state) {
   return "Unknown(" + stringify(state) + ")";
 }
 
-template <typename I>
-void ImageReplayer<I>::resync_image(Context *on_finish) {
-  dout(10) << dendl;
-
-  m_resync_requested = true;
-  stop(on_finish);
-}
-
 template <typename I>
 void ImageReplayer<I>::register_admin_socket_hook() {
   ImageReplayerAdminSocketHook<I> *asok_hook;
@@ -1734,29 +1113,12 @@ void ImageReplayer<I>::register_admin_socket_hook() {
       return;
     }
 
-    ceph_assert(m_perf_counters == nullptr);
-
     dout(15) << "registered asok hook: " << m_image_spec << dendl;
     asok_hook = new ImageReplayerAdminSocketHook<I>(
       g_ceph_context, m_image_spec, this);
     int r = asok_hook->register_commands();
     if (r == 0) {
       m_asok_hook = asok_hook;
-
-      CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
-      auto prio = cct->_conf.get_val<int64_t>(
-        "rbd_mirror_image_perf_stats_prio");
-      PerfCountersBuilder plb(g_ceph_context,
-                              "rbd_mirror_image_" + m_image_spec,
-                              l_rbd_mirror_first, l_rbd_mirror_last);
-      plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
-      plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
-                          "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
-      plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
-                       "Replay latency", "rl", prio);
-      m_perf_counters = plb.create_perf_counters();
-      g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
-
       return;
     }
     derr << "error registering admin socket commands" << dendl;
@@ -1769,17 +1131,11 @@ void ImageReplayer<I>::unregister_admin_socket_hook() {
   dout(15) << dendl;
 
   AdminSocketHook *asok_hook = nullptr;
-  PerfCounters *perf_counters = nullptr;
   {
     std::lock_guard locker{m_lock};
     std::swap(asok_hook, m_asok_hook);
-    std::swap(perf_counters, m_perf_counters);
   }
   delete asok_hook;
-  if (perf_counters != nullptr) {
-    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
-    delete perf_counters;
-  }
 }
 
 template <typename I>
index 03b200730abbab77f4db917bd4a42a0577bc7d48..5b6165422ec71f5840ef82a1f11ed421ce9d94e6 100644 (file)
 #include "include/rados/librados.hpp"
 #include "cls/journal/cls_journal_types.h"
 #include "cls/rbd/cls_rbd_types.h"
-#include "journal/JournalMetadataListener.h"
-#include "journal/ReplayEntry.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/journal/Types.h"
 #include "librbd/journal/TypeTraits.h"
 #include "ProgressContext.h"
 #include "tools/rbd_mirror/Types.h"
 #include "tools/rbd_mirror/image_replayer/Types.h"
-
-#include <boost/noncopyable.hpp>
 #include <boost/optional.hpp>
-
-#include <set>
-#include <map>
-#include <atomic>
 #include <string>
-#include <vector>
 
 class AdminSocketHook;
-class PerfCounters;
 
 namespace journal {
 
 struct CacheManagerHandler;
 class Journaler;
-class ReplayHandler;
 
 } // namespace journal
 
 namespace librbd {
 
 class ImageCtx;
-namespace journal { template <typename> class Replay; }
 
 } // namespace librbd
 
@@ -55,14 +43,9 @@ template <typename> struct Threads;
 
 namespace image_replayer {
 
+class Replayer;
 template <typename> class BootstrapRequest;
 
-namespace journal {
-
-template <typename> class EventPreprocessor;
-template <typename> class ReplayStatusFormatter;
-
-} // namespace journal
 } // namespace image_replayer
 
 /**
@@ -136,13 +119,8 @@ public:
   void restart(Context *on_finish = nullptr);
   void flush();
 
-  void resync_image(Context *on_finish=nullptr);
-
   void print_status(Formatter *f);
 
-  virtual void handle_replay_ready();
-  virtual void handle_replay_complete(int r, const std::string &error_desc);
-
 protected:
   /**
    * @verbatim
@@ -162,45 +140,10 @@ protected:
    * BOOTSTRAP_IMAGE  * * * * * * * * * * * * * * * * * * * *
    *    |                                                   *
    *    v                                           (error) *
-   * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * *
-   *    |                                                   *
-   *    v                                           (error) *
    * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
    *    |
-   *    |  /--------------------------------------------\
-   *    |  |                                            |
-   *    v  v   (asok flush)                             |
-   * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |
-   *    |       \                 |                     |
-   *    |       |                 v                     |
-   *    |       |             FLUSH_COMMIT_POSITION     |
-   *    |       |                 |                     |
-   *    |       |                 \--------------------/|
-   *    |       |                                       |
-   *    |       | (entries available)                   |
-   *    |       \-----------> REPLAY_READY              |
-   *    |                         |                     |
-   *    |                         | (skip if not        |
-   *    |                         v  needed)        (error)
-   *    |                     REPLAY_FLUSH  * * * * * * * * *
-   *    |                         |                     |   *
-   *    |                         | (skip if not        |   *
-   *    |                         v  needed)        (error) *
-   *    |                     GET_REMOTE_TAG  * * * * * * * *
-   *    |                         |                     |   *
-   *    |                         | (skip if not        |   *
-   *    |                         v  needed)        (error) *
-   *    |                     ALLOCATE_LOCAL_TAG  * * * * * *
-   *    |                         |                     |   *
-   *    |                         v                 (error) *
-   *    |                     PREPROCESS_ENTRY  * * * * * * *
-   *    |                         |                     |   *
-   *    |                         v                 (error) *
-   *    |                     PROCESS_ENTRY * * * * * * * * *
-   *    |                         |                     |   *
-   *    |                         \---------------------/   *
-   *    v                                                   *
-   * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *
+   *    v
+   * REPLAYING
    *    |
    *    v
    * JOURNAL_REPLAY_SHUT_DOWN
@@ -224,13 +167,11 @@ protected:
 
 private:
   typedef std::set<Peer<ImageCtxT>> Peers;
-  typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
 
   enum State {
     STATE_UNKNOWN,
     STATE_STARTING,
     STATE_REPLAYING,
-    STATE_REPLAY_FLUSHING,
     STATE_STOPPING,
     STATE_STOPPED,
   };
@@ -247,32 +188,13 @@ private:
       : io_ctx(peer.io_ctx), mirror_status_updater(peer.mirror_status_updater) {
     }
   };
+  struct ReplayerListener;
 
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
   typedef boost::optional<State> OptionalState;
   typedef boost::optional<cls::rbd::MirrorImageStatusState>
       OptionalMirrorImageStatusState;
 
-  struct JournalListener : public librbd::journal::Listener {
-    ImageReplayer *img_replayer;
-
-    JournalListener(ImageReplayer *img_replayer)
-      : img_replayer(img_replayer) {
-    }
-
-    void handle_close() override {
-      img_replayer->on_stop_journal_replay();
-    }
-
-    void handle_promoted() override {
-      img_replayer->on_stop_journal_replay(0, "force promoted");
-    }
-
-    void handle_resync() override {
-      img_replayer->resync_image();
-    }
-  };
-
   class BootstrapProgressContext : public ProgressContext {
   public:
     BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
@@ -314,18 +236,14 @@ private:
   bool m_delete_requested = false;
   bool m_resync_requested = false;
 
-  image_replayer::journal::EventPreprocessor<ImageCtxT>*
-      m_event_preprocessor = nullptr;
-  image_replayer::journal::ReplayStatusFormatter<ImageCtxT>*
-    m_replay_status_formatter = nullptr;
   ImageCtxT *m_local_image_ctx = nullptr;
   std::string m_local_image_tag_owner;
 
   decltype(ImageCtxT::journal) m_local_journal = nullptr;
-  librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
   Journaler* m_remote_journaler = nullptr;
-  ::journal::ReplayHandler *m_replay_handler = nullptr;
-  librbd::journal::Listener *m_journal_listener;
+
+  image_replayer::Replayer* m_replayer = nullptr;
+  ReplayerListener* m_replayer_listener = nullptr;
 
   Context *m_on_start_finish = nullptr;
   Context *m_on_stop_finish = nullptr;
@@ -333,7 +251,6 @@ private:
   bool m_manual_stop = false;
 
   AdminSocketHook *m_asok_hook = nullptr;
-  PerfCounters *m_perf_counters = nullptr;
 
   image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
 
@@ -341,43 +258,7 @@ private:
     cls::journal::CLIENT_STATE_DISCONNECTED;
   librbd::journal::MirrorPeerClientMeta m_client_meta;
 
-  ReplayEntry m_replay_entry;
-  utime_t m_replay_start_time;
-  bool m_replay_tag_valid = false;
-  uint64_t m_replay_tag_tid = 0;
-  cls::journal::Tag m_replay_tag;
-  librbd::journal::TagData m_replay_tag_data;
-  librbd::journal::EventEntry m_event_entry;
-  AsyncOpTracker m_event_replay_tracker;
-  Context *m_delayed_preprocess_task = nullptr;
-  Context* m_periodic_flush_task = nullptr;
-
   AsyncOpTracker m_in_flight_op_tracker;
-  Context *m_flush_local_replay_task = nullptr;
-
-  struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
-    ImageReplayer *replayer;
-
-    RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
-
-    void handle_update(::journal::JournalMetadata *) override;
-  } m_remote_listener;
-
-  struct C_ReplayCommitted : public Context {
-    ImageReplayer *replayer;
-    ReplayEntry replay_entry;
-    utime_t replay_start_time;
-
-    C_ReplayCommitted(ImageReplayer *replayer,
-                      ReplayEntry &&replay_entry,
-                      const utime_t &replay_start_time)
-      : replayer(replayer), replay_entry(std::move(replay_entry)),
-        replay_start_time(replay_start_time) {
-    }
-    void finish(int r) override {
-      replayer->handle_process_entry_safe(replay_entry, replay_start_time, r);
-    }
-  };
 
   static std::string to_string(const State state);
 
@@ -388,26 +269,14 @@ private:
     return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
   }
   bool is_replaying_() const {
-    return (m_state == STATE_REPLAYING ||
-            m_state == STATE_REPLAY_FLUSHING);
+    return (m_state == STATE_REPLAYING);
   }
 
-  void schedule_flush_local_replay_task();
-  void cancel_flush_local_replay_task();
-  void handle_flush_local_replay_task(int r);
-
-  void flush_local_replay(Context* on_flush);
-  void handle_flush_local_replay(Context* on_flush, int r);
-
-  void flush_commit_position(Context* on_flush);
-  void handle_flush_commit_position(Context* on_flush, int r);
-
   void update_mirror_image_status(bool force, const OptionalState &state);
   void set_mirror_image_status_update(bool force, const OptionalState &state);
 
   void shut_down(int r);
   void handle_shut_down(int r);
-  void handle_remote_journal_metadata_updated();
 
   void prepare_local_image();
   void handle_prepare_local_image(int r);
@@ -418,29 +287,10 @@ private:
   void bootstrap();
   void handle_bootstrap(int r);
 
-  void init_remote_journaler();
-  void handle_init_remote_journaler(int r);
-
   void start_replay();
   void handle_start_replay(int r);
 
-  void replay_flush();
-  void handle_replay_flush(int r);
-
-  void get_remote_tag();
-  void handle_get_remote_tag(int r);
-
-  void allocate_local_tag();
-  void handle_allocate_local_tag(int r);
-
-  void preprocess_entry();
-  void handle_preprocess_entry_ready(int r);
-  void handle_preprocess_entry_safe(int r);
-
-  void process_entry();
-  void handle_process_entry_ready(int r);
-  void handle_process_entry_safe(const ReplayEntry& replay_entry,
-                                 const utime_t &m_replay_start_time, int r);
+  void handle_replayer_notification();
 
   void register_admin_socket_hook();
   void unregister_admin_socket_hook();
index 3568614bf21d7882f86b370b7604adc208cef207..f3bfa4da04b0dac85c44103de0eb9febba3e00de 100644 (file)
@@ -15,6 +15,8 @@ namespace image_replayer {
 struct Replayer {
   virtual ~Replayer() {}
 
+  virtual void destroy() = 0;
+
   virtual void init(Context* on_finish) = 0;
   virtual void shut_down(Context* on_finish) = 0;
 
index c63d2e824b32a38f638a961db4165da1a12eed2f..59c2ab239b8edda732a20f9d0e45f7bd7807a164 100644 (file)
@@ -151,7 +151,7 @@ struct Replayer<I>::LocalJournalListener
 template <typename I>
 Replayer<I>::Replayer(
     I** local_image_ctx, Journaler* remote_journaler,
-    std::string local_mirror_uuid, std::string remote_mirror_uuid,
+    const std::string& local_mirror_uuid, const std::string& remote_mirror_uuid,
     ReplayerListener* replayer_listener, Threads<I>* threads)
   : m_local_image_ctx(local_image_ctx),
     m_remote_journaler(remote_journaler),
@@ -162,11 +162,22 @@ Replayer<I>::Replayer(
     m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
       "rbd::mirror::image_replayer::journal::Replayer", this))) {
   dout(10) << dendl;
+
+  {
+    std::unique_lock locker{m_lock};
+    register_perf_counters();
+  }
 }
 
 template <typename I>
 Replayer<I>::~Replayer() {
   dout(10) << dendl;
+
+  {
+    std::unique_lock locker{m_lock};
+    unregister_perf_counters();
+  }
+
   ceph_assert(m_remote_listener == nullptr);
   ceph_assert(m_local_journal_listener == nullptr);
   ceph_assert(m_local_journal_replay == nullptr);
@@ -1070,6 +1081,8 @@ void Replayer<I>::handle_process_entry_ready(int r) {
   }
 
   if (update_status) {
+    unregister_perf_counters();
+    register_perf_counters();
     notify_status_updated();
   }
 
@@ -1196,6 +1209,40 @@ int Replayer<I>::validate_remote_client_state(
   return 0;
 }
 
+template <typename I>
+void Replayer<I>::register_perf_counters() {
+  dout(5) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(m_perf_counters == nullptr);
+
+  auto cct = static_cast<CephContext *>((*m_local_image_ctx)->cct);
+  auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
+  PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
+                          l_rbd_mirror_first, l_rbd_mirror_last);
+  plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
+  plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
+                      "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
+  plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
+                   "Replay latency", "rl", prio);
+  m_perf_counters = plb.create_perf_counters();
+  g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
+}
+
+template <typename I>
+void Replayer<I>::unregister_perf_counters() {
+  dout(5) << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  PerfCounters *perf_counters = nullptr;
+  std::swap(perf_counters, m_perf_counters);
+
+  if (perf_counters != nullptr) {
+    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
+    delete perf_counters;
+  }
+}
+
 } // namespace journal
 } // namespace image_replayer
 } // namespace mirror
index 3b464c9065d30484f3d53d263e5481a9d9ded7c9..af22b145c41def47a534bddc8a8522bdff7e4737 100644 (file)
@@ -44,12 +44,27 @@ class Replayer : public image_replayer::Replayer {
 public:
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
 
+  static Replayer* create(ImageCtxT** local_image_ctx,
+                          Journaler* remote_journaler,
+                          const std::string& local_mirror_uuid,
+                          const std::string& remote_mirror_uuid,
+                          ReplayerListener* replayer_listener,
+                          Threads<ImageCtxT>* threads) {
+    return new Replayer(local_image_ctx, remote_journaler, local_mirror_uuid,
+                        remote_mirror_uuid, replayer_listener, threads);
+  }
+
   Replayer(
       ImageCtxT** local_image_ctx, Journaler* remote_journaler,
-      std::string m_local_mirror_uuid, std::string m_remote_mirror_uuid,
+      const std::string& local_mirror_uuid,
+      const std::string& remote_mirror_uuid,
       ReplayerListener* replayer_listener, Threads<ImageCtxT>* threads);
   ~Replayer();
 
+  void destroy() override {
+    delete this;
+  }
+
   void init(Context* on_finish) override;
   void shut_down(Context* on_finish) override;
 
@@ -293,6 +308,9 @@ private:
       librbd::journal::MirrorPeerClientMeta* remote_client_meta,
       bool* resync_requested, std::string* error);
 
+  void register_perf_counters();
+  void unregister_perf_counters();
+
 };
 
 } // namespace journal