rbd-mirror: initial snapshot-based mirroring replayer logic
authorJason Dillaman <dillaman@redhat.com>
Thu, 6 Feb 2020 18:25:33 +0000 (13:25 -0500)
committerJason Dillaman <dillaman@redhat.com>
Wed, 19 Feb 2020 15:36:40 +0000 (10:36 -0500)
This initial implementation will scan the remote image for new mirror
snapshots. If a new primary snapshot it detected, it will start syncing
it. If the remote was demoted, it will notify the image replayer to
shut down. If an in-progress sync was previously interrupted, it will
attempt to restart it.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/cls/rbd/cls_rbd_types.h
src/test/rbd_mirror/CMakeLists.txt
src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc [new file with mode: 0644]
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
src/tools/rbd_mirror/image_replayer/snapshot/Replayer.h

index 18c7b0350fe977dcca6efe60c87dd41a283999c1..1e4a537b04d730d7f9b503b0b26e4c5f7559ca1b 100644 (file)
@@ -555,6 +555,19 @@ struct MirrorSnapshotNamespace {
       primary_mirror_uuid(primary_mirror_uuid),
       primary_snap_id(primary_snap_id) {
   }
+  MirrorSnapshotNamespace(MirrorSnapshotState state,
+                          const std::set<std::string> &mirror_peer_uuids,
+                          const std::string& primary_mirror_uuid,
+                          snapid_t primary_snap_id,
+                          bool complete,
+                          uint64_t last_copied_object_number,
+                          const SnapSeqs& snap_seqs)
+    : state(state), complete(complete), mirror_peer_uuids(mirror_peer_uuids),
+      primary_mirror_uuid(primary_mirror_uuid),
+      primary_snap_id(primary_snap_id),
+      last_copied_object_number(last_copied_object_number),
+      snap_seqs(snap_seqs) {
+  }
 
   inline bool is_primary() const {
     return (state == MIRROR_SNAPSHOT_STATE_PRIMARY ||
index dedabe8d3949d5b1b27375755a3c43c02ee19f8d..0479a43c27fa1baaa1bb0693f7f168b52b94d347 100644 (file)
@@ -43,6 +43,7 @@ add_executable(unittest_rbd_mirror
   image_replayer/journal/test_mock_EventPreprocessor.cc
   image_replayer/journal/test_mock_Replayer.cc
   image_replayer/snapshot/test_mock_CreateLocalImageRequest.cc
+  image_replayer/snapshot/test_mock_Replayer.cc
   image_sync/test_mock_SyncPointCreateRequest.cc
   image_sync/test_mock_SyncPointPruneRequest.cc
   pool_watcher/test_mock_RefreshImagesRequest.cc
diff --git a/src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc b/src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc
new file mode 100644 (file)
index 0000000..25835d9
--- /dev/null
@@ -0,0 +1,1442 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "librbd/deep_copy/ImageCopyRequest.h"
+#include "librbd/deep_copy/SnapshotCopyRequest.h"
+#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
+#include "librbd/mirror/snapshot/GetImageStateRequest.h"
+#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/snapshot/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/rbd_mirror/mock/MockContextWQ.h"
+#include "test/rbd_mirror/mock/MockSafeTimer.h"
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public librbd::MockImageCtx {
+  explicit MockTestImageCtx(librbd::ImageCtx &image_ctx)
+    : librbd::MockImageCtx(image_ctx) {
+  }
+};
+
+} // anonymous namespace
+
+namespace deep_copy {
+
+template <>
+struct ImageCopyRequest<MockTestImageCtx> {
+  uint64_t src_snap_id_start;
+  uint64_t src_snap_id_end;
+  uint64_t dst_snap_id_start;
+  librbd::deep_copy::ObjectNumber object_number;
+  librbd::SnapSeqs snap_seqs;
+
+  static ImageCopyRequest* s_instance;
+  static ImageCopyRequest* create(MockTestImageCtx *src_image_ctx,
+                                  MockTestImageCtx *dst_image_ctx,
+                                  librados::snap_t src_snap_id_start,
+                                  librados::snap_t src_snap_id_end,
+                                  librados::snap_t dst_snap_id_start,
+                                  bool flatten,
+                                  const ObjectNumber &object_number,
+                                  const SnapSeqs &snap_seqs,
+                                  ProgressContext *prog_ctx,
+                                  Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->src_snap_id_start = src_snap_id_start;
+    s_instance->src_snap_id_end = src_snap_id_end;
+    s_instance->dst_snap_id_start = dst_snap_id_start;
+    s_instance->object_number = object_number;
+    s_instance->snap_seqs = snap_seqs;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  Context* on_finish = nullptr;
+
+  ImageCopyRequest() {
+    s_instance = this;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+template <>
+struct SnapshotCopyRequest<MockTestImageCtx> {
+  librados::snap_t src_snap_id_start;
+  librados::snap_t src_snap_id_end;
+  librados::snap_t dst_snap_id_start;
+  SnapSeqs* snap_seqs = nullptr;
+
+  static SnapshotCopyRequest* s_instance;
+  static SnapshotCopyRequest* create(MockTestImageCtx *src_image_ctx,
+                                     MockTestImageCtx *dst_image_ctx,
+                                     librados::snap_t src_snap_id_start,
+                                     librados::snap_t src_snap_id_end,
+                                     librados::snap_t dst_snap_id_start,
+                                     bool flatten,
+                                     ::MockContextWQ *work_queue,
+                                     SnapSeqs *snap_seqs,
+                                     Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->src_snap_id_start = src_snap_id_start;
+    s_instance->src_snap_id_end = src_snap_id_end;
+    s_instance->dst_snap_id_start = dst_snap_id_start;
+    s_instance->snap_seqs = snap_seqs;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  Context* on_finish = nullptr;
+
+  SnapshotCopyRequest() {
+    s_instance = this;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+ImageCopyRequest<MockTestImageCtx>* ImageCopyRequest<MockTestImageCtx>::s_instance = nullptr;
+SnapshotCopyRequest<MockTestImageCtx>* SnapshotCopyRequest<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace deep_copy
+
+namespace mirror {
+namespace snapshot {
+
+template <>
+struct CreateNonPrimaryRequest<MockTestImageCtx> {
+  bool demoted = false;
+  std::string primary_mirror_uuid;
+  uint64_t primary_snap_id;
+  SnapSeqs snap_seqs;
+  uint64_t* snap_id = nullptr;
+
+  static CreateNonPrimaryRequest* s_instance;
+  static CreateNonPrimaryRequest* create(MockTestImageCtx *image_ctx,
+                                         bool demoted,
+                                         const std::string &primary_mirror_uuid,
+                                         uint64_t primary_snap_id,
+                                         const SnapSeqs& snap_seqs,
+                                         const ImageState &image_state,
+                                         uint64_t *snap_id,
+                                         Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->demoted = demoted;
+    s_instance->primary_mirror_uuid = primary_mirror_uuid;
+    s_instance->primary_snap_id = primary_snap_id;
+    s_instance->snap_seqs = snap_seqs;
+    s_instance->snap_id = snap_id;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  Context* on_finish = nullptr;
+
+  CreateNonPrimaryRequest() {
+    s_instance = this;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+template <>
+struct GetImageStateRequest<MockTestImageCtx> {
+  uint64_t snap_id = CEPH_NOSNAP;
+
+  static GetImageStateRequest* s_instance;
+  static GetImageStateRequest* create(MockTestImageCtx *image_ctx,
+                                      uint64_t snap_id,
+                                      ImageState *image_state,
+                                      Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->snap_id = snap_id;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  Context* on_finish = nullptr;
+
+  GetImageStateRequest() {
+    s_instance = this;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+template <>
+struct UnlinkPeerRequest<MockTestImageCtx> {
+  uint64_t snap_id;
+  std::string mirror_peer_uuid;
+
+  static UnlinkPeerRequest* s_instance;
+  static UnlinkPeerRequest*create (MockTestImageCtx *image_ctx,
+                                   uint64_t snap_id,
+                                   const std::string &mirror_peer_uuid,
+                                   Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->snap_id = snap_id;
+    s_instance->mirror_peer_uuid = mirror_peer_uuid;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  Context* on_finish = nullptr;
+
+  UnlinkPeerRequest() {
+    s_instance = this;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+CreateNonPrimaryRequest<MockTestImageCtx>* CreateNonPrimaryRequest<MockTestImageCtx>::s_instance = nullptr;
+GetImageStateRequest<MockTestImageCtx>* GetImageStateRequest<MockTestImageCtx>::s_instance = nullptr;
+UnlinkPeerRequest<MockTestImageCtx>* UnlinkPeerRequest<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace snapshot
+} // namespace mirror
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+  MockSafeTimer *timer;
+  ceph::mutex &timer_lock;
+
+  MockContextWQ *work_queue;
+
+  Threads(Threads<librbd::ImageCtx>* threads)
+    : timer(new MockSafeTimer()),
+      timer_lock(threads->timer_lock),
+      work_queue(new MockContextWQ()) {
+  }
+  ~Threads() {
+    delete timer;
+    delete work_queue;
+  }
+};
+
+namespace {
+
+struct MockReplayerListener : public image_replayer::ReplayerListener {
+  MOCK_METHOD0(handle_notification, void());
+};
+
+} // anonymous namespace
+
+namespace image_replayer {
+
+template<>
+struct CloseImageRequest<librbd::MockTestImageCtx> {
+  static CloseImageRequest* s_instance;
+  librbd::MockTestImageCtx **image_ctx = nullptr;
+  Context *on_finish = nullptr;
+
+  static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx,
+                                   Context *on_finish) {
+    ceph_assert(s_instance != nullptr);
+    s_instance->image_ctx = image_ctx;
+    s_instance->on_finish = on_finish;
+    return s_instance;
+  }
+
+  CloseImageRequest() {
+    ceph_assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  ~CloseImageRequest() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
+
+  MOCK_METHOD0(send, void());
+};
+
+CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+namespace snapshot {
+
+template<>
+struct StateBuilder<librbd::MockTestImageCtx> {
+  StateBuilder(librbd::MockTestImageCtx& local_image_ctx,
+               librbd::MockTestImageCtx& remote_image_ctx)
+    : local_image_ctx(&local_image_ctx),
+      remote_image_ctx(&remote_image_ctx) {
+  }
+
+  librbd::MockTestImageCtx* local_image_ctx;
+  librbd::MockTestImageCtx* remote_image_ctx;
+
+  std::string remote_mirror_uuid = "remote mirror uuid";
+};
+
+} // namespace snapshot
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#include "tools/rbd_mirror/image_replayer/snapshot/Replayer.cc"
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace snapshot {
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::ReturnArg;
+using ::testing::StrEq;
+using ::testing::WithArg;
+
+class TestMockImageReplayerSnapshotReplayer : public TestMockFixture {
+public:
+  typedef Replayer<librbd::MockTestImageCtx> MockReplayer;
+  typedef StateBuilder<librbd::MockTestImageCtx> MockStateBuilder;
+  typedef Threads<librbd::MockTestImageCtx> MockThreads;
+  typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
+  typedef librbd::deep_copy::ImageCopyRequest<librbd::MockTestImageCtx> MockImageCopyRequest;
+  typedef librbd::deep_copy::SnapshotCopyRequest<librbd::MockTestImageCtx> MockSnapshotCopyRequest;
+  typedef librbd::mirror::snapshot::CreateNonPrimaryRequest<librbd::MockTestImageCtx> MockCreateNonPrimaryRequest;
+  typedef librbd::mirror::snapshot::GetImageStateRequest<librbd::MockTestImageCtx> MockGetImageStateRequest;
+  typedef librbd::mirror::snapshot::UnlinkPeerRequest<librbd::MockTestImageCtx> MockUnlinkPeerRequest;
+
+  void SetUp() override {
+    TestMockFixture::SetUp();
+
+    librbd::RBD rbd;
+    ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
+    ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
+
+    ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name,
+                              m_image_size));
+    ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name,
+                            &m_remote_image_ctx));
+  }
+
+  void expect_work_queue_repeatedly(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+      .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+          m_threads->work_queue->queue(ctx, r);
+        }));
+  }
+
+  void expect_add_event_after_repeatedly(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
+      .WillRepeatedly(
+        DoAll(Invoke([this](double seconds, Context *ctx) {
+                       m_threads->timer->add_event_after(seconds, ctx);
+                     }),
+          ReturnArg<1>()));
+    EXPECT_CALL(*mock_threads.timer, cancel_event(_))
+      .WillRepeatedly(
+        Invoke([this](Context *ctx) {
+          return m_threads->timer->cancel_event(ctx);
+        }));
+  }
+
+  void expect_register_update_watcher(librbd::MockTestImageCtx& mock_image_ctx,
+                                      librbd::UpdateWatchCtx** update_watch_ctx,
+                                      uint64_t watch_handle, int r) {
+    EXPECT_CALL(*mock_image_ctx.state, register_update_watcher(_, _))
+      .WillOnce(Invoke([update_watch_ctx, watch_handle, r]
+                       (librbd::UpdateWatchCtx* ctx, uint64_t* handle) {
+          if (r >= 0) {
+            *update_watch_ctx = ctx;
+            *handle = watch_handle;
+          }
+          return r;
+        }));
+  }
+
+  void expect_unregister_update_watcher(librbd::MockTestImageCtx& mock_image_ctx,
+                                        uint64_t watch_handle, int r) {
+    EXPECT_CALL(*mock_image_ctx.state, unregister_update_watcher(watch_handle, _))
+      .WillOnce(WithArg<1>(Invoke([this, r](Context* ctx) {
+          m_threads->work_queue->queue(ctx, r);
+        })));
+  }
+
+  void expect_is_refresh_required(librbd::MockTestImageCtx& mock_image_ctx,
+                                  bool is_required) {
+    EXPECT_CALL(*mock_image_ctx.state, is_refresh_required())
+      .WillOnce(Return(is_required));
+  }
+
+  void expect_refresh(librbd::MockTestImageCtx& mock_image_ctx,
+                      const std::map<uint64_t, librbd::SnapInfo>& snaps,
+                      int r) {
+    EXPECT_CALL(*mock_image_ctx.state, refresh(_))
+      .WillOnce(Invoke([this, &mock_image_ctx, snaps, r](Context* ctx) {
+        mock_image_ctx.snap_info = snaps;
+        m_threads->work_queue->queue(ctx, r);
+      }));
+  }
+
+  void expect_notify_update(librbd::MockTestImageCtx& mock_image_ctx) {
+    EXPECT_CALL(mock_image_ctx, notify_update(_))
+      .WillOnce(Invoke([this](Context* ctx) {
+        m_threads->work_queue->queue(ctx, 0);
+      }));
+  }
+
+  void expect_snapshot_copy(MockSnapshotCopyRequest& mock_snapshot_copy_request,
+                            uint64_t src_snap_id_start,
+                            uint64_t src_snap_id_end,
+                            uint64_t dst_snap_id_start,
+                            const librbd::SnapSeqs& snap_seqs, int r) {
+    EXPECT_CALL(mock_snapshot_copy_request, send())
+      .WillOnce(Invoke([this, &req=mock_snapshot_copy_request,
+                        src_snap_id_start, src_snap_id_end, dst_snap_id_start,
+                        snap_seqs, r]() {
+        ASSERT_EQ(src_snap_id_start, req.src_snap_id_start);
+        ASSERT_EQ(src_snap_id_end, req.src_snap_id_end);
+        ASSERT_EQ(dst_snap_id_start, req.dst_snap_id_start);
+        *req.snap_seqs = snap_seqs;
+        m_threads->work_queue->queue(req.on_finish, r);
+      }));
+  }
+
+  void expect_get_image_state(MockGetImageStateRequest& mock_get_image_state_request,
+                              uint64_t snap_id, int r) {
+    EXPECT_CALL(mock_get_image_state_request, send())
+      .WillOnce(Invoke([this, &req=mock_get_image_state_request, snap_id, r]() {
+        ASSERT_EQ(snap_id, req.snap_id);
+        m_threads->work_queue->queue(req.on_finish, r);
+      }));
+  }
+
+  void expect_create_non_primary_request(MockCreateNonPrimaryRequest& mock_create_non_primary_request,
+                                         bool demoted,
+                                         const std::string& primary_mirror_uuid,
+                                         uint64_t primary_snap_id,
+                                         const librbd::SnapSeqs& snap_seqs,
+                                         uint64_t snap_id, int r) {
+    EXPECT_CALL(mock_create_non_primary_request, send())
+      .WillOnce(Invoke([this, &req=mock_create_non_primary_request, demoted,
+                        primary_mirror_uuid, primary_snap_id, snap_seqs,
+                        snap_id, r]() {
+        ASSERT_EQ(demoted, req.demoted);
+        ASSERT_EQ(primary_mirror_uuid, req.primary_mirror_uuid);
+        ASSERT_EQ(primary_snap_id, req.primary_snap_id);
+        ASSERT_EQ(snap_seqs, req.snap_seqs);
+        *req.snap_id = snap_id;
+        m_threads->work_queue->queue(req.on_finish, r);
+      }));
+  }
+
+  void expect_image_copy(MockImageCopyRequest& mock_image_copy_request,
+                         uint64_t src_snap_id_start, uint64_t src_snap_id_end,
+                         uint64_t dst_snap_id_start,
+                         const librbd::deep_copy::ObjectNumber& object_number,
+                         const librbd::SnapSeqs& snap_seqs, int r) {
+    EXPECT_CALL(mock_image_copy_request, send())
+      .WillOnce(Invoke([this, &req=mock_image_copy_request, src_snap_id_start,
+                        src_snap_id_end, dst_snap_id_start, object_number,
+                        snap_seqs, r]() {
+        ASSERT_EQ(src_snap_id_start, req.src_snap_id_start);
+        ASSERT_EQ(src_snap_id_end, req.src_snap_id_end);
+        ASSERT_EQ(dst_snap_id_start, req.dst_snap_id_start);
+        ASSERT_EQ(object_number, req.object_number);
+        ASSERT_EQ(snap_seqs, req.snap_seqs);
+        m_threads->work_queue->queue(req.on_finish, r);
+      }));
+  }
+
+  void expect_unlink_peer(MockUnlinkPeerRequest& mock_unlink_peer_request,
+                          uint64_t snap_id, const std::string& mirror_peer_uuid,
+                          int r) {
+    EXPECT_CALL(mock_unlink_peer_request, send())
+      .WillOnce(Invoke([this, &req=mock_unlink_peer_request, snap_id,
+                        mirror_peer_uuid, r]() {
+        ASSERT_EQ(snap_id, req.snap_id);
+        ASSERT_EQ(mirror_peer_uuid, req.mirror_peer_uuid);
+        m_threads->work_queue->queue(req.on_finish, r);
+      }));
+  }
+
+  void expect_mirror_image_snapshot_set_copy_progress(
+      librbd::MockTestImageCtx& mock_test_image_ctx, uint64_t snap_id,
+      bool completed, uint64_t last_copied_object, int r) {
+    bufferlist bl;
+    encode(snap_id, bl);
+    encode(completed, bl);
+    encode(last_copied_object, bl);
+
+    EXPECT_CALL(get_mock_io_ctx(mock_test_image_ctx.md_ctx),
+                exec(mock_test_image_ctx.header_oid, _, StrEq("rbd"),
+                     StrEq("mirror_image_snapshot_set_copy_progress"),
+                     ContentsEqual(bl), _, _))
+      .WillOnce(Return(r));
+  }
+
+  void expect_send(MockCloseImageRequest &mock_close_image_request, int r) {
+    EXPECT_CALL(mock_close_image_request, send())
+      .WillOnce(Invoke([this, &mock_close_image_request, r]() {
+            *mock_close_image_request.image_ctx = nullptr;
+            m_threads->work_queue->queue(mock_close_image_request.on_finish, r);
+          }));
+  }
+
+  void expect_notification(MockThreads& mock_threads,
+                           MockReplayerListener& mock_replayer_listener) {
+    EXPECT_CALL(mock_replayer_listener, handle_notification())
+      .WillRepeatedly(Invoke([this]() {
+          std::unique_lock locker{m_lock};
+          ++m_notifications;
+          m_cond.notify_all();
+        }));
+  }
+
+  int wait_for_notification(uint32_t count) {
+    std::unique_lock locker{m_lock};
+    for (uint32_t idx = 0; idx < count; ++idx) {
+      while (m_notifications == 0) {
+        if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+          return -ETIMEDOUT;
+        }
+      }
+      --m_notifications;
+    }
+    return 0;
+  }
+
+  int init_entry_replayer(MockReplayer& mock_replayer,
+                          MockThreads& mock_threads,
+                          librbd::MockTestImageCtx& mock_local_image_ctx,
+                          librbd::MockTestImageCtx& mock_remote_image_ctx,
+                          MockReplayerListener& mock_replayer_listener,
+                          librbd::UpdateWatchCtx** update_watch_ctx) {
+    expect_register_update_watcher(mock_remote_image_ctx, update_watch_ctx, 123,
+                                   0);
+    expect_is_refresh_required(mock_local_image_ctx, false);
+    expect_is_refresh_required(mock_remote_image_ctx, false);
+
+    C_SaferCond init_ctx;
+    mock_replayer.init(&init_ctx);
+    int r = init_ctx.wait();
+    if (r < 0) {
+      return r;
+    }
+
+    return wait_for_notification(2);
+  }
+
+  int shut_down_entry_replayer(MockReplayer& mock_replayer,
+                               MockThreads& mock_threads,
+                               librbd::MockTestImageCtx& mock_remote_image_ctx) {
+    expect_unregister_update_watcher(mock_remote_image_ctx, 123, 0);
+
+    C_SaferCond shutdown_ctx;
+    mock_replayer.shut_down(&shutdown_ctx);
+    return shutdown_ctx.wait();
+  }
+
+  librbd::ImageCtx* m_local_image_ctx = nullptr;
+  librbd::ImageCtx* m_remote_image_ctx = nullptr;
+
+  PoolMetaCache m_pool_meta_cache{g_ceph_context};
+
+  ceph::mutex m_lock = ceph::make_mutex(
+    "TestMockImageReplayerSnapshotReplayer");
+  ceph::condition_variable m_cond;
+  uint32_t m_notifications = 0;
+};
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, InitShutDown) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, SyncSnapshot) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  // it should sync two snapshots and skip two (user and mirror w/o matching
+  // peer uuid)
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"},
+       "", 0U},
+     0, {}, 0, 0, {}}},
+    {2U, librbd::SnapInfo{"snap2", cls::rbd::UserSnapshotNamespace{},
+     0, {}, 0, 0, {}}},
+    {3U, librbd::SnapInfo{"snap3", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {""},
+       "", 0U},
+     0, {}, 0, 0, {}}},
+    {4U, librbd::SnapInfo{"snap4", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"},
+       "", 0U},
+     0, {}, 0, 0, {}}}};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+
+  // init
+  expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123,
+                                 0);
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, 0);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0, {},
+                    {{1, CEPH_NOSNAP}}, 0);
+  MockUnlinkPeerRequest mock_unlink_peer_request;
+  expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 11, true, 0, 0);
+  expect_notify_update(mock_local_image_ctx);
+
+  // sync snap4
+  expect_is_refresh_required(mock_local_image_ctx, true);
+  expect_refresh(
+    mock_local_image_ctx, {
+      {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+         cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+         1, true, 0, {}},
+       0, {}, 0, 0, {}}},
+    }, 0);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  expect_snapshot_copy(mock_snapshot_copy_request, 1, 4, 11,
+                       {{1, 11}, {4, CEPH_NOSNAP}}, 0);
+  expect_get_image_state(mock_get_image_state_request, 4, 0);
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 4,
+                                    {{1, 11}, {4, CEPH_NOSNAP}}, 14, 0);
+  expect_image_copy(mock_image_copy_request, 1, 4, 11, {},
+                    {{1, 11}, {4, CEPH_NOSNAP}}, 0);
+  expect_unlink_peer(mock_unlink_peer_request, 4, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 14, true, 0, 0);
+  expect_notify_update(mock_local_image_ctx);
+
+  // idle
+  expect_is_refresh_required(mock_local_image_ctx, true);
+  expect_refresh(
+    mock_local_image_ctx, {
+      {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+         cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+         1, true, 0, {}},
+       0, {}, 0, 0, {}}},
+      {14U, librbd::SnapInfo{"snap4", cls::rbd::MirrorSnapshotNamespace{
+         cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+         4, true, 0, {}},
+       0, {}, 0, 0, {}}},
+    }, 0);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+
+  // fire init
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(0, init_ctx.wait());
+
+  // wait for sync to complete
+  ASSERT_EQ(0, wait_for_notification(4));
+
+  // shut down
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, InterruptedSync) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject a incomplete sync snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"},
+       "", 0U},
+     0, {}, 0, 0, {}}}};
+  mock_local_image_ctx.snap_info = {
+    {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+       1, false, 123, {{1, CEPH_NOSNAP}}},
+     0, {}, 0, 0, {}}}};
+
+  // re-sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0,
+                    librbd::deep_copy::ObjectNumber{123U},
+                    {{1, CEPH_NOSNAP}}, 0);
+  MockUnlinkPeerRequest mock_unlink_peer_request;
+  expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 11, true, 123, 0);
+  expect_notify_update(mock_local_image_ctx);
+
+  // idle
+  expect_is_refresh_required(mock_local_image_ctx, true);
+  expect_refresh(
+    mock_local_image_ctx, {
+      {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+         cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+         1, true, 0, {}},
+       0, {}, 0, 0, {}}},
+    }, 0);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete
+  ASSERT_EQ(0, wait_for_notification(2));
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, RemoteImageDemoted) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject a demotion snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED,
+       {"remote mirror peer uuid"}, "", 0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    true, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, 0);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0, {},
+                    {{1, CEPH_NOSNAP}}, 0);
+  MockUnlinkPeerRequest mock_unlink_peer_request;
+  expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 11, true, 0, 0);
+  expect_notify_update(mock_local_image_ctx);
+
+  // idle
+  expect_is_refresh_required(mock_local_image_ctx, true);
+  expect_refresh(
+    mock_local_image_ctx, {
+      {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+         cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid",
+         1, true, 0, {}},
+       0, {}, 0, 0, {}}},
+    }, 0);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(2));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, LocalImagePromoted) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject a promotion snapshot
+  mock_local_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY,
+       {"remote mirror peer uuid"}, "", 0U, true, 0, {}},
+     0, {}, 0, 0, {}}}};
+
+  // idle
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, RegisterUpdateWatcherError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayerListener mock_replayer_listener;
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  // init
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123,
+                                 -EINVAL);
+
+  // fire init
+  C_SaferCond init_ctx;
+  mock_replayer.init(&init_ctx);
+  ASSERT_EQ(-EINVAL, init_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, UnregisterUpdateWatcherError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+
+  // shut down
+  expect_unregister_update_watcher(mock_remote_image_ctx, 123, -EINVAL);
+
+  C_SaferCond shutdown_ctx;
+  mock_replayer.shut_down(&shutdown_ctx);
+  ASSERT_EQ(-EINVAL, shutdown_ctx.wait());
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, RefreshLocalImageError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // sync
+  expect_is_refresh_required(mock_local_image_ctx, true);
+  expect_refresh(mock_local_image_ctx, {}, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, RefreshRemoteImageError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // sync
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, true);
+  expect_refresh(mock_remote_image_ctx, {}, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, CopySnapshotsError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, GetImageStateError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, CreateNonPrimarySnapshotError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, CopyImageError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, 0);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0, {},
+                    {{1, CEPH_NOSNAP}}, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, UnlinkPeerError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, 0);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0, {},
+                    {{1, CEPH_NOSNAP}}, 0);
+  MockUnlinkPeerRequest mock_unlink_peer_request;
+  expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 11, true, 0, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+TEST_F(TestMockImageReplayerSnapshotReplayer, UpdateNonPrimarySnapshotError) {
+  librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx};
+  librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx};
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue_repeatedly(mock_threads);
+
+  MockReplayerListener mock_replayer_listener;
+  expect_notification(mock_threads, mock_replayer_listener);
+
+  InSequence seq;
+
+  MockStateBuilder mock_state_builder(mock_local_image_ctx,
+                                      mock_remote_image_ctx);
+  MockReplayer mock_replayer{&mock_threads, "local mirror uuid",
+                             &m_pool_meta_cache, &mock_state_builder,
+                             &mock_replayer_listener};
+  m_pool_meta_cache.set_remote_pool_meta(
+    m_remote_io_ctx.get_id(),
+    {"remote mirror uuid", "remote mirror peer uuid"});
+
+  librbd::UpdateWatchCtx* update_watch_ctx = nullptr;
+  ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads,
+                                   mock_local_image_ctx,
+                                   mock_remote_image_ctx,
+                                   mock_replayer_listener,
+                                   &update_watch_ctx));
+
+  // inject snapshot
+  mock_remote_image_ctx.snap_info = {
+    {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{
+       cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "",
+       0U},
+     0, {}, 0, 0, {}}}};
+
+  // sync snap1
+  expect_is_refresh_required(mock_local_image_ctx, false);
+  expect_is_refresh_required(mock_remote_image_ctx, false);
+  MockSnapshotCopyRequest mock_snapshot_copy_request;
+  expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}},
+                       0);
+  MockGetImageStateRequest mock_get_image_state_request;
+  expect_get_image_state(mock_get_image_state_request, 1, 0);
+  MockCreateNonPrimaryRequest mock_create_non_primary_request;
+  expect_create_non_primary_request(mock_create_non_primary_request,
+                                    false, "remote mirror uuid", 1,
+                                    {{1, CEPH_NOSNAP}}, 11, 0);
+  MockImageCopyRequest mock_image_copy_request;
+  expect_image_copy(mock_image_copy_request, 0, 1, 0, {},
+                    {{1, CEPH_NOSNAP}}, 0);
+  MockUnlinkPeerRequest mock_unlink_peer_request;
+  expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid",
+                     0);
+  expect_mirror_image_snapshot_set_copy_progress(
+    mock_local_image_ctx, 11, true, 0, -EINVAL);
+
+  // wake-up replayer
+  update_watch_ctx->handle_notify();
+
+  // wait for sync to complete and expect replay complete
+  ASSERT_EQ(0, wait_for_notification(1));
+  ASSERT_FALSE(mock_replayer.is_replaying());
+  ASSERT_EQ(-EINVAL, mock_replayer.get_error_code());
+
+  ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads,
+                                        mock_remote_image_ctx));
+}
+
+} // namespace snapshot
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
index 59d90d0e1e8424f66dd0f0eb7baf1d62da64b744..caa2daa4f9cfa58b94b81f05418d0fca95d85c9a 100644 (file)
@@ -121,6 +121,18 @@ public:
     } else {
       EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_remote_ioctx,
                                                    RBD_MIRROR_MODE_IMAGE));
+
+      uuid_d uuid_gen;
+      uuid_gen.generate_random();
+      std::string remote_peer_uuid = uuid_gen.to_string();
+
+      EXPECT_EQ(0, librbd::cls_client::mirror_peer_add(
+        &m_remote_ioctx, {remote_peer_uuid,
+                          cls::rbd::MIRROR_PEER_DIRECTION_RX_TX,
+                          "siteA", "client", m_local_mirror_uuid}));
+
+      m_pool_meta_cache.set_remote_pool_meta(
+        m_remote_ioctx.get_id(), {m_remote_mirror_uuid, remote_peer_uuid});
     }
 
     m_image_name = get_temp_image_name();
@@ -196,8 +208,14 @@ public:
     m_replayer->start(&cond);
     ASSERT_EQ(0, cond.wait());
 
+    std::string oid;
+    if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) {
+      oid = ::journal::Journaler::header_oid(m_remote_image_id);
+    } else {
+      oid = librbd::util::header_name(m_remote_image_id);
+    }
+
     ASSERT_EQ(0U, m_watch_handle);
-    std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
     create_watch_ctx(oid);
     ASSERT_EQ(0, m_remote_ioctx.watch2(oid, &m_watch_handle, m_watch_ctx));
   }
@@ -339,15 +357,45 @@ public:
     return true;
   }
 
-  void wait_for_replay_complete()
-  {
+  int get_last_mirror_snapshot(librados::IoCtx& io_ctx,
+                               const std::string& image_id,
+                               uint64_t* mirror_snap_id,
+                               cls::rbd::MirrorSnapshotNamespace* mirror_ns) {
+    auto header_oid = librbd::util::header_name(image_id);
+    ::SnapContext snapc;
+    int r = librbd::cls_client::get_snapcontext(&io_ctx, header_oid, &snapc);
+    if (r < 0) {
+      return r;
+    }
+
+    // stored in reverse order
+    for (auto snap_id : snapc.snaps) {
+      cls::rbd::SnapshotInfo snap_info;
+      r = librbd::cls_client::snapshot_get(&io_ctx, header_oid, snap_id,
+                                           &snap_info);
+      if (r < 0) {
+        return r;
+      }
+
+      auto ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+        &snap_info.snapshot_namespace);
+      if (ns != nullptr) {
+        *mirror_snap_id = snap_id;
+        *mirror_ns = *ns;
+        return 0;
+      }
+    }
+
+    return -ENOENT;
+  }
+
+  void wait_for_journal_synced() {
     cls::journal::ObjectPosition master_position;
     cls::journal::ObjectPosition mirror_position;
-
     for (int i = 0; i < 100; i++) {
       get_commit_positions(&master_position, &mirror_position);
       if (master_position == mirror_position) {
-       break;
+        break;
       }
       wait_for_watcher_notify(1);
     }
@@ -355,6 +403,55 @@ public:
     ASSERT_EQ(master_position, mirror_position);
   }
 
+  void wait_for_snapshot_synced() {
+    uint64_t remote_snap_id = CEPH_NOSNAP;
+    cls::rbd::MirrorSnapshotNamespace remote_mirror_ns;
+    ASSERT_EQ(0, get_last_mirror_snapshot(m_remote_ioctx, m_remote_image_id,
+                                          &remote_snap_id, &remote_mirror_ns));
+
+    std::string local_image_id;
+    ASSERT_EQ(0, librbd::cls_client::mirror_image_get_image_id(
+                   &m_local_ioctx, m_global_image_id, &local_image_id));
+
+    uint64_t local_snap_id = CEPH_NOSNAP;
+    cls::rbd::MirrorSnapshotNamespace local_mirror_ns;
+    for (int i = 0; i < 100; i++) {
+      int r = get_last_mirror_snapshot(m_local_ioctx, local_image_id,
+                                       &local_snap_id, &local_mirror_ns);
+      if (r == 0 &&
+          ((remote_mirror_ns.state ==
+              cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY &&
+            local_mirror_ns.state ==
+              cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY) ||
+           (remote_mirror_ns.state ==
+              cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED &&
+            local_mirror_ns.state ==
+              cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED)) &&
+          local_mirror_ns.primary_mirror_uuid == m_remote_mirror_uuid &&
+          local_mirror_ns.primary_snap_id == remote_snap_id &&
+          local_mirror_ns.complete) {
+        return;
+      }
+
+      wait_for_watcher_notify(1);
+    }
+
+    ADD_FAILURE() << "failed to locate matching snapshot: "
+                  << "remote_snap_id=" << remote_snap_id << ", "
+                  << "remote_snap_ns=" << remote_mirror_ns << ", "
+                  << "local_snap_id=" << local_snap_id << ", "
+                  << "local_snap_ns=" << local_mirror_ns;
+  }
+
+  void wait_for_replay_complete()
+  {
+    if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) {
+      wait_for_journal_synced();
+    } else {
+      wait_for_snapshot_synced();
+    }
+  }
+
   void wait_for_stopped() {
     for (int i = 0; i < 100; i++) {
       if (m_replayer->is_stopped()) {
@@ -411,9 +508,15 @@ public:
     ASSERT_EQ(0, c->wait_for_complete());
     c->put();
 
-    C_SaferCond journal_flush_ctx;
-    ictx->journal->flush_commit_position(&journal_flush_ctx);
-    ASSERT_EQ(0, journal_flush_ctx.wait());
+    if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) {
+      C_SaferCond journal_flush_ctx;
+      ictx->journal->flush_commit_position(&journal_flush_ctx);
+      ASSERT_EQ(0, journal_flush_ctx.wait());
+    } else {
+      uint64_t snap_id = CEPH_NOSNAP;
+      ASSERT_EQ(0, librbd::api::Mirror<>::image_snapshot_create(
+                     ictx, &snap_id));
+    }
 
     printf("flushed\n");
   }
@@ -437,8 +540,8 @@ public:
   std::string m_remote_image_id;
   std::string m_global_image_id;
   ImageReplayer<> *m_replayer = nullptr;
-  C_WatchCtx *m_watch_ctx;
-  uint64_t m_watch_handle;
+  C_WatchCtx *m_watch_ctx = nullptr;
+  uint64_t m_watch_handle = 0;
   char m_test_data[TEST_IO_SIZE + 1];
   std::string m_journal_commit_age;
 };
@@ -453,7 +556,9 @@ public:
 };
 
 typedef ::testing::Types<TestImageReplayerParams<
-                           cls::rbd::MIRROR_IMAGE_MODE_JOURNAL>>
+                           cls::rbd::MIRROR_IMAGE_MODE_JOURNAL>,
+                         TestImageReplayerParams<
+                           cls::rbd::MIRROR_IMAGE_MODE_SNAPSHOT>>
     TestImageReplayerTypes;
 
 TYPED_TEST_CASE(TestImageReplayer, TestImageReplayerTypes);
@@ -510,19 +615,25 @@ TYPED_TEST(TestImageReplayer, BootstrapErrorMirrorDisabled)
 TYPED_TEST(TestImageReplayer, BootstrapMirrorDisabling)
 {
   // set remote image mirroring state to DISABLING
-  ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(this->m_remote_ioctx,
-                                               RBD_MIRROR_MODE_IMAGE));
-  librbd::ImageCtx *ictx;
-  this->open_remote_image(&ictx);
-  ASSERT_EQ(0, librbd::api::Mirror<>::image_enable(
-              ictx, RBD_MIRROR_IMAGE_MODE_JOURNAL, false));
+  if (gtest_TypeParam_::MIRROR_IMAGE_MODE ==
+        cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) {
+    ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(this->m_remote_ioctx,
+                                                 RBD_MIRROR_MODE_IMAGE));
+    librbd::ImageCtx *ictx;
+    this->open_remote_image(&ictx);
+     ASSERT_EQ(0, librbd::api::Mirror<>::image_enable(
+                   ictx, RBD_MIRROR_IMAGE_MODE_JOURNAL, false));
+    this->close_image(ictx);
+  }
+
   cls::rbd::MirrorImage mirror_image;
   ASSERT_EQ(0, librbd::cls_client::mirror_image_get(&this->m_remote_ioctx,
-                                                    ictx->id, &mirror_image));
+                                                    this->m_remote_image_id,
+                                                    &mirror_image));
   mirror_image.state = cls::rbd::MirrorImageState::MIRROR_IMAGE_STATE_DISABLING;
   ASSERT_EQ(0, librbd::cls_client::mirror_image_set(&this->m_remote_ioctx,
-                                                    ictx->id, mirror_image));
-  this->close_image(ictx);
+                                                    this->m_remote_image_id,
+                                                    mirror_image));
 
   this->create_replayer();
   C_SaferCond cond;
@@ -704,8 +815,9 @@ TEST_F(TestImageReplayerJournal, NextTag)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, Resync)
+TEST_F(TestImageReplayerJournal, Resync)
 {
+  // TODO add support to snapshot-based mirroring
   this->bootstrap();
 
   librbd::ImageCtx *ictx;
@@ -753,8 +865,9 @@ TYPED_TEST(TestImageReplayer, Resync)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, Resync_While_Stop)
+TEST_F(TestImageReplayerJournal, Resync_While_Stop)
 {
+  // TODO add support to snapshot-based mirroring
 
   this->bootstrap();
 
@@ -813,8 +926,9 @@ TYPED_TEST(TestImageReplayer, Resync_While_Stop)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, Resync_StartInterrupted)
+TEST_F(TestImageReplayerJournal, Resync_StartInterrupted)
 {
+  // TODO add support to snapshot-based mirroring
 
   this->bootstrap();
 
@@ -923,7 +1037,7 @@ TEST_F(TestImageReplayerJournal, MultipleReplayFailures_SingleEpoch) {
   this->close_image(ictx);
 }
 
-TYPED_TEST(TestImageReplayer, MultipleReplayFailures_MultiEpoch) {
+TEST_F(TestImageReplayerJournal, MultipleReplayFailures_MultiEpoch) {
   this->bootstrap();
 
   // inject a snapshot that cannot be unprotected
@@ -1067,8 +1181,9 @@ TEST_F(TestImageReplayerJournal, Disconnect)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, UpdateFeatures)
+TEST_F(TestImageReplayerJournal, UpdateFeatures)
 {
+  // TODO add support to snapshot-based mirroring
   const uint64_t FEATURES_TO_UPDATE =
     RBD_FEATURE_OBJECT_MAP | RBD_FEATURE_FAST_DIFF;
 
@@ -1159,8 +1274,9 @@ TYPED_TEST(TestImageReplayer, UpdateFeatures)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, MetadataSetRemove)
+TEST_F(TestImageReplayerJournal, MetadataSetRemove)
 {
+  // TODO add support to snapshot-based mirroring
   const std::string KEY = "test_key";
   const std::string VALUE = "test_value";
 
@@ -1204,8 +1320,9 @@ TYPED_TEST(TestImageReplayer, MetadataSetRemove)
   this->stop();
 }
 
-TYPED_TEST(TestImageReplayer, MirroringDelay)
+TEST_F(TestImageReplayerJournal, MirroringDelay)
 {
+  // TODO add support to snapshot-based mirroring
   const double DELAY = 10; // set less than wait_for_replay_complete timeout
 
   librbd::ImageCtx *ictx;
index e4a2ab38ae3b378a42212fa241c36c23f8e39f08..e2fcc6836c346eef3a9497e84fce5e42f61d9eac 100644 (file)
@@ -6,8 +6,15 @@
 #include "common/errno.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
-#include "librbd/Journal.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
 #include "librbd/Utils.h"
+#include "librbd/deep_copy/ImageCopyRequest.h"
+#include "librbd/deep_copy/SnapshotCopyRequest.h"
+#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
+#include "librbd/mirror/snapshot/GetImageStateRequest.h"
+#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
 #include "tools/rbd_mirror/PoolMetaCache.h"
 #include "tools/rbd_mirror/Threads.h"
 #include "tools/rbd_mirror/Types.h"
@@ -31,6 +38,48 @@ namespace snapshot {
 
 using librbd::util::create_async_context_callback;
 using librbd::util::create_context_callback;
+using librbd::util::create_rados_callback;
+
+template <typename I>
+struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
+  Replayer<I>* replayer;
+
+  C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
+  }
+
+  void handle_notify() override {
+     replayer->handle_remote_image_update_notify();
+  }
+};
+
+template <typename I>
+struct Replayer<I>::C_TrackedOp : public Context {
+  Replayer *replayer;
+  Context* ctx;
+
+  C_TrackedOp(Replayer* replayer, Context* ctx)
+    : replayer(replayer), ctx(ctx) {
+    replayer->m_in_flight_op_tracker.start_op();
+  }
+
+  void finish(int r) override {
+    ctx->complete(r);
+    replayer->m_in_flight_op_tracker.finish_op();
+  }
+};
+
+template <typename I>
+struct Replayer<I>::ProgressContext : public librbd::ProgressContext {
+  Replayer *replayer;
+
+  ProgressContext(Replayer* replayer) : replayer(replayer) {
+  }
+
+  int update_progress(uint64_t offset, uint64_t total) override {
+    replayer->handle_copy_image_progress(offset, total);
+    return 0;
+  }
+};
 
 template <typename I>
 Replayer<I>::Replayer(
@@ -52,24 +101,59 @@ Replayer<I>::Replayer(
 template <typename I>
 Replayer<I>::~Replayer() {
   dout(10) << dendl;
+  ceph_assert(m_state == STATE_COMPLETE);
+  ceph_assert(m_update_watch_ctx == nullptr);
+  ceph_assert(m_progress_ctx == nullptr);
 }
 
 template <typename I>
 void Replayer<I>::init(Context* on_finish) {
   dout(10) << dendl;
 
-  // TODO
-  m_state = STATE_REPLAYING;
-  m_threads->work_queue->queue(on_finish, 0);
+  ceph_assert(m_state == STATE_INIT);
+
+  RemotePoolMeta remote_pool_meta;
+  int r = m_pool_meta_cache->get_remote_pool_meta(
+    m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
+  if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
+    derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
+    m_state = STATE_COMPLETE;
+    m_threads->work_queue->queue(on_finish, r);
+    return;
+  }
+
+  m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
+  dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
+
+  ceph_assert(m_on_init_shutdown == nullptr);
+  m_on_init_shutdown = on_finish;
+
+  register_update_watcher();
 }
 
 template <typename I>
 void Replayer<I>::shut_down(Context* on_finish) {
   dout(10) << dendl;
 
-  // TODO
-  m_state = STATE_COMPLETE;
-  m_threads->work_queue->queue(on_finish, 0);
+  std::unique_lock locker{m_lock};
+  ceph_assert(m_on_init_shutdown == nullptr);
+  m_on_init_shutdown = on_finish;
+  m_error_code = 0;
+  m_error_description = "";
+
+  ceph_assert(m_state != STATE_INIT);
+  auto state = STATE_COMPLETE;
+  std::swap(m_state, state);
+
+  if (state == STATE_REPLAYING) {
+    // TODO interrupt snapshot copy and image copy state machines even if remote
+    // cluster is unreachable
+    dout(10) << "shut down pending on completion of snapshot replay" << dendl;
+    return;
+  }
+  locker.unlock();
+
+  unregister_update_watcher();
 }
 
 template <typename I>
@@ -90,6 +174,678 @@ bool Replayer<I>::get_replay_status(std::string* description,
   return true;
 }
 
+template <typename I>
+void Replayer<I>::refresh_local_image() {
+  if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
+    refresh_remote_image();
+    return;
+  }
+
+  dout(10) << dendl;
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
+  m_state_builder->local_image_ctx->state->refresh(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_refresh_local_image(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to refresh local image");
+    return;
+  }
+
+  refresh_remote_image();
+}
+
+template <typename I>
+void Replayer<I>::refresh_remote_image() {
+  if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
+    scan_local_mirror_snapshots();
+    return;
+  }
+
+  dout(10) << dendl;
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
+  m_state_builder->remote_image_ctx->state->refresh(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_refresh_remote_image(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to refresh remote image");
+    return;
+  }
+
+  scan_local_mirror_snapshots();
+}
+
+template <typename I>
+void Replayer<I>::scan_local_mirror_snapshots() {
+  if (is_replay_interrupted()) {
+    return;
+  }
+
+  dout(10) << dendl;
+
+  m_local_snap_id_start = 0;
+  m_local_snap_id_end = CEPH_NOSNAP;
+  m_local_mirror_snap_ns = {};
+
+  m_remote_snap_id_start = 0;
+  m_remote_snap_id_end = CEPH_NOSNAP;
+  m_remote_mirror_snap_ns = {};
+
+  auto local_image_ctx = m_state_builder->local_image_ctx;
+  std::shared_lock image_locker{local_image_ctx->image_lock};
+  for (auto snap_info_it = local_image_ctx->snap_info.begin();
+       snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
+    const auto& snap_ns = snap_info_it->second.snap_namespace;
+    auto mirror_ns = boost::get<
+      cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
+    if (mirror_ns == nullptr) {
+      continue;
+    }
+
+    dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
+             << "mirror_ns=" << *mirror_ns << dendl;
+    m_local_mirror_snap_ns = *mirror_ns;
+
+    auto local_snap_id = snap_info_it->first;
+    if (mirror_ns->is_non_primary()) {
+      if (mirror_ns->complete) {
+        // if remote has new snapshots, we would sync from here
+        m_local_snap_id_start = local_snap_id;
+        m_local_snap_id_end = CEPH_NOSNAP;
+      } else {
+        // start snap will be last complete mirror snapshot or initial
+        // image revision
+        m_local_snap_id_end = local_snap_id;
+      }
+    } else if (mirror_ns->is_primary()) {
+      if (mirror_ns->complete) {
+        m_local_snap_id_start = local_snap_id;
+        m_local_snap_id_end = CEPH_NOSNAP;
+      } else {
+        derr << "incomplete local primary snapshot" << dendl;
+        handle_replay_complete(-EINVAL, "incomplete local primary snapshot");
+        return;
+      }
+    } else {
+      derr << "unknown local mirror snapshot state" << dendl;
+      handle_replay_complete(-EINVAL, "invalid local mirror snapshot state");
+      return;
+    }
+  }
+  image_locker.unlock();
+
+  if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
+    if (m_local_mirror_snap_ns.is_non_primary() &&
+        m_local_mirror_snap_ns.primary_mirror_uuid !=
+          m_state_builder->remote_mirror_uuid) {
+      // TODO support multiple peers
+      derr << "local image linked to unknown peer: "
+           << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
+      handle_replay_complete(-EEXIST, "local image linked to unknown peer");
+      return;
+    } else if (m_local_mirror_snap_ns.state ==
+                 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
+      dout(5) << "local image promoted" << dendl;
+      handle_replay_complete(0, "force promoted");
+      return;
+    }
+
+    dout(10) << "found local mirror snapshot: "
+             << "local_snap_id_start=" << m_local_snap_id_start << ", "
+             << "local_snap_id_end=" << m_local_snap_id_end << ", "
+             << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
+    if (m_local_mirror_snap_ns.complete) {
+      // our remote sync should start after this completed snapshot
+      m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
+    }
+  }
+
+  // we don't have any mirror snapshots or only completed non-primary
+  // mirror snapshots
+  scan_remote_mirror_snapshots();
+}
+
+template <typename I>
+void Replayer<I>::scan_remote_mirror_snapshots() {
+  dout(10) << dendl;
+
+  {
+    // reset state in case new snapshot is added while we are scanning
+    std::unique_lock locker{m_lock};
+    m_remote_image_updated = false;
+  }
+
+  bool remote_demoted = false;
+  auto remote_image_ctx = m_state_builder->remote_image_ctx;
+  std::shared_lock image_locker{remote_image_ctx->image_lock};
+  for (auto snap_info_it = remote_image_ctx->snap_info.begin();
+       snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
+    const auto& snap_ns = snap_info_it->second.snap_namespace;
+    auto mirror_ns = boost::get<
+      cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
+    if (mirror_ns == nullptr) {
+      continue;
+    }
+
+    dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
+             << "mirror_ns=" << *mirror_ns << dendl;
+    if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
+      derr << "unknown remote mirror snapshot state" << dendl;
+      handle_replay_complete(-EINVAL, "invalid remote mirror snapshot state");
+      return;
+    } else {
+      remote_demoted = (mirror_ns->is_primary() && mirror_ns->is_demoted());
+    }
+
+    auto remote_snap_id = snap_info_it->first;
+    if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
+      // we have a local mirror snapshot
+      if (m_local_mirror_snap_ns.is_non_primary()) {
+        // previously validated that it was linked to remote
+        ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
+                      m_state_builder->remote_mirror_uuid);
+
+        if (m_local_mirror_snap_ns.complete &&
+            m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
+          // skip past completed remote snapshot
+          m_remote_snap_id_start = remote_snap_id;
+          dout(15) << "skipping synced remote snapshot " << remote_snap_id
+                   << dendl;
+          continue;
+        } else if (!m_local_mirror_snap_ns.complete &&
+                   m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
+          // skip until we get to the in-progress remote snapshot
+          dout(15) << "skipping synced remote snapshot " << remote_snap_id
+                   << " while search for in-progress sync" << dendl;
+          m_remote_snap_id_start = remote_snap_id;
+          continue;
+        }
+      } else if (m_local_mirror_snap_ns.state ==
+                   cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
+        // find the matching demotion snapshot in remote image
+        ceph_assert(m_local_snap_id_start > 0);
+        if (mirror_ns->state ==
+              cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
+            mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
+            mirror_ns->primary_snap_id == m_local_snap_id_start) {
+          dout(10) << "located matching demotion snapshot: "
+                   << "remote_snap_id=" << remote_snap_id << ", "
+                   << "local_snap_id=" << m_local_snap_id_start << dendl;
+          m_remote_snap_id_start = remote_snap_id;
+          continue;
+        } else if (m_remote_snap_id_start == 0) {
+          // still looking for our matching demotion snapshot
+          dout(15) << "skipping remote snapshot " << remote_snap_id << " "
+                   << "while searching for demotion" << dendl;
+          continue;
+        }
+      } else {
+        // should not have been able to reach this
+        ceph_assert(false);
+      }
+    }
+
+    // find first snapshot where were are listed as a peer
+    if (!mirror_ns->is_primary()) {
+      dout(15) << "skipping non-primary remote snapshot" << dendl;
+      continue;
+    } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
+                 0) {
+      dout(15) << "skipping remote snapshot due to missing mirror peer"
+               << dendl;
+      continue;
+    }
+
+    m_remote_snap_id_end = remote_snap_id;
+    m_remote_mirror_snap_ns = *mirror_ns;
+    break;
+  }
+  image_locker.unlock();
+
+  if (m_remote_snap_id_end != CEPH_NOSNAP) {
+    dout(10) << "found remote mirror snapshot: "
+             << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
+             << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
+             << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
+
+    if (m_local_snap_id_end != CEPH_NOSNAP &&
+        !m_local_mirror_snap_ns.complete) {
+      // attempt to resume image-sync
+      dout(10) << "local image contains in-progress mirror snapshot"
+               << dendl;
+      copy_image();
+    } else {
+      copy_snapshots();
+    }
+    return;
+  }
+
+  std::unique_lock locker{m_lock};
+  if (m_remote_image_updated) {
+    // received update notification while scanning image, restart ...
+    m_remote_image_updated = false;
+    locker.unlock();
+
+    dout(10) << "restarting snapshot scan due to remote update notification"
+             << dendl;
+    refresh_local_image();
+    return;
+  }
+
+  if (is_replay_interrupted(&locker)) {
+    return;
+  } else if (remote_demoted) {
+    locker.unlock();
+
+    dout(10) << "remote image demoted" << dendl;
+    handle_replay_complete(0, "remote image demoted");
+    return;
+  }
+
+  dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
+           << dendl;
+  ceph_assert(m_state == STATE_REPLAYING);
+  m_state = STATE_IDLE;
+
+  notify_status_updated();
+}
+
+template <typename I>
+void Replayer<I>::copy_snapshots() {
+  dout(10) << dendl;
+
+  ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
+  ceph_assert(m_remote_snap_id_end > 0 &&
+              m_remote_snap_id_end != CEPH_NOSNAP);
+  ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
+
+  m_local_mirror_snap_ns = {};
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
+  auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
+    m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
+    m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
+    false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
+    ctx);
+  req->send();
+}
+
+template <typename I>
+void Replayer<I>::handle_copy_snapshots(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to copy snapshots from remote to local image: "
+         << cpp_strerror(r) << dendl;
+    handle_replay_complete(
+      r, "failed to copy snapshots from remote to local image");
+    return;
+  }
+
+  dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
+           << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
+           << "local_snap_id_start=" << m_local_snap_id_start << ", "
+           << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
+  get_image_state();
+}
+
+template <typename I>
+void Replayer<I>::get_image_state() {
+  dout(10) << dendl;
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_get_image_state>(this);
+  auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
+    m_state_builder->remote_image_ctx, m_remote_snap_id_end,
+    &m_image_state, ctx);
+  req->send();
+}
+
+template <typename I>
+void Replayer<I>::handle_get_image_state(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to retrieve remote snapshot image state: "
+         << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to retrieve remote snapshot image state");
+    return;
+  }
+
+  create_non_primary_snapshot();
+}
+
+template <typename I>
+void Replayer<I>::create_non_primary_snapshot() {
+  dout(10) << dendl;
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
+  auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
+    m_state_builder->local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
+    m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
+    m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
+  req->send();
+}
+
+template <typename I>
+void Replayer<I>::handle_create_non_primary_snapshot(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
+         << dendl;
+    handle_replay_complete(r, "failed to create local mirror snapshot");
+    return;
+  }
+
+  copy_image();
+}
+
+template <typename I>
+void Replayer<I>::copy_image() {
+  dout(10) << dendl;
+
+  m_progress_ctx = new ProgressContext(this);
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_copy_image>(this);
+  auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
+    m_state_builder->remote_image_ctx,  m_state_builder->local_image_ctx,
+    m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
+    (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
+      librbd::deep_copy::ObjectNumber{
+        m_local_mirror_snap_ns.last_copied_object_number} :
+      librbd::deep_copy::ObjectNumber{}),
+    m_local_mirror_snap_ns.snap_seqs, m_progress_ctx, ctx);
+  req->send();
+}
+
+template <typename I>
+void Replayer<I>::handle_copy_image(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  delete m_progress_ctx;
+  m_progress_ctx = nullptr;
+
+  if (r < 0) {
+    derr << "failed to copy remote image to local image: " << cpp_strerror(r)
+         << dendl;
+    handle_replay_complete(r, "failed to copy remote image");
+    return;
+  }
+
+  unlink_peer();
+}
+
+template <typename I>
+void Replayer<I>::handle_copy_image_progress(uint64_t offset, uint64_t total) {
+  dout(10) << "offset=" << offset << ", total=" << total << dendl;
+
+  // TODO
+}
+
+template <typename I>
+void Replayer<I>::unlink_peer() {
+  dout(10) << dendl;
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
+  auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
+    m_state_builder->remote_image_ctx, m_remote_snap_id_end,
+    m_remote_mirror_peer_uuid, ctx);
+  req->send();
+}
+
+template <typename I>
+void Replayer<I>::handle_unlink_peer(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0 && r != -ENOENT) {
+    derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
+         << dendl;
+    handle_replay_complete(r, "failed to unlink local peer from remote image");
+    return;
+  }
+
+  update_non_primary_snapshot(true);
+}
+
+template <typename I>
+void Replayer<I>::update_non_primary_snapshot(bool complete) {
+  dout(10) << dendl;
+
+  if (complete) {
+    m_local_mirror_snap_ns.complete = true;
+  }
+
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_image_snapshot_set_copy_progress(
+    &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
+    m_local_mirror_snap_ns.last_copied_object_number);
+
+  auto ctx = new LambdaContext([this, complete](int r) {
+      handle_update_non_primary_snapshot(complete, r);
+    });
+  auto aio_comp = create_rados_callback(ctx);
+  int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
+    m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to update local snapshot progress: " << cpp_strerror(r)
+         << dendl;
+    handle_replay_complete(r, "failed to update local snapshot progress");
+    return;
+  }
+
+  {
+    std::unique_lock locker{m_lock};
+    notify_status_updated();
+  }
+
+  notify_image_update();
+}
+
+template <typename I>
+void Replayer<I>::notify_image_update() {
+  dout(10) << dendl;
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
+  m_state_builder->local_image_ctx->notify_update(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_notify_image_update(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
+  }
+
+  if (is_replay_interrupted()) {
+    return;
+  }
+
+  refresh_local_image();
+}
+
+template <typename I>
+void Replayer<I>::register_update_watcher() {
+  dout(10) << dendl;
+
+  m_update_watch_ctx = new C_UpdateWatchCtx(this);
+  int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
+    m_update_watch_ctx, &m_update_watcher_handle);
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_register_update_watcher>(this);
+  m_threads->work_queue->queue(ctx, r);
+}
+
+
+template <typename I>
+void Replayer<I>::handle_register_update_watcher(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to register update watcher: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to register remote image update watcher");
+    m_state = STATE_COMPLETE;
+
+    delete m_update_watch_ctx;
+    m_update_watch_ctx = nullptr;
+  } else {
+    m_state = STATE_REPLAYING;
+  }
+
+  Context* on_init = nullptr;
+  std::swap(on_init, m_on_init_shutdown);
+  on_init->complete(r);
+
+  // delay initial snapshot scan until after we have alerted
+  // image replayer that we have initialized in case an error
+  // occurs
+  if (r >= 0) {
+    {
+      std::unique_lock locker{m_lock};
+      notify_status_updated();
+    }
+
+    refresh_local_image();
+  }
+}
+
+template <typename I>
+void Replayer<I>::unregister_update_watcher() {
+  dout(10) << dendl;
+
+  auto ctx = create_context_callback<
+    Replayer<I>,
+    &Replayer<I>::handle_unregister_update_watcher>(this);
+  m_state_builder->remote_image_ctx->state->unregister_update_watcher(
+    m_update_watcher_handle, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_unregister_update_watcher(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to unregister update watcher: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(
+      r, "failed to unregister remote image update watcher");
+  }
+
+  delete m_update_watch_ctx;
+  m_update_watch_ctx = nullptr;
+
+  wait_for_in_flight_ops();
+}
+
+template <typename I>
+void Replayer<I>::wait_for_in_flight_ops() {
+  dout(10) << dendl;
+
+  auto ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
+  m_in_flight_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context* on_shutdown = nullptr;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_on_init_shutdown != nullptr);
+    std::swap(on_shutdown, m_on_init_shutdown);
+  }
+  on_shutdown->complete(m_error_code);
+}
+
+template <typename I>
+void Replayer<I>::handle_remote_image_update_notify() {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (m_state == STATE_REPLAYING) {
+    dout(15) << "flagging snapshot rescan required" << dendl;
+    m_remote_image_updated = true;
+  } else if (m_state == STATE_IDLE) {
+    m_state = STATE_REPLAYING;
+    locker.unlock();
+
+    dout(15) << "restarting idle replayer" << dendl;
+    refresh_local_image();
+  }
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(int r,
+                                         const std::string& description) {
+  std::unique_lock locker{m_lock};
+  if (m_error_code == 0) {
+    m_error_code = r;
+    m_error_description = description;
+  }
+
+  if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
+    return;
+  }
+
+  m_state = STATE_COMPLETE;
+  notify_status_updated();
+}
+
+template <typename I>
+void Replayer<I>::notify_status_updated() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+  auto ctx = new C_TrackedOp(this, new LambdaContext(
+    [this](int) {
+      m_replayer_listener->handle_notification();
+    }));
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_interrupted() {
+  std::unique_lock locker{m_lock};
+  return is_replay_interrupted(&locker);
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
+  if (m_state == STATE_COMPLETE) {
+    locker->unlock();
+
+    dout(10) << "resuming pending shut down" << dendl;
+    unregister_update_watcher();
+    return true;
+  }
+  return false;
+}
+
 } // namespace snapshot
 } // namespace image_replayer
 } // namespace mirror
index e9be6aa121c5ee5f323aa36f1fc2d64ff42c3efd..fd30bbdcce334cecf6e199c8cd37ee45c3f85b43 100644 (file)
@@ -6,6 +6,9 @@
 
 #include "tools/rbd_mirror/image_replayer/Replayer.h"
 #include "common/ceph_mutex.h"
+#include "common/AsyncOpTracker.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include "librbd/mirror/snapshot/Types.h"
 #include <string>
 #include <type_traits>
 
@@ -66,7 +69,7 @@ public:
 
   bool is_replaying() const override {
     std::unique_lock locker{m_lock};
-    return (m_state == STATE_REPLAYING);
+    return (m_state == STATE_REPLAYING || m_state == STATE_IDLE);
   }
 
   bool is_resync_requested() const override {
@@ -77,25 +80,80 @@ public:
 
   int get_error_code() const override {
     std::unique_lock locker(m_lock);
-    // TODO
-    return 0;
+    return m_error_code;
   }
 
   std::string get_error_description() const override {
     std::unique_lock locker(m_lock);
-    // TODO
-    return "";
+    return m_error_description;
   }
 
 private:
-  // TODO
   /**
    * @verbatim
    *
-   *  <init>
+   * <init>
+   *    |
+   *    v
+   * REGISTER_UPDATE_WATCHER
+   *    |
+   *    v (skip if not needed)
+   * REFRESH_LOCAL_IMAGE <------------------------------\
+   *    |                                               |
+   *    v (skip if not needed)                          |
+   * REFRESH_REMOTE_IMAGE                               |
+   *    |                                               |
+   *    |                                               |
+   *    | (interrupted sync)                            |
+   *    |\--------------------------------------------\ |
+   *    |                                             | |
+   *    | (new snapshot)                              | |
+   *    |\--------------> COPY_SNAPSHOTS              | |
+   *    |                     |                       | |
+   *    |                     v                       | |
+   *    |                 GET_IMAGE_STATE             | |
+   *    |                     |                       | |
+   *    |                     v                       | |
+   *    |                 CREATE_NON_PRIMARY_SNAPSHOT | |
+   *    |                     |                       | |
+   *    |                     |/----------------------/ |
+   *    |                     |                         |
+   *    |                     v                         |
+   *    |                 COPY_IMAGE                    |
+   *    |                     |                         |
+   *    |                     v                         |
+   *    |                 UNLINK_PEER                   |
+   *    |                     |                         |
+   *    |                     v                         |
+   *    |                 UPDATE_NON_PRIMARY_SNAPSHOT   |
+   *    |                     |                         |
+   *    |                     v                         |
+   *    |                 NOTIFY_IMAGE_UPDATE           |
+   *    |                     |                         |
+   *    |                     v                         |
+   *    |                 NOTIFY_LISTENER               |
+   *    |                     |                         |
+   *    |                     \------------------------/|
+   *    |                                               |
+   *    | (remote demoted)                              |
+   *    \---------------> NOTIFY_LISTENER               |
+   *    |                     |                         |
+   *    |/--------------------/                         |
+   *    |                                               |
+   *    |   (update notification)                       |
+   * <idle> --------------------------------------------/
    *    |
    *    v
-   * <shutdown>
+   * <shut down>
+   *    |
+   *    v
+   * UNREGISTER_UPDATE_WATCHER
+   *    |
+   *    v
+   * WAIT_FOR_IN_FLIGHT_OPS
+   *    |
+   *    v
+   * <finish>
    *
    * @endverbatim
    */
@@ -103,9 +161,14 @@ private:
   enum State {
     STATE_INIT,
     STATE_REPLAYING,
+    STATE_IDLE,
     STATE_COMPLETE
   };
 
+  struct C_UpdateWatchCtx;
+  struct C_TrackedOp;
+  struct ProgressContext;
+
   Threads<ImageCtxT>* m_threads;
   std::string m_local_mirror_uuid;
   PoolMetaCache* m_pool_meta_cache;
@@ -115,6 +178,79 @@ private:
   mutable ceph::mutex m_lock;
 
   State m_state = STATE_INIT;
+
+  Context* m_on_init_shutdown = nullptr;
+
+  int m_error_code = 0;
+  std::string m_error_description;
+
+  C_UpdateWatchCtx* m_update_watch_ctx;
+  uint64_t m_update_watcher_handle = 0;
+
+  AsyncOpTracker m_in_flight_op_tracker;
+
+  uint64_t m_local_snap_id_start = 0;
+  uint64_t m_local_snap_id_end = CEPH_NOSNAP;
+  cls::rbd::MirrorSnapshotNamespace m_local_mirror_snap_ns;
+
+  std::string m_remote_mirror_peer_uuid;
+  uint64_t m_remote_snap_id_start = 0;
+  uint64_t m_remote_snap_id_end = CEPH_NOSNAP;
+  cls::rbd::MirrorSnapshotNamespace m_remote_mirror_snap_ns;
+
+  librbd::mirror::snapshot::ImageState m_image_state;
+  ProgressContext* m_progress_ctx = nullptr;
+
+  bool m_remote_image_updated = false;
+
+  void refresh_local_image();
+  void handle_refresh_local_image(int r);
+
+  void refresh_remote_image();
+  void handle_refresh_remote_image(int r);
+
+  void scan_local_mirror_snapshots();
+  void scan_remote_mirror_snapshots();
+
+  void copy_snapshots();
+  void handle_copy_snapshots(int r);
+
+  void get_image_state();
+  void handle_get_image_state(int r);
+
+  void create_non_primary_snapshot();
+  void handle_create_non_primary_snapshot(int r);
+
+  void copy_image();
+  void handle_copy_image(int r);
+  void handle_copy_image_progress(uint64_t offset, uint64_t total);
+
+  void unlink_peer();
+  void handle_unlink_peer(int r);
+
+  void update_non_primary_snapshot(bool complete);
+  void handle_update_non_primary_snapshot(bool complete, int r);
+
+  void notify_image_update();
+  void handle_notify_image_update(int r);
+
+  void register_update_watcher();
+  void handle_register_update_watcher(int r);
+
+  void unregister_update_watcher();
+  void handle_unregister_update_watcher(int r);
+
+  void wait_for_in_flight_ops();
+  void handle_wait_for_in_flight_ops(int r);
+
+  void handle_remote_image_update_notify();
+
+  void handle_replay_complete(int r, const std::string& description);
+  void notify_status_updated();
+
+  bool is_replay_interrupted();
+  bool is_replay_interrupted(std::unique_lock<ceph::mutex>* lock);
+
 };
 
 } // namespace snapshot