]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: framework for replay allocating tags in local journal
authorJason Dillaman <dillaman@redhat.com>
Thu, 24 Mar 2016 19:56:46 +0000 (15:56 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 29 Mar 2016 19:19:25 +0000 (15:19 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index 07334f979895c7e82f3f3e2459778a8965092c6a..14564cf206f99db99dbdb554df274ce732df5530 100644 (file)
@@ -9,7 +9,6 @@
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "journal/Journaler.h"
-#include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
@@ -62,22 +61,6 @@ struct ReplayHandler : public ::journal::ReplayHandler {
   }
 };
 
-template <typename I>
-struct C_ReplayCommitted : public Context {
-  typedef typename librbd::journal::TypeTraits<I>::ReplayEntry ReplayEntry;
-
-  ImageReplayer<I> *replayer;
-  ReplayEntry replay_entry;
-
-  C_ReplayCommitted(ImageReplayer<I> *replayer,
-                    ReplayEntry &&replay_entry)
-    : replayer(replayer), replay_entry(std::move(replay_entry)) {
-  }
-  virtual void finish(int r) {
-    replayer->handle_replay_committed(&replay_entry, r);
-  }
-};
-
 class ImageReplayerAdminSocketCommand {
 public:
   virtual ~ImageReplayerAdminSocketCommand() {}
@@ -194,7 +177,6 @@ ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot
   m_local_replay(nullptr),
   m_remote_journaler(nullptr),
   m_replay_handler(nullptr),
-  m_on_finish(nullptr),
   m_asok_hook(nullptr)
 {
 }
@@ -206,7 +188,8 @@ ImageReplayer<I>::~ImageReplayer()
   assert(m_local_replay == nullptr);
   assert(m_remote_journaler == nullptr);
   assert(m_replay_handler == nullptr);
-
+  assert(m_on_start_finish == nullptr);
+  assert(m_on_stop_finish == nullptr);
   delete m_asok_hook;
 }
 
@@ -214,17 +197,16 @@ template <typename I>
 void ImageReplayer<I>::start(Context *on_finish,
                             const BootstrapParams *bootstrap_params)
 {
-  dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
-          << dendl;
+  assert(m_on_start_finish == nullptr);
+  assert(m_on_stop_finish == nullptr);
+  dout(20) << "on_finish=" << on_finish << dendl;
 
   {
     Mutex::Locker locker(m_lock);
     assert(is_stopped_());
 
     m_state = STATE_STARTING;
-
-    assert(m_on_finish == nullptr);
-    m_on_finish = on_finish;
+    m_on_start_finish = on_finish;
   }
 
   int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
@@ -344,18 +326,18 @@ void ImageReplayer<I>::start_replay() {
   Context *on_finish(nullptr);
   {
     Mutex::Locker locker(m_lock);
-    if (m_state == STATE_STOPPING) {
+    if (m_stop_requested) {
       on_start_fail_start(-EINTR);
       return;
     }
 
     assert(m_state == STATE_STARTING);
     m_state = STATE_REPLAYING;
-    std::swap(m_on_finish, on_finish);
+    std::swap(m_on_start_finish, on_finish);
   }
 
   dout(20) << "start succeeded" << dendl;
-  if (on_finish) {
+  if (on_finish != nullptr) {
     dout(20) << "on finish complete, r=" << r << dendl;
     on_finish->complete(r);
   }
@@ -408,25 +390,31 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
   m_local_ioctx.close();
   m_remote_ioctx.close();
 
-  Context *on_finish(nullptr);
-
+  Context *on_start_finish(nullptr);
+  Context *on_stop_finish(nullptr);
   {
     Mutex::Locker locker(m_lock);
-    if (m_state == STATE_STOPPING) {
+    if (m_stop_requested) {
       assert(r == -EINTR);
       dout(20) << "start interrupted" << dendl;
       m_state = STATE_STOPPED;
+      m_stop_requested = false;
     } else {
       assert(m_state == STATE_STARTING);
       dout(20) << "start failed" << dendl;
       m_state = STATE_UNINITIALIZED;
     }
-    std::swap(m_on_finish, on_finish);
+    std::swap(m_on_start_finish, on_start_finish);
+    std::swap(m_on_stop_finish, on_stop_finish);
   }
 
-  if (on_finish) {
-    dout(20) << "on finish complete, r=" << r << dendl;
-    on_finish->complete(r);
+  if (on_start_finish != nullptr) {
+    dout(20) << "on start finish complete, r=" << r << dendl;
+    on_start_finish->complete(r);
+  }
+  if (on_stop_finish != nullptr) {
+    dout(20) << "on stop finish complete, r=" << r << dendl;
+    on_stop_finish->complete(0);
   }
 }
 
@@ -434,13 +422,11 @@ template <typename I>
 bool ImageReplayer<I>::on_start_interrupted()
 {
   Mutex::Locker locker(m_lock);
-
-  if (m_state == STATE_STARTING) {
+  assert(m_state == STATE_STARTING);
+  if (m_on_stop_finish == nullptr) {
     return false;
   }
 
-  assert(m_state == STATE_STOPPING);
-
   on_start_fail_start(-EINTR);
   return true;
 }
@@ -448,48 +434,32 @@ bool ImageReplayer<I>::on_start_interrupted()
 template <typename I>
 void ImageReplayer<I>::stop(Context *on_finish)
 {
-  dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
-          << dendl;
+  dout(20) << "on_finish=" << on_finish << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(is_running_());
-
-  if (m_state == STATE_STARTING) {
-    dout(20) << "interrupting start" << dendl;
-
-    if (on_finish) {
-      Context *on_start_finish = m_on_finish;
-      FunctionContext *ctx = new FunctionContext(
-       [this, on_start_finish, on_finish](int r) {
-         if (on_start_finish) {
-           on_start_finish->complete(r);
-         }
-         on_finish->complete(0);
-       });
-
-      m_on_finish = ctx;
-    }
-  } else if (m_state == STATE_FLUSHING_REPLAY) {
-    dout(20) << "interrupting flush" << dendl;
-
-    if (on_finish) {
-      Context *on_flush_finish = m_on_finish;
-      FunctionContext *ctx = new FunctionContext(
-       [this, on_flush_finish, on_finish](int r) {
-         if (on_flush_finish) {
-           on_flush_finish->complete(r);
-         }
-         on_finish->complete(0);
-       });
-
-      m_on_finish = ctx;
+  bool shut_down_replay = false;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(is_running_());
+
+    if (!is_stopped_()) {
+      if (m_state == STATE_STARTING) {
+        dout(20) << "interrupting start" << dendl;
+      } else {
+        dout(20) << "interrupting replay" << dendl;
+        shut_down_replay = true;
+      }
+
+      assert(m_on_stop_finish == nullptr);
+      std::swap(m_on_stop_finish, on_finish);
+      m_stop_requested = true;
     }
-  } else {
-    assert(m_on_finish == nullptr);
-    m_on_finish = on_finish;
+  }
+
+  if (shut_down_replay) {
     on_stop_journal_replay_shut_down_start();
+  } else if (on_finish != nullptr) {
+    on_finish->complete(0);
   }
-  m_state = STATE_STOPPING;
 }
 
 template <typename I>
@@ -502,20 +472,32 @@ void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
       on_stop_journal_replay_shut_down_finish(r);
     });
 
-  m_local_replay->shut_down(false, ctx);
+  {
+    Mutex::Locker locker(m_lock);
+
+    // as we complete in-flight records, we might receive multiple stop requests
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
+    m_state = STATE_STOPPING;
+    m_local_replay->shut_down(false, ctx);
+  }
 }
 
 template <typename I>
 void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
 {
   dout(20) << "r=" << r << dendl;
-
   if (r < 0) {
     derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
   }
 
-  m_local_image_ctx->journal->stop_external_replay();
-  m_local_replay = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_state == STATE_STOPPING);
+    m_local_image_ctx->journal->stop_external_replay();
+    m_local_replay = nullptr;
+  }
 
   on_stop_local_image_close_start();
 }
@@ -561,13 +543,13 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
     assert(m_state == STATE_STOPPING);
 
     m_state = STATE_STOPPED;
-
-    std::swap(m_on_finish, on_finish);
+    m_stop_requested = false;
+    std::swap(m_on_stop_finish, on_finish);
   }
 
   dout(20) << "stop complete" << dendl;
 
-  if (on_finish) {
+  if (on_finish != nullptr) {
     dout(20) << "on finish complete, r=" << r << dendl;
     on_finish->complete(r);
   }
@@ -583,20 +565,16 @@ template <typename I>
 void ImageReplayer<I>::handle_replay_ready()
 {
   dout(20) << "enter" << dendl;
-
-  ReplayEntry replay_entry;
-  if (!m_remote_journaler->try_pop_front(&replay_entry)) {
+  if (on_replay_interrupted()) {
     return;
   }
 
-  dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
+  if (!m_remote_journaler->try_pop_front(&m_replay_entry)) {
+    return;
+  }
 
-  bufferlist data = replay_entry.get_data();
-  bufferlist::iterator it = data.begin();
-  Context *on_ready = create_context_callback<
-    ImageReplayer, &ImageReplayer<I>::handle_replay_process_ready>(this);
-  Context *on_commit = new C_ReplayCommitted<I>(this, std::move(replay_entry));
-  m_local_replay->process(&it, on_ready, on_commit);
+  // TODO
+  process_entry();
 }
 
 template <typename I>
@@ -604,127 +582,89 @@ void ImageReplayer<I>::flush(Context *on_finish)
 {
   dout(20) << "enter" << dendl;
 
-  bool start_flush = false;
-
   {
     Mutex::Locker locker(m_lock);
-
-    if (m_state == STATE_REPLAYING) {
-      assert(m_on_finish == nullptr);
-      m_on_finish = on_finish;
-
-      m_state = STATE_FLUSHING_REPLAY;
-
-      start_flush = true;
+    if (m_state == STATE_REPLAYING || m_state == STATE_REPLAYING) {
+      Context *ctx = new FunctionContext(
+        [on_finish](int r) {
+          if (on_finish != nullptr) {
+            on_finish->complete(r);
+          }
+        });
+      on_flush_local_replay_flush_start(ctx);
     }
   }
 
-  if (start_flush) {
-    on_flush_local_replay_flush_start();
-  } else if (on_finish) {
+  if (on_finish) {
     on_finish->complete(0);
   }
 }
 
 template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_start()
+void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
 {
   dout(20) << "enter" << dendl;
-
   FunctionContext *ctx = new FunctionContext(
-    [this](int r) {
-      on_flush_local_replay_flush_finish(r);
+    [this, on_flush](int r) {
+      on_flush_local_replay_flush_finish(on_flush, r);
     });
 
+  assert(m_lock.is_locked());
+  assert(m_state == STATE_REPLAYING);
   m_local_replay->flush(ctx);
 }
 
 template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_finish(int r)
+void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
+                                                          int r)
 {
   dout(20) << "r=" << r << dendl;
-
   if (r < 0) {
     derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
-  }
-
-  if (on_flush_interrupted()) {
+    on_flush->complete(r);
     return;
   }
 
-  on_flush_flush_commit_position_start(r);
+  on_flush_flush_commit_position_start(on_flush);
 }
 
 template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_start(int last_r)
+void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
 {
-
   FunctionContext *ctx = new FunctionContext(
-    [this, last_r](int r) {
-      on_flush_flush_commit_position_finish(last_r, r);
+    [this, on_flush](int r) {
+      on_flush_flush_commit_position_finish(on_flush, r);
     });
 
   m_remote_journaler->flush_commit_position(ctx);
 }
 
 template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_finish(int last_r, int r)
+void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
+                                                             int r)
 {
   if (r < 0) {
     derr << "error flushing remote journal commit position: "
         << cpp_strerror(r) << dendl;
-  } else {
-    r = last_r;
-  }
-
-  Context *on_finish(nullptr);
-
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_state == STATE_STOPPING) {
-      r = -EINTR;
-    } else {
-      assert(m_state == STATE_FLUSHING_REPLAY);
-
-      m_state = STATE_REPLAYING;
-    }
-    std::swap(m_on_finish, on_finish);
   }
 
   dout(20) << "flush complete, r=" << r << dendl;
-
-  if (on_finish) {
-    dout(20) << "on finish complete, r=" << r << dendl;
-    on_finish->complete(r);
-  }
+  on_flush->complete(r);
 }
 
 template <typename I>
-bool ImageReplayer<I>::on_flush_interrupted()
+bool ImageReplayer<I>::on_replay_interrupted()
 {
-  Context *on_finish(nullptr);
-
+  bool shut_down;
   {
     Mutex::Locker locker(m_lock);
-
-    if (m_state == STATE_FLUSHING_REPLAY) {
-      return false;
-    }
-
-    assert(m_state == STATE_STOPPING);
-
-    std::swap(m_on_finish, on_finish);
+    shut_down = m_stop_requested;
   }
 
-  dout(20) << "flush interrupted" << dendl;
-
-  if (on_finish) {
-    int r = -EINTR;
-    dout(20) << "on finish complete, r=" << r << dendl;
-    on_finish->complete(r);
+  if (shut_down) {
+    on_stop_journal_replay_shut_down_start();
   }
-
-  return true;
+  return shut_down;
 }
 
 template <typename I>
@@ -746,37 +686,99 @@ void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_replay_process_ready(int r)
+void ImageReplayer<I>::handle_replay_complete(int r)
 {
-  // journal::Replay is ready for more events -- attempt to pop another
-
-  dout(20) << "enter" << dendl;
-
+  dout(20) << "r=" << r << dendl;
   if (r < 0) {
-    derr << "error replaying journal entry: " << cpp_strerror(r)
-        << dendl;
-    // TODO: handle error
+    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
   }
 
-  assert(r == 0);
-  handle_replay_ready();
+  {
+    Mutex::Locker locker(m_lock);
+    m_stop_requested = true;
+  }
+  on_replay_interrupted();
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r)
-{
-  dout(20) "r=" << r << dendl;
+void ImageReplayer<I>::replay_flush() {
+  dout(20) << dendl;
 
-  //m_remote_journaler->stop_replay();
+  // TODO
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_replay_committed(ReplayEntry *replay_entry, int r)
-{
-  dout(20) << "commit_tid=" << replay_entry->get_commit_tid() << ", r=" << r
+void ImageReplayer<I>::handle_replay_flush(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::get_remote_tag() {
+  dout(20) << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_get_remote_tag(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::allocate_local_tag() {
+  dout(20) << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_allocate_local_tag(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void ImageReplayer<I>::process_entry() {
+  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
+  bufferlist data = m_replay_entry.get_data();
+  bufferlist::iterator it = data.begin();
+
+  Context *on_ready = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
+  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
+  m_local_replay->process(&it, on_ready, on_commit);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_process_entry_ready(int r) {
+  dout(20) << dendl;
+  assert(r == 0);
+
+  // attempt to process the next event
+  handle_replay_ready();
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
+                                                 int r) {
+  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;
 
-  m_remote_journaler->committed(*replay_entry);
+  if (r < 0) {
+    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
+
+    handle_replay_complete(r);
+    return;
+  }
+
+  m_remote_journaler->committed(replay_entry);
 }
 
 template <typename I>
@@ -799,8 +801,6 @@ std::string ImageReplayer<I>::to_string(const State state) {
     return "Starting";
   case ImageReplayer<I>::STATE_REPLAYING:
     return "Replaying";
-  case ImageReplayer<I>::STATE_FLUSHING_REPLAY:
-    return "FlushingReplay";
   case ImageReplayer<I>::STATE_STOPPING:
     return "Stopping";
   case ImageReplayer<I>::STATE_STOPPED:
index ffff469954d455b2762afcae2c74156c5194f8db..9073a5b69edeadce2948c831e84138e7607f0c9f 100644 (file)
@@ -12,6 +12,7 @@
 #include "common/WorkQueue.h"
 #include "include/rados/librados.hpp"
 #include "cls/journal/cls_journal_types.h"
+#include "journal/ReplayEntry.h"
 #include "librbd/journal/Types.h"
 #include "librbd/journal/TypeTraits.h"
 #include "types.h"
@@ -49,7 +50,6 @@ public:
     STATE_UNINITIALIZED,
     STATE_STARTING,
     STATE_REPLAYING,
-    STATE_FLUSHING_REPLAY,
     STATE_STOPPING,
     STATE_STOPPED,
   };
@@ -67,7 +67,7 @@ public:
   };
 
   ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
-               const std::string &mirror_uuid, int64_t local_pool_id,
+                const std::string &mirror_uuid, int64_t local_pool_id,
                int64_t remote_pool_id, const std::string &remote_image_id,
                 const std::string &global_image_id);
   virtual ~ImageReplayer();
@@ -88,11 +88,8 @@ public:
   void print_status(Formatter *f, stringstream *ss);
 
   virtual void handle_replay_ready();
-  virtual void handle_replay_process_ready(int r);
   virtual void handle_replay_complete(int r);
 
-  virtual void handle_replay_committed(ReplayEntry* replay_entry, int r);
-
   inline int64_t get_remote_pool_id() const {
     return m_remote_pool_id;
   }
@@ -103,32 +100,56 @@ protected:
   /**
    * @verbatim
    *                   (error)
-   * <uninitialized> <------------------------ FAIL
-   *    |                                       ^
-   *    v                                       *
-   * <starting>                                 *
-   *    |                                       *
-   *    v                               (error) *
-   * BOOTSTRAP_IMAGE  * * * * * * * * * * * * * *
-   *    |                                       *
-   *    v                               (error) *
-   * INIT_REMOTE_JOURNALER  * * * * * * * * * * *
-   *    |                                       *
-   *    v                               (error) *
-   * START_REPLAY * * * * * * * * * * * * * * * *
+   * <uninitialized> <------------------------------------ FAIL
+   *    |                                                   ^
+   *    v                                                   *
+   * <starting>                                             *
+   *    |                                                   *
+   *    v                                           (error) *
+   * BOOTSTRAP_IMAGE  * * * * * * * * * * * * * * * * * * * *
+   *    |                                                   *
+   *    v                                           (error) *
+   * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * *
+   *    |                                                   *
+   *    v                                           (error) *
+   * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
+   *    |
+   *    |  /--------------------------------------------\
+   *    |  |                                            |
+   *    v  v   (asok flush)                             |
+   * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |
+   *    |       \                 |                     |
+   *    |       |                 v                     |
+   *    |       |             FLUSH_COMMIT_POSITION     |
+   *    |       |                 |                     |
+   *    |       |                 \--------------------/|
+   *    |       |                                       |
+   *    |       | (entries available)                   |
+   *    |       \-----------> REPLAY_READY              |
+   *    |                         |                     |
+   *    |                         | (skip if not        |
+   *    |                         v  needed)        (error)
+   *    |                     REPLAY_FLUSH  * * * * * * * * *
+   *    |                         |                     |   *
+   *    |                         | (skip if not        |   *
+   *    |                         v  needed)        (error) *
+   *    |                     GET_REMOTE_TAG  * * * * * * * *
+   *    |                         |                     |   *
+   *    |                         | (skip if not        |   *
+   *    |                         v  needed)        (error) *
+   *    |                     ALLOCATE_LOCAL_TAG  * * * * * *
+   *    |                         |                     |   *
+   *    |                         v                 (error) *
+   *    |                     PROCESS_ENTRY * * * * * * * * *
+   *    |                         |                     |   *
+   *    |                         \---------------------/   *
+   *    v                                                   *
+   * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *
    *    |
-   *    |   /-------------------------------------------\
-   *    |   |                                           |
-   *    v   v                                           |
-   * <replaying> --------------> <flushing_replay>      |
-   *    |                           |                   |
-   *    v                           v                   |
-   * <stopping>                  LOCAL_REPLAY_FLUSH     |
-   *    |                           |                   |
-   *    v                           v                   |
-   * JOURNAL_REPLAY_SHUT_DOWN    FLUSH_COMMIT_POSITION  |
-   *    |                           |                   |
-   *    v                           \-------------------/
+   *    v
+   * JOURNAL_REPLAY_SHUT_DOWN
+   *    |
+   *    v
    * LOCAL_IMAGE_CLOSE
    *    |
    *    v
@@ -146,11 +167,12 @@ protected:
   virtual void on_stop_local_image_close_start();
   virtual void on_stop_local_image_close_finish(int r);
 
-  virtual void on_flush_local_replay_flush_start();
-  virtual void on_flush_local_replay_flush_finish(int r);
-  virtual void on_flush_flush_commit_position_start(int last_r);
-  virtual void on_flush_flush_commit_position_finish(int last_r, int r);
-  virtual bool on_flush_interrupted();
+  virtual void on_flush_local_replay_flush_start(Context *on_flush);
+  virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
+  virtual void on_flush_flush_commit_position_start(Context *on_flush);
+  virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
+
+  bool on_replay_interrupted();
 
   void close_local_image(Context *on_finish); // for tests
 
@@ -171,11 +193,30 @@ private:
   librbd::journal::Replay<ImageCtxT> *m_local_replay;
   Journaler* m_remote_journaler;
   ::journal::ReplayHandler *m_replay_handler;
-  Context *m_on_finish;
+
+  Context *m_on_start_finish = nullptr;
+  Context *m_on_stop_finish = nullptr;
+  bool m_stop_requested = false;
+
   AdminSocketHook *m_asok_hook;
 
   librbd::journal::MirrorPeerClientMeta m_client_meta;
 
+  ReplayEntry m_replay_entry;
+
+  struct C_ReplayCommitted : public Context {
+    ImageReplayer *replayer;
+    ReplayEntry replay_entry;
+
+    C_ReplayCommitted(ImageReplayer *replayer,
+                      ReplayEntry &&replay_entry)
+      : replayer(replayer), replay_entry(std::move(replay_entry)) {
+    }
+    virtual void finish(int r) {
+      replayer->handle_process_entry_safe(replay_entry, r);
+    }
+  };
+
   static std::string to_string(const State state);
 
   State get_state_() const { return m_state; }
@@ -192,6 +233,20 @@ private:
   void handle_init_remote_journaler(int r);
 
   void start_replay();
+
+  void replay_flush();
+  void handle_replay_flush(int r);
+
+  void get_remote_tag();
+  void handle_get_remote_tag(int r);
+
+  void allocate_local_tag();
+  void handle_allocate_local_tag(int r);
+
+  void process_entry();
+  void handle_process_entry_ready(int r);
+  void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
+
 };
 
 } // namespace mirror