From e4cb7e4d78bcfc3cc96eea5313c101044013cb7b Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 6 Feb 2020 13:25:33 -0500 Subject: [PATCH] rbd-mirror: initial snapshot-based mirroring replayer logic This initial implementation will scan the remote image for new mirror snapshots. If a new primary snapshot it detected, it will start syncing it. If the remote was demoted, it will notify the image replayer to shut down. If an in-progress sync was previously interrupted, it will attempt to restart it. Signed-off-by: Jason Dillaman --- src/cls/rbd/cls_rbd_types.h | 13 + src/test/rbd_mirror/CMakeLists.txt | 1 + .../snapshot/test_mock_Replayer.cc | 1442 +++++++++++++++++ src/test/rbd_mirror/test_ImageReplayer.cc | 171 +- .../image_replayer/snapshot/Replayer.cc | 770 ++++++++- .../image_replayer/snapshot/Replayer.h | 152 +- 6 files changed, 2507 insertions(+), 42 deletions(-) create mode 100644 src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc diff --git a/src/cls/rbd/cls_rbd_types.h b/src/cls/rbd/cls_rbd_types.h index 18c7b0350fe97..1e4a537b04d73 100644 --- a/src/cls/rbd/cls_rbd_types.h +++ b/src/cls/rbd/cls_rbd_types.h @@ -555,6 +555,19 @@ struct MirrorSnapshotNamespace { primary_mirror_uuid(primary_mirror_uuid), primary_snap_id(primary_snap_id) { } + MirrorSnapshotNamespace(MirrorSnapshotState state, + const std::set &mirror_peer_uuids, + const std::string& primary_mirror_uuid, + snapid_t primary_snap_id, + bool complete, + uint64_t last_copied_object_number, + const SnapSeqs& snap_seqs) + : state(state), complete(complete), mirror_peer_uuids(mirror_peer_uuids), + primary_mirror_uuid(primary_mirror_uuid), + primary_snap_id(primary_snap_id), + last_copied_object_number(last_copied_object_number), + snap_seqs(snap_seqs) { + } inline bool is_primary() const { return (state == MIRROR_SNAPSHOT_STATE_PRIMARY || diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index dedabe8d3949d..0479a43c27fa1 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -43,6 +43,7 @@ add_executable(unittest_rbd_mirror image_replayer/journal/test_mock_EventPreprocessor.cc image_replayer/journal/test_mock_Replayer.cc image_replayer/snapshot/test_mock_CreateLocalImageRequest.cc + image_replayer/snapshot/test_mock_Replayer.cc image_sync/test_mock_SyncPointCreateRequest.cc image_sync/test_mock_SyncPointPruneRequest.cc pool_watcher/test_mock_RefreshImagesRequest.cc diff --git a/src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc b/src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc new file mode 100644 index 0000000000000..25835d97ec783 --- /dev/null +++ b/src/test/rbd_mirror/image_replayer/snapshot/test_mock_Replayer.cc @@ -0,0 +1,1442 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "librbd/deep_copy/ImageCopyRequest.h" +#include "librbd/deep_copy/SnapshotCopyRequest.h" +#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h" +#include "librbd/mirror/snapshot/GetImageStateRequest.h" +#include "librbd/mirror/snapshot/UnlinkPeerRequest.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/snapshot/Replayer.h" +#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h" +#include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" + +namespace librbd { +namespace { + +struct MockTestImageCtx : public librbd::MockImageCtx { + explicit MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +namespace deep_copy { + +template <> +struct ImageCopyRequest { + uint64_t src_snap_id_start; + uint64_t src_snap_id_end; + uint64_t dst_snap_id_start; + librbd::deep_copy::ObjectNumber object_number; + librbd::SnapSeqs snap_seqs; + + static ImageCopyRequest* s_instance; + static ImageCopyRequest* create(MockTestImageCtx *src_image_ctx, + MockTestImageCtx *dst_image_ctx, + librados::snap_t src_snap_id_start, + librados::snap_t src_snap_id_end, + librados::snap_t dst_snap_id_start, + bool flatten, + const ObjectNumber &object_number, + const SnapSeqs &snap_seqs, + ProgressContext *prog_ctx, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->src_snap_id_start = src_snap_id_start; + s_instance->src_snap_id_end = src_snap_id_end; + s_instance->dst_snap_id_start = dst_snap_id_start; + s_instance->object_number = object_number; + s_instance->snap_seqs = snap_seqs; + s_instance->on_finish = on_finish; + return s_instance; + } + + Context* on_finish = nullptr; + + ImageCopyRequest() { + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +template <> +struct SnapshotCopyRequest { + librados::snap_t src_snap_id_start; + librados::snap_t src_snap_id_end; + librados::snap_t dst_snap_id_start; + SnapSeqs* snap_seqs = nullptr; + + static SnapshotCopyRequest* s_instance; + static SnapshotCopyRequest* create(MockTestImageCtx *src_image_ctx, + MockTestImageCtx *dst_image_ctx, + librados::snap_t src_snap_id_start, + librados::snap_t src_snap_id_end, + librados::snap_t dst_snap_id_start, + bool flatten, + ::MockContextWQ *work_queue, + SnapSeqs *snap_seqs, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->src_snap_id_start = src_snap_id_start; + s_instance->src_snap_id_end = src_snap_id_end; + s_instance->dst_snap_id_start = dst_snap_id_start; + s_instance->snap_seqs = snap_seqs; + s_instance->on_finish = on_finish; + return s_instance; + } + + Context* on_finish = nullptr; + + SnapshotCopyRequest() { + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +ImageCopyRequest* ImageCopyRequest::s_instance = nullptr; +SnapshotCopyRequest* SnapshotCopyRequest::s_instance = nullptr; + +} // namespace deep_copy + +namespace mirror { +namespace snapshot { + +template <> +struct CreateNonPrimaryRequest { + bool demoted = false; + std::string primary_mirror_uuid; + uint64_t primary_snap_id; + SnapSeqs snap_seqs; + uint64_t* snap_id = nullptr; + + static CreateNonPrimaryRequest* s_instance; + static CreateNonPrimaryRequest* create(MockTestImageCtx *image_ctx, + bool demoted, + const std::string &primary_mirror_uuid, + uint64_t primary_snap_id, + const SnapSeqs& snap_seqs, + const ImageState &image_state, + uint64_t *snap_id, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->demoted = demoted; + s_instance->primary_mirror_uuid = primary_mirror_uuid; + s_instance->primary_snap_id = primary_snap_id; + s_instance->snap_seqs = snap_seqs; + s_instance->snap_id = snap_id; + s_instance->on_finish = on_finish; + return s_instance; + } + + Context* on_finish = nullptr; + + CreateNonPrimaryRequest() { + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +template <> +struct GetImageStateRequest { + uint64_t snap_id = CEPH_NOSNAP; + + static GetImageStateRequest* s_instance; + static GetImageStateRequest* create(MockTestImageCtx *image_ctx, + uint64_t snap_id, + ImageState *image_state, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->snap_id = snap_id; + s_instance->on_finish = on_finish; + return s_instance; + } + + Context* on_finish = nullptr; + + GetImageStateRequest() { + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +template <> +struct UnlinkPeerRequest { + uint64_t snap_id; + std::string mirror_peer_uuid; + + static UnlinkPeerRequest* s_instance; + static UnlinkPeerRequest*create (MockTestImageCtx *image_ctx, + uint64_t snap_id, + const std::string &mirror_peer_uuid, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->snap_id = snap_id; + s_instance->mirror_peer_uuid = mirror_peer_uuid; + s_instance->on_finish = on_finish; + return s_instance; + } + + Context* on_finish = nullptr; + + UnlinkPeerRequest() { + s_instance = this; + } + + MOCK_METHOD0(send, void()); +}; + +CreateNonPrimaryRequest* CreateNonPrimaryRequest::s_instance = nullptr; +GetImageStateRequest* GetImageStateRequest::s_instance = nullptr; +UnlinkPeerRequest* UnlinkPeerRequest::s_instance = nullptr; + +} // namespace snapshot +} // namespace mirror +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + MockSafeTimer *timer; + ceph::mutex &timer_lock; + + MockContextWQ *work_queue; + + Threads(Threads* threads) + : timer(new MockSafeTimer()), + timer_lock(threads->timer_lock), + work_queue(new MockContextWQ()) { + } + ~Threads() { + delete timer; + delete work_queue; + } +}; + +namespace { + +struct MockReplayerListener : public image_replayer::ReplayerListener { + MOCK_METHOD0(handle_notification, void()); +}; + +} // anonymous namespace + +namespace image_replayer { + +template<> +struct CloseImageRequest { + static CloseImageRequest* s_instance; + librbd::MockTestImageCtx **image_ctx = nullptr; + Context *on_finish = nullptr; + + static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->image_ctx = image_ctx; + s_instance->on_finish = on_finish; + return s_instance; + } + + CloseImageRequest() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~CloseImageRequest() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(send, void()); +}; + +CloseImageRequest* CloseImageRequest::s_instance = nullptr; + +namespace snapshot { + +template<> +struct StateBuilder { + StateBuilder(librbd::MockTestImageCtx& local_image_ctx, + librbd::MockTestImageCtx& remote_image_ctx) + : local_image_ctx(&local_image_ctx), + remote_image_ctx(&remote_image_ctx) { + } + + librbd::MockTestImageCtx* local_image_ctx; + librbd::MockTestImageCtx* remote_image_ctx; + + std::string remote_mirror_uuid = "remote mirror uuid"; +}; + +} // namespace snapshot +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#include "tools/rbd_mirror/image_replayer/snapshot/Replayer.cc" + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace snapshot { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::ReturnArg; +using ::testing::StrEq; +using ::testing::WithArg; + +class TestMockImageReplayerSnapshotReplayer : public TestMockFixture { +public: + typedef Replayer MockReplayer; + typedef StateBuilder MockStateBuilder; + typedef Threads MockThreads; + typedef CloseImageRequest MockCloseImageRequest; + typedef librbd::deep_copy::ImageCopyRequest MockImageCopyRequest; + typedef librbd::deep_copy::SnapshotCopyRequest MockSnapshotCopyRequest; + typedef librbd::mirror::snapshot::CreateNonPrimaryRequest MockCreateNonPrimaryRequest; + typedef librbd::mirror::snapshot::GetImageStateRequest MockGetImageStateRequest; + typedef librbd::mirror::snapshot::UnlinkPeerRequest MockUnlinkPeerRequest; + + void SetUp() override { + TestMockFixture::SetUp(); + + librbd::RBD rbd; + ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size)); + ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx)); + + ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, + m_image_size)); + ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, + &m_remote_image_ctx)); + } + + void expect_work_queue_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillRepeatedly(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + } + + void expect_add_event_after_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillRepeatedly( + DoAll(Invoke([this](double seconds, Context *ctx) { + m_threads->timer->add_event_after(seconds, ctx); + }), + ReturnArg<1>())); + EXPECT_CALL(*mock_threads.timer, cancel_event(_)) + .WillRepeatedly( + Invoke([this](Context *ctx) { + return m_threads->timer->cancel_event(ctx); + })); + } + + void expect_register_update_watcher(librbd::MockTestImageCtx& mock_image_ctx, + librbd::UpdateWatchCtx** update_watch_ctx, + uint64_t watch_handle, int r) { + EXPECT_CALL(*mock_image_ctx.state, register_update_watcher(_, _)) + .WillOnce(Invoke([update_watch_ctx, watch_handle, r] + (librbd::UpdateWatchCtx* ctx, uint64_t* handle) { + if (r >= 0) { + *update_watch_ctx = ctx; + *handle = watch_handle; + } + return r; + })); + } + + void expect_unregister_update_watcher(librbd::MockTestImageCtx& mock_image_ctx, + uint64_t watch_handle, int r) { + EXPECT_CALL(*mock_image_ctx.state, unregister_update_watcher(watch_handle, _)) + .WillOnce(WithArg<1>(Invoke([this, r](Context* ctx) { + m_threads->work_queue->queue(ctx, r); + }))); + } + + void expect_is_refresh_required(librbd::MockTestImageCtx& mock_image_ctx, + bool is_required) { + EXPECT_CALL(*mock_image_ctx.state, is_refresh_required()) + .WillOnce(Return(is_required)); + } + + void expect_refresh(librbd::MockTestImageCtx& mock_image_ctx, + const std::map& snaps, + int r) { + EXPECT_CALL(*mock_image_ctx.state, refresh(_)) + .WillOnce(Invoke([this, &mock_image_ctx, snaps, r](Context* ctx) { + mock_image_ctx.snap_info = snaps; + m_threads->work_queue->queue(ctx, r); + })); + } + + void expect_notify_update(librbd::MockTestImageCtx& mock_image_ctx) { + EXPECT_CALL(mock_image_ctx, notify_update(_)) + .WillOnce(Invoke([this](Context* ctx) { + m_threads->work_queue->queue(ctx, 0); + })); + } + + void expect_snapshot_copy(MockSnapshotCopyRequest& mock_snapshot_copy_request, + uint64_t src_snap_id_start, + uint64_t src_snap_id_end, + uint64_t dst_snap_id_start, + const librbd::SnapSeqs& snap_seqs, int r) { + EXPECT_CALL(mock_snapshot_copy_request, send()) + .WillOnce(Invoke([this, &req=mock_snapshot_copy_request, + src_snap_id_start, src_snap_id_end, dst_snap_id_start, + snap_seqs, r]() { + ASSERT_EQ(src_snap_id_start, req.src_snap_id_start); + ASSERT_EQ(src_snap_id_end, req.src_snap_id_end); + ASSERT_EQ(dst_snap_id_start, req.dst_snap_id_start); + *req.snap_seqs = snap_seqs; + m_threads->work_queue->queue(req.on_finish, r); + })); + } + + void expect_get_image_state(MockGetImageStateRequest& mock_get_image_state_request, + uint64_t snap_id, int r) { + EXPECT_CALL(mock_get_image_state_request, send()) + .WillOnce(Invoke([this, &req=mock_get_image_state_request, snap_id, r]() { + ASSERT_EQ(snap_id, req.snap_id); + m_threads->work_queue->queue(req.on_finish, r); + })); + } + + void expect_create_non_primary_request(MockCreateNonPrimaryRequest& mock_create_non_primary_request, + bool demoted, + const std::string& primary_mirror_uuid, + uint64_t primary_snap_id, + const librbd::SnapSeqs& snap_seqs, + uint64_t snap_id, int r) { + EXPECT_CALL(mock_create_non_primary_request, send()) + .WillOnce(Invoke([this, &req=mock_create_non_primary_request, demoted, + primary_mirror_uuid, primary_snap_id, snap_seqs, + snap_id, r]() { + ASSERT_EQ(demoted, req.demoted); + ASSERT_EQ(primary_mirror_uuid, req.primary_mirror_uuid); + ASSERT_EQ(primary_snap_id, req.primary_snap_id); + ASSERT_EQ(snap_seqs, req.snap_seqs); + *req.snap_id = snap_id; + m_threads->work_queue->queue(req.on_finish, r); + })); + } + + void expect_image_copy(MockImageCopyRequest& mock_image_copy_request, + uint64_t src_snap_id_start, uint64_t src_snap_id_end, + uint64_t dst_snap_id_start, + const librbd::deep_copy::ObjectNumber& object_number, + const librbd::SnapSeqs& snap_seqs, int r) { + EXPECT_CALL(mock_image_copy_request, send()) + .WillOnce(Invoke([this, &req=mock_image_copy_request, src_snap_id_start, + src_snap_id_end, dst_snap_id_start, object_number, + snap_seqs, r]() { + ASSERT_EQ(src_snap_id_start, req.src_snap_id_start); + ASSERT_EQ(src_snap_id_end, req.src_snap_id_end); + ASSERT_EQ(dst_snap_id_start, req.dst_snap_id_start); + ASSERT_EQ(object_number, req.object_number); + ASSERT_EQ(snap_seqs, req.snap_seqs); + m_threads->work_queue->queue(req.on_finish, r); + })); + } + + void expect_unlink_peer(MockUnlinkPeerRequest& mock_unlink_peer_request, + uint64_t snap_id, const std::string& mirror_peer_uuid, + int r) { + EXPECT_CALL(mock_unlink_peer_request, send()) + .WillOnce(Invoke([this, &req=mock_unlink_peer_request, snap_id, + mirror_peer_uuid, r]() { + ASSERT_EQ(snap_id, req.snap_id); + ASSERT_EQ(mirror_peer_uuid, req.mirror_peer_uuid); + m_threads->work_queue->queue(req.on_finish, r); + })); + } + + void expect_mirror_image_snapshot_set_copy_progress( + librbd::MockTestImageCtx& mock_test_image_ctx, uint64_t snap_id, + bool completed, uint64_t last_copied_object, int r) { + bufferlist bl; + encode(snap_id, bl); + encode(completed, bl); + encode(last_copied_object, bl); + + EXPECT_CALL(get_mock_io_ctx(mock_test_image_ctx.md_ctx), + exec(mock_test_image_ctx.header_oid, _, StrEq("rbd"), + StrEq("mirror_image_snapshot_set_copy_progress"), + ContentsEqual(bl), _, _)) + .WillOnce(Return(r)); + } + + void expect_send(MockCloseImageRequest &mock_close_image_request, int r) { + EXPECT_CALL(mock_close_image_request, send()) + .WillOnce(Invoke([this, &mock_close_image_request, r]() { + *mock_close_image_request.image_ctx = nullptr; + m_threads->work_queue->queue(mock_close_image_request.on_finish, r); + })); + } + + void expect_notification(MockThreads& mock_threads, + MockReplayerListener& mock_replayer_listener) { + EXPECT_CALL(mock_replayer_listener, handle_notification()) + .WillRepeatedly(Invoke([this]() { + std::unique_lock locker{m_lock}; + ++m_notifications; + m_cond.notify_all(); + })); + } + + int wait_for_notification(uint32_t count) { + std::unique_lock locker{m_lock}; + for (uint32_t idx = 0; idx < count; ++idx) { + while (m_notifications == 0) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { + return -ETIMEDOUT; + } + } + --m_notifications; + } + return 0; + } + + int init_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + librbd::MockTestImageCtx& mock_local_image_ctx, + librbd::MockTestImageCtx& mock_remote_image_ctx, + MockReplayerListener& mock_replayer_listener, + librbd::UpdateWatchCtx** update_watch_ctx) { + expect_register_update_watcher(mock_remote_image_ctx, update_watch_ctx, 123, + 0); + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + int r = init_ctx.wait(); + if (r < 0) { + return r; + } + + return wait_for_notification(2); + } + + int shut_down_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + librbd::MockTestImageCtx& mock_remote_image_ctx) { + expect_unregister_update_watcher(mock_remote_image_ctx, 123, 0); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + return shutdown_ctx.wait(); + } + + librbd::ImageCtx* m_local_image_ctx = nullptr; + librbd::ImageCtx* m_remote_image_ctx = nullptr; + + PoolMetaCache m_pool_meta_cache{g_ceph_context}; + + ceph::mutex m_lock = ceph::make_mutex( + "TestMockImageReplayerSnapshotReplayer"); + ceph::condition_variable m_cond; + uint32_t m_notifications = 0; +}; + +TEST_F(TestMockImageReplayerSnapshotReplayer, InitShutDown) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, SyncSnapshot) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + // it should sync two snapshots and skip two (user and mirror w/o matching + // peer uuid) + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, + "", 0U}, + 0, {}, 0, 0, {}}}, + {2U, librbd::SnapInfo{"snap2", cls::rbd::UserSnapshotNamespace{}, + 0, {}, 0, 0, {}}}, + {3U, librbd::SnapInfo{"snap3", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {""}, + "", 0U}, + 0, {}, 0, 0, {}}}, + {4U, librbd::SnapInfo{"snap4", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, + "", 0U}, + 0, {}, 0, 0, {}}}}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + + // init + expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123, + 0); + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, 0); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, {}, + {{1, CEPH_NOSNAP}}, 0); + MockUnlinkPeerRequest mock_unlink_peer_request; + expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 11, true, 0, 0); + expect_notify_update(mock_local_image_ctx); + + // sync snap4 + expect_is_refresh_required(mock_local_image_ctx, true); + expect_refresh( + mock_local_image_ctx, { + {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 1, true, 0, {}}, + 0, {}, 0, 0, {}}}, + }, 0); + expect_is_refresh_required(mock_remote_image_ctx, false); + expect_snapshot_copy(mock_snapshot_copy_request, 1, 4, 11, + {{1, 11}, {4, CEPH_NOSNAP}}, 0); + expect_get_image_state(mock_get_image_state_request, 4, 0); + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 4, + {{1, 11}, {4, CEPH_NOSNAP}}, 14, 0); + expect_image_copy(mock_image_copy_request, 1, 4, 11, {}, + {{1, 11}, {4, CEPH_NOSNAP}}, 0); + expect_unlink_peer(mock_unlink_peer_request, 4, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 14, true, 0, 0); + expect_notify_update(mock_local_image_ctx); + + // idle + expect_is_refresh_required(mock_local_image_ctx, true); + expect_refresh( + mock_local_image_ctx, { + {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 1, true, 0, {}}, + 0, {}, 0, 0, {}}}, + {14U, librbd::SnapInfo{"snap4", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 4, true, 0, {}}, + 0, {}, 0, 0, {}}}, + }, 0); + expect_is_refresh_required(mock_remote_image_ctx, false); + + // fire init + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + + // wait for sync to complete + ASSERT_EQ(0, wait_for_notification(4)); + + // shut down + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, InterruptedSync) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject a incomplete sync snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, + "", 0U}, + 0, {}, 0, 0, {}}}}; + mock_local_image_ctx.snap_info = { + {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 1, false, 123, {{1, CEPH_NOSNAP}}}, + 0, {}, 0, 0, {}}}}; + + // re-sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, + librbd::deep_copy::ObjectNumber{123U}, + {{1, CEPH_NOSNAP}}, 0); + MockUnlinkPeerRequest mock_unlink_peer_request; + expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 11, true, 123, 0); + expect_notify_update(mock_local_image_ctx); + + // idle + expect_is_refresh_required(mock_local_image_ctx, true); + expect_refresh( + mock_local_image_ctx, { + {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 1, true, 0, {}}, + 0, {}, 0, 0, {}}}, + }, 0); + expect_is_refresh_required(mock_remote_image_ctx, false); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete + ASSERT_EQ(0, wait_for_notification(2)); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, RemoteImageDemoted) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject a demotion snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED, + {"remote mirror peer uuid"}, "", 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + true, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, 0); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, {}, + {{1, CEPH_NOSNAP}}, 0); + MockUnlinkPeerRequest mock_unlink_peer_request; + expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 11, true, 0, 0); + expect_notify_update(mock_local_image_ctx); + + // idle + expect_is_refresh_required(mock_local_image_ctx, true); + expect_refresh( + mock_local_image_ctx, { + {11U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {}, "remote mirror uuid", + 1, true, 0, {}}, + 0, {}, 0, 0, {}}}, + }, 0); + expect_is_refresh_required(mock_remote_image_ctx, false); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(2)); + ASSERT_FALSE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, LocalImagePromoted) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject a promotion snapshot + mock_local_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, + {"remote mirror peer uuid"}, "", 0U, true, 0, {}}, + 0, {}, 0, 0, {}}}}; + + // idle + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, RegisterUpdateWatcherError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayerListener mock_replayer_listener; + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + // init + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + expect_register_update_watcher(mock_remote_image_ctx, &update_watch_ctx, 123, + -EINVAL); + + // fire init + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, UnregisterUpdateWatcherError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + + // shut down + expect_unregister_update_watcher(mock_remote_image_ctx, 123, -EINVAL); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EINVAL, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, RefreshLocalImageError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // sync + expect_is_refresh_required(mock_local_image_ctx, true); + expect_refresh(mock_local_image_ctx, {}, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, RefreshRemoteImageError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // sync + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, true); + expect_refresh(mock_remote_image_ctx, {}, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, CopySnapshotsError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, GetImageStateError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, CreateNonPrimarySnapshotError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, CopyImageError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, 0); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, {}, + {{1, CEPH_NOSNAP}}, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, UnlinkPeerError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, 0); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, {}, + {{1, CEPH_NOSNAP}}, 0); + MockUnlinkPeerRequest mock_unlink_peer_request; + expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 11, true, 0, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +TEST_F(TestMockImageReplayerSnapshotReplayer, UpdateNonPrimarySnapshotError) { + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx}; + librbd::MockTestImageCtx mock_remote_image_ctx{*m_remote_image_ctx}; + + MockThreads mock_threads(m_threads); + expect_work_queue_repeatedly(mock_threads); + + MockReplayerListener mock_replayer_listener; + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_image_ctx); + MockReplayer mock_replayer{&mock_threads, "local mirror uuid", + &m_pool_meta_cache, &mock_state_builder, + &mock_replayer_listener}; + m_pool_meta_cache.set_remote_pool_meta( + m_remote_io_ctx.get_id(), + {"remote mirror uuid", "remote mirror peer uuid"}); + + librbd::UpdateWatchCtx* update_watch_ctx = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_local_image_ctx, + mock_remote_image_ctx, + mock_replayer_listener, + &update_watch_ctx)); + + // inject snapshot + mock_remote_image_ctx.snap_info = { + {1U, librbd::SnapInfo{"snap1", cls::rbd::MirrorSnapshotNamespace{ + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"remote mirror peer uuid"}, "", + 0U}, + 0, {}, 0, 0, {}}}}; + + // sync snap1 + expect_is_refresh_required(mock_local_image_ctx, false); + expect_is_refresh_required(mock_remote_image_ctx, false); + MockSnapshotCopyRequest mock_snapshot_copy_request; + expect_snapshot_copy(mock_snapshot_copy_request, 0, 1, 0, {{1, CEPH_NOSNAP}}, + 0); + MockGetImageStateRequest mock_get_image_state_request; + expect_get_image_state(mock_get_image_state_request, 1, 0); + MockCreateNonPrimaryRequest mock_create_non_primary_request; + expect_create_non_primary_request(mock_create_non_primary_request, + false, "remote mirror uuid", 1, + {{1, CEPH_NOSNAP}}, 11, 0); + MockImageCopyRequest mock_image_copy_request; + expect_image_copy(mock_image_copy_request, 0, 1, 0, {}, + {{1, CEPH_NOSNAP}}, 0); + MockUnlinkPeerRequest mock_unlink_peer_request; + expect_unlink_peer(mock_unlink_peer_request, 1, "remote mirror peer uuid", + 0); + expect_mirror_image_snapshot_set_copy_progress( + mock_local_image_ctx, 11, true, 0, -EINVAL); + + // wake-up replayer + update_watch_ctx->handle_notify(); + + // wait for sync to complete and expect replay complete + ASSERT_EQ(0, wait_for_notification(1)); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_remote_image_ctx)); +} + +} // namespace snapshot +} // namespace image_replayer +} // namespace mirror +} // namespace rbd diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 59d90d0e1e842..caa2daa4f9cfa 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -121,6 +121,18 @@ public: } else { EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_remote_ioctx, RBD_MIRROR_MODE_IMAGE)); + + uuid_d uuid_gen; + uuid_gen.generate_random(); + std::string remote_peer_uuid = uuid_gen.to_string(); + + EXPECT_EQ(0, librbd::cls_client::mirror_peer_add( + &m_remote_ioctx, {remote_peer_uuid, + cls::rbd::MIRROR_PEER_DIRECTION_RX_TX, + "siteA", "client", m_local_mirror_uuid})); + + m_pool_meta_cache.set_remote_pool_meta( + m_remote_ioctx.get_id(), {m_remote_mirror_uuid, remote_peer_uuid}); } m_image_name = get_temp_image_name(); @@ -196,8 +208,14 @@ public: m_replayer->start(&cond); ASSERT_EQ(0, cond.wait()); + std::string oid; + if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) { + oid = ::journal::Journaler::header_oid(m_remote_image_id); + } else { + oid = librbd::util::header_name(m_remote_image_id); + } + ASSERT_EQ(0U, m_watch_handle); - std::string oid = ::journal::Journaler::header_oid(m_remote_image_id); create_watch_ctx(oid); ASSERT_EQ(0, m_remote_ioctx.watch2(oid, &m_watch_handle, m_watch_ctx)); } @@ -339,15 +357,45 @@ public: return true; } - void wait_for_replay_complete() - { + int get_last_mirror_snapshot(librados::IoCtx& io_ctx, + const std::string& image_id, + uint64_t* mirror_snap_id, + cls::rbd::MirrorSnapshotNamespace* mirror_ns) { + auto header_oid = librbd::util::header_name(image_id); + ::SnapContext snapc; + int r = librbd::cls_client::get_snapcontext(&io_ctx, header_oid, &snapc); + if (r < 0) { + return r; + } + + // stored in reverse order + for (auto snap_id : snapc.snaps) { + cls::rbd::SnapshotInfo snap_info; + r = librbd::cls_client::snapshot_get(&io_ctx, header_oid, snap_id, + &snap_info); + if (r < 0) { + return r; + } + + auto ns = boost::get( + &snap_info.snapshot_namespace); + if (ns != nullptr) { + *mirror_snap_id = snap_id; + *mirror_ns = *ns; + return 0; + } + } + + return -ENOENT; + } + + void wait_for_journal_synced() { cls::journal::ObjectPosition master_position; cls::journal::ObjectPosition mirror_position; - for (int i = 0; i < 100; i++) { get_commit_positions(&master_position, &mirror_position); if (master_position == mirror_position) { - break; + break; } wait_for_watcher_notify(1); } @@ -355,6 +403,55 @@ public: ASSERT_EQ(master_position, mirror_position); } + void wait_for_snapshot_synced() { + uint64_t remote_snap_id = CEPH_NOSNAP; + cls::rbd::MirrorSnapshotNamespace remote_mirror_ns; + ASSERT_EQ(0, get_last_mirror_snapshot(m_remote_ioctx, m_remote_image_id, + &remote_snap_id, &remote_mirror_ns)); + + std::string local_image_id; + ASSERT_EQ(0, librbd::cls_client::mirror_image_get_image_id( + &m_local_ioctx, m_global_image_id, &local_image_id)); + + uint64_t local_snap_id = CEPH_NOSNAP; + cls::rbd::MirrorSnapshotNamespace local_mirror_ns; + for (int i = 0; i < 100; i++) { + int r = get_last_mirror_snapshot(m_local_ioctx, local_image_id, + &local_snap_id, &local_mirror_ns); + if (r == 0 && + ((remote_mirror_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY && + local_mirror_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY) || + (remote_mirror_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED && + local_mirror_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED)) && + local_mirror_ns.primary_mirror_uuid == m_remote_mirror_uuid && + local_mirror_ns.primary_snap_id == remote_snap_id && + local_mirror_ns.complete) { + return; + } + + wait_for_watcher_notify(1); + } + + ADD_FAILURE() << "failed to locate matching snapshot: " + << "remote_snap_id=" << remote_snap_id << ", " + << "remote_snap_ns=" << remote_mirror_ns << ", " + << "local_snap_id=" << local_snap_id << ", " + << "local_snap_ns=" << local_mirror_ns; + } + + void wait_for_replay_complete() + { + if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) { + wait_for_journal_synced(); + } else { + wait_for_snapshot_synced(); + } + } + void wait_for_stopped() { for (int i = 0; i < 100; i++) { if (m_replayer->is_stopped()) { @@ -411,9 +508,15 @@ public: ASSERT_EQ(0, c->wait_for_complete()); c->put(); - C_SaferCond journal_flush_ctx; - ictx->journal->flush_commit_position(&journal_flush_ctx); - ASSERT_EQ(0, journal_flush_ctx.wait()); + if (MIRROR_IMAGE_MODE == cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) { + C_SaferCond journal_flush_ctx; + ictx->journal->flush_commit_position(&journal_flush_ctx); + ASSERT_EQ(0, journal_flush_ctx.wait()); + } else { + uint64_t snap_id = CEPH_NOSNAP; + ASSERT_EQ(0, librbd::api::Mirror<>::image_snapshot_create( + ictx, &snap_id)); + } printf("flushed\n"); } @@ -437,8 +540,8 @@ public: std::string m_remote_image_id; std::string m_global_image_id; ImageReplayer<> *m_replayer = nullptr; - C_WatchCtx *m_watch_ctx; - uint64_t m_watch_handle; + C_WatchCtx *m_watch_ctx = nullptr; + uint64_t m_watch_handle = 0; char m_test_data[TEST_IO_SIZE + 1]; std::string m_journal_commit_age; }; @@ -453,7 +556,9 @@ public: }; typedef ::testing::Types> + cls::rbd::MIRROR_IMAGE_MODE_JOURNAL>, + TestImageReplayerParams< + cls::rbd::MIRROR_IMAGE_MODE_SNAPSHOT>> TestImageReplayerTypes; TYPED_TEST_CASE(TestImageReplayer, TestImageReplayerTypes); @@ -510,19 +615,25 @@ TYPED_TEST(TestImageReplayer, BootstrapErrorMirrorDisabled) TYPED_TEST(TestImageReplayer, BootstrapMirrorDisabling) { // set remote image mirroring state to DISABLING - ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(this->m_remote_ioctx, - RBD_MIRROR_MODE_IMAGE)); - librbd::ImageCtx *ictx; - this->open_remote_image(&ictx); - ASSERT_EQ(0, librbd::api::Mirror<>::image_enable( - ictx, RBD_MIRROR_IMAGE_MODE_JOURNAL, false)); + if (gtest_TypeParam_::MIRROR_IMAGE_MODE == + cls::rbd::MIRROR_IMAGE_MODE_JOURNAL) { + ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(this->m_remote_ioctx, + RBD_MIRROR_MODE_IMAGE)); + librbd::ImageCtx *ictx; + this->open_remote_image(&ictx); + ASSERT_EQ(0, librbd::api::Mirror<>::image_enable( + ictx, RBD_MIRROR_IMAGE_MODE_JOURNAL, false)); + this->close_image(ictx); + } + cls::rbd::MirrorImage mirror_image; ASSERT_EQ(0, librbd::cls_client::mirror_image_get(&this->m_remote_ioctx, - ictx->id, &mirror_image)); + this->m_remote_image_id, + &mirror_image)); mirror_image.state = cls::rbd::MirrorImageState::MIRROR_IMAGE_STATE_DISABLING; ASSERT_EQ(0, librbd::cls_client::mirror_image_set(&this->m_remote_ioctx, - ictx->id, mirror_image)); - this->close_image(ictx); + this->m_remote_image_id, + mirror_image)); this->create_replayer(); C_SaferCond cond; @@ -704,8 +815,9 @@ TEST_F(TestImageReplayerJournal, NextTag) this->stop(); } -TYPED_TEST(TestImageReplayer, Resync) +TEST_F(TestImageReplayerJournal, Resync) { + // TODO add support to snapshot-based mirroring this->bootstrap(); librbd::ImageCtx *ictx; @@ -753,8 +865,9 @@ TYPED_TEST(TestImageReplayer, Resync) this->stop(); } -TYPED_TEST(TestImageReplayer, Resync_While_Stop) +TEST_F(TestImageReplayerJournal, Resync_While_Stop) { + // TODO add support to snapshot-based mirroring this->bootstrap(); @@ -813,8 +926,9 @@ TYPED_TEST(TestImageReplayer, Resync_While_Stop) this->stop(); } -TYPED_TEST(TestImageReplayer, Resync_StartInterrupted) +TEST_F(TestImageReplayerJournal, Resync_StartInterrupted) { + // TODO add support to snapshot-based mirroring this->bootstrap(); @@ -923,7 +1037,7 @@ TEST_F(TestImageReplayerJournal, MultipleReplayFailures_SingleEpoch) { this->close_image(ictx); } -TYPED_TEST(TestImageReplayer, MultipleReplayFailures_MultiEpoch) { +TEST_F(TestImageReplayerJournal, MultipleReplayFailures_MultiEpoch) { this->bootstrap(); // inject a snapshot that cannot be unprotected @@ -1067,8 +1181,9 @@ TEST_F(TestImageReplayerJournal, Disconnect) this->stop(); } -TYPED_TEST(TestImageReplayer, UpdateFeatures) +TEST_F(TestImageReplayerJournal, UpdateFeatures) { + // TODO add support to snapshot-based mirroring const uint64_t FEATURES_TO_UPDATE = RBD_FEATURE_OBJECT_MAP | RBD_FEATURE_FAST_DIFF; @@ -1159,8 +1274,9 @@ TYPED_TEST(TestImageReplayer, UpdateFeatures) this->stop(); } -TYPED_TEST(TestImageReplayer, MetadataSetRemove) +TEST_F(TestImageReplayerJournal, MetadataSetRemove) { + // TODO add support to snapshot-based mirroring const std::string KEY = "test_key"; const std::string VALUE = "test_value"; @@ -1204,8 +1320,9 @@ TYPED_TEST(TestImageReplayer, MetadataSetRemove) this->stop(); } -TYPED_TEST(TestImageReplayer, MirroringDelay) +TEST_F(TestImageReplayerJournal, MirroringDelay) { + // TODO add support to snapshot-based mirroring const double DELAY = 10; // set less than wait_for_replay_complete timeout librbd::ImageCtx *ictx; diff --git a/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc b/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc index e4a2ab38ae3b3..e2fcc6836c346 100644 --- a/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc +++ b/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc @@ -6,8 +6,15 @@ #include "common/errno.h" #include "common/Timer.h" #include "common/WorkQueue.h" -#include "librbd/Journal.h" +#include "cls/rbd/cls_rbd_client.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" #include "librbd/Utils.h" +#include "librbd/deep_copy/ImageCopyRequest.h" +#include "librbd/deep_copy/SnapshotCopyRequest.h" +#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h" +#include "librbd/mirror/snapshot/GetImageStateRequest.h" +#include "librbd/mirror/snapshot/UnlinkPeerRequest.h" #include "tools/rbd_mirror/PoolMetaCache.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/Types.h" @@ -31,6 +38,48 @@ namespace snapshot { using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; +using librbd::util::create_rados_callback; + +template +struct Replayer::C_UpdateWatchCtx : public librbd::UpdateWatchCtx { + Replayer* replayer; + + C_UpdateWatchCtx(Replayer* replayer) : replayer(replayer) { + } + + void handle_notify() override { + replayer->handle_remote_image_update_notify(); + } +}; + +template +struct Replayer::C_TrackedOp : public Context { + Replayer *replayer; + Context* ctx; + + C_TrackedOp(Replayer* replayer, Context* ctx) + : replayer(replayer), ctx(ctx) { + replayer->m_in_flight_op_tracker.start_op(); + } + + void finish(int r) override { + ctx->complete(r); + replayer->m_in_flight_op_tracker.finish_op(); + } +}; + +template +struct Replayer::ProgressContext : public librbd::ProgressContext { + Replayer *replayer; + + ProgressContext(Replayer* replayer) : replayer(replayer) { + } + + int update_progress(uint64_t offset, uint64_t total) override { + replayer->handle_copy_image_progress(offset, total); + return 0; + } +}; template Replayer::Replayer( @@ -52,24 +101,59 @@ Replayer::Replayer( template Replayer::~Replayer() { dout(10) << dendl; + ceph_assert(m_state == STATE_COMPLETE); + ceph_assert(m_update_watch_ctx == nullptr); + ceph_assert(m_progress_ctx == nullptr); } template void Replayer::init(Context* on_finish) { dout(10) << dendl; - // TODO - m_state = STATE_REPLAYING; - m_threads->work_queue->queue(on_finish, 0); + ceph_assert(m_state == STATE_INIT); + + RemotePoolMeta remote_pool_meta; + int r = m_pool_meta_cache->get_remote_pool_meta( + m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta); + if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) { + derr << "failed to retrieve mirror peer uuid from remote pool" << dendl; + m_state = STATE_COMPLETE; + m_threads->work_queue->queue(on_finish, r); + return; + } + + m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid; + dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl; + + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + + register_update_watcher(); } template void Replayer::shut_down(Context* on_finish) { dout(10) << dendl; - // TODO - m_state = STATE_COMPLETE; - m_threads->work_queue->queue(on_finish, 0); + std::unique_lock locker{m_lock}; + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + m_error_code = 0; + m_error_description = ""; + + ceph_assert(m_state != STATE_INIT); + auto state = STATE_COMPLETE; + std::swap(m_state, state); + + if (state == STATE_REPLAYING) { + // TODO interrupt snapshot copy and image copy state machines even if remote + // cluster is unreachable + dout(10) << "shut down pending on completion of snapshot replay" << dendl; + return; + } + locker.unlock(); + + unregister_update_watcher(); } template @@ -90,6 +174,678 @@ bool Replayer::get_replay_status(std::string* description, return true; } +template +void Replayer::refresh_local_image() { + if (!m_state_builder->local_image_ctx->state->is_refresh_required()) { + refresh_remote_image(); + return; + } + + dout(10) << dendl; + auto ctx = create_context_callback< + Replayer, &Replayer::handle_refresh_local_image>(this); + m_state_builder->local_image_ctx->state->refresh(ctx); +} + +template +void Replayer::handle_refresh_local_image(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to refresh local image: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to refresh local image"); + return; + } + + refresh_remote_image(); +} + +template +void Replayer::refresh_remote_image() { + if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) { + scan_local_mirror_snapshots(); + return; + } + + dout(10) << dendl; + auto ctx = create_context_callback< + Replayer, &Replayer::handle_refresh_remote_image>(this); + m_state_builder->remote_image_ctx->state->refresh(ctx); +} + +template +void Replayer::handle_refresh_remote_image(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to refresh remote image"); + return; + } + + scan_local_mirror_snapshots(); +} + +template +void Replayer::scan_local_mirror_snapshots() { + if (is_replay_interrupted()) { + return; + } + + dout(10) << dendl; + + m_local_snap_id_start = 0; + m_local_snap_id_end = CEPH_NOSNAP; + m_local_mirror_snap_ns = {}; + + m_remote_snap_id_start = 0; + m_remote_snap_id_end = CEPH_NOSNAP; + m_remote_mirror_snap_ns = {}; + + auto local_image_ctx = m_state_builder->local_image_ctx; + std::shared_lock image_locker{local_image_ctx->image_lock}; + for (auto snap_info_it = local_image_ctx->snap_info.begin(); + snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) { + const auto& snap_ns = snap_info_it->second.snap_namespace; + auto mirror_ns = boost::get< + cls::rbd::MirrorSnapshotNamespace>(&snap_ns); + if (mirror_ns == nullptr) { + continue; + } + + dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", " + << "mirror_ns=" << *mirror_ns << dendl; + m_local_mirror_snap_ns = *mirror_ns; + + auto local_snap_id = snap_info_it->first; + if (mirror_ns->is_non_primary()) { + if (mirror_ns->complete) { + // if remote has new snapshots, we would sync from here + m_local_snap_id_start = local_snap_id; + m_local_snap_id_end = CEPH_NOSNAP; + } else { + // start snap will be last complete mirror snapshot or initial + // image revision + m_local_snap_id_end = local_snap_id; + } + } else if (mirror_ns->is_primary()) { + if (mirror_ns->complete) { + m_local_snap_id_start = local_snap_id; + m_local_snap_id_end = CEPH_NOSNAP; + } else { + derr << "incomplete local primary snapshot" << dendl; + handle_replay_complete(-EINVAL, "incomplete local primary snapshot"); + return; + } + } else { + derr << "unknown local mirror snapshot state" << dendl; + handle_replay_complete(-EINVAL, "invalid local mirror snapshot state"); + return; + } + } + image_locker.unlock(); + + if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) { + if (m_local_mirror_snap_ns.is_non_primary() && + m_local_mirror_snap_ns.primary_mirror_uuid != + m_state_builder->remote_mirror_uuid) { + // TODO support multiple peers + derr << "local image linked to unknown peer: " + << m_local_mirror_snap_ns.primary_mirror_uuid << dendl; + handle_replay_complete(-EEXIST, "local image linked to unknown peer"); + return; + } else if (m_local_mirror_snap_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) { + dout(5) << "local image promoted" << dendl; + handle_replay_complete(0, "force promoted"); + return; + } + + dout(10) << "found local mirror snapshot: " + << "local_snap_id_start=" << m_local_snap_id_start << ", " + << "local_snap_id_end=" << m_local_snap_id_end << ", " + << "local_snap_ns=" << m_local_mirror_snap_ns << dendl; + if (m_local_mirror_snap_ns.complete) { + // our remote sync should start after this completed snapshot + m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id; + } + } + + // we don't have any mirror snapshots or only completed non-primary + // mirror snapshots + scan_remote_mirror_snapshots(); +} + +template +void Replayer::scan_remote_mirror_snapshots() { + dout(10) << dendl; + + { + // reset state in case new snapshot is added while we are scanning + std::unique_lock locker{m_lock}; + m_remote_image_updated = false; + } + + bool remote_demoted = false; + auto remote_image_ctx = m_state_builder->remote_image_ctx; + std::shared_lock image_locker{remote_image_ctx->image_lock}; + for (auto snap_info_it = remote_image_ctx->snap_info.begin(); + snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) { + const auto& snap_ns = snap_info_it->second.snap_namespace; + auto mirror_ns = boost::get< + cls::rbd::MirrorSnapshotNamespace>(&snap_ns); + if (mirror_ns == nullptr) { + continue; + } + + dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", " + << "mirror_ns=" << *mirror_ns << dendl; + if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) { + derr << "unknown remote mirror snapshot state" << dendl; + handle_replay_complete(-EINVAL, "invalid remote mirror snapshot state"); + return; + } else { + remote_demoted = (mirror_ns->is_primary() && mirror_ns->is_demoted()); + } + + auto remote_snap_id = snap_info_it->first; + if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) { + // we have a local mirror snapshot + if (m_local_mirror_snap_ns.is_non_primary()) { + // previously validated that it was linked to remote + ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid == + m_state_builder->remote_mirror_uuid); + + if (m_local_mirror_snap_ns.complete && + m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) { + // skip past completed remote snapshot + m_remote_snap_id_start = remote_snap_id; + dout(15) << "skipping synced remote snapshot " << remote_snap_id + << dendl; + continue; + } else if (!m_local_mirror_snap_ns.complete && + m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) { + // skip until we get to the in-progress remote snapshot + dout(15) << "skipping synced remote snapshot " << remote_snap_id + << " while search for in-progress sync" << dendl; + m_remote_snap_id_start = remote_snap_id; + continue; + } + } else if (m_local_mirror_snap_ns.state == + cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) { + // find the matching demotion snapshot in remote image + ceph_assert(m_local_snap_id_start > 0); + if (mirror_ns->state == + cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED && + mirror_ns->primary_mirror_uuid == m_local_mirror_uuid && + mirror_ns->primary_snap_id == m_local_snap_id_start) { + dout(10) << "located matching demotion snapshot: " + << "remote_snap_id=" << remote_snap_id << ", " + << "local_snap_id=" << m_local_snap_id_start << dendl; + m_remote_snap_id_start = remote_snap_id; + continue; + } else if (m_remote_snap_id_start == 0) { + // still looking for our matching demotion snapshot + dout(15) << "skipping remote snapshot " << remote_snap_id << " " + << "while searching for demotion" << dendl; + continue; + } + } else { + // should not have been able to reach this + ceph_assert(false); + } + } + + // find first snapshot where were are listed as a peer + if (!mirror_ns->is_primary()) { + dout(15) << "skipping non-primary remote snapshot" << dendl; + continue; + } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) == + 0) { + dout(15) << "skipping remote snapshot due to missing mirror peer" + << dendl; + continue; + } + + m_remote_snap_id_end = remote_snap_id; + m_remote_mirror_snap_ns = *mirror_ns; + break; + } + image_locker.unlock(); + + if (m_remote_snap_id_end != CEPH_NOSNAP) { + dout(10) << "found remote mirror snapshot: " + << "remote_snap_id_start=" << m_remote_snap_id_start << ", " + << "remote_snap_id_end=" << m_remote_snap_id_end << ", " + << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl; + + if (m_local_snap_id_end != CEPH_NOSNAP && + !m_local_mirror_snap_ns.complete) { + // attempt to resume image-sync + dout(10) << "local image contains in-progress mirror snapshot" + << dendl; + copy_image(); + } else { + copy_snapshots(); + } + return; + } + + std::unique_lock locker{m_lock}; + if (m_remote_image_updated) { + // received update notification while scanning image, restart ... + m_remote_image_updated = false; + locker.unlock(); + + dout(10) << "restarting snapshot scan due to remote update notification" + << dendl; + refresh_local_image(); + return; + } + + if (is_replay_interrupted(&locker)) { + return; + } else if (remote_demoted) { + locker.unlock(); + + dout(10) << "remote image demoted" << dendl; + handle_replay_complete(0, "remote image demoted"); + return; + } + + dout(10) << "all remote snapshots synced: idling waiting for new snapshot" + << dendl; + ceph_assert(m_state == STATE_REPLAYING); + m_state = STATE_IDLE; + + notify_status_updated(); +} + +template +void Replayer::copy_snapshots() { + dout(10) << dendl; + + ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP); + ceph_assert(m_remote_snap_id_end > 0 && + m_remote_snap_id_end != CEPH_NOSNAP); + ceph_assert(m_local_snap_id_start != CEPH_NOSNAP); + + m_local_mirror_snap_ns = {}; + auto ctx = create_context_callback< + Replayer, &Replayer::handle_copy_snapshots>(this); + auto req = librbd::deep_copy::SnapshotCopyRequest::create( + m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx, + m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, + false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs, + ctx); + req->send(); +} + +template +void Replayer::handle_copy_snapshots(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to copy snapshots from remote to local image: " + << cpp_strerror(r) << dendl; + handle_replay_complete( + r, "failed to copy snapshots from remote to local image"); + return; + } + + dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", " + << "remote_snap_id_end=" << m_remote_snap_id_end << ", " + << "local_snap_id_start=" << m_local_snap_id_start << ", " + << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl; + get_image_state(); +} + +template +void Replayer::get_image_state() { + dout(10) << dendl; + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_get_image_state>(this); + auto req = librbd::mirror::snapshot::GetImageStateRequest::create( + m_state_builder->remote_image_ctx, m_remote_snap_id_end, + &m_image_state, ctx); + req->send(); +} + +template +void Replayer::handle_get_image_state(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to retrieve remote snapshot image state: " + << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to retrieve remote snapshot image state"); + return; + } + + create_non_primary_snapshot(); +} + +template +void Replayer::create_non_primary_snapshot() { + dout(10) << dendl; + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_create_non_primary_snapshot>(this); + auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest::create( + m_state_builder->local_image_ctx, m_remote_mirror_snap_ns.is_demoted(), + m_state_builder->remote_mirror_uuid, m_remote_snap_id_end, + m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx); + req->send(); +} + +template +void Replayer::handle_create_non_primary_snapshot(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to create local mirror snapshot: " << cpp_strerror(r) + << dendl; + handle_replay_complete(r, "failed to create local mirror snapshot"); + return; + } + + copy_image(); +} + +template +void Replayer::copy_image() { + dout(10) << dendl; + + m_progress_ctx = new ProgressContext(this); + auto ctx = create_context_callback< + Replayer, &Replayer::handle_copy_image>(this); + auto req = librbd::deep_copy::ImageCopyRequest::create( + m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx, + m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false, + (m_local_mirror_snap_ns.last_copied_object_number > 0 ? + librbd::deep_copy::ObjectNumber{ + m_local_mirror_snap_ns.last_copied_object_number} : + librbd::deep_copy::ObjectNumber{}), + m_local_mirror_snap_ns.snap_seqs, m_progress_ctx, ctx); + req->send(); +} + +template +void Replayer::handle_copy_image(int r) { + dout(10) << "r=" << r << dendl; + + delete m_progress_ctx; + m_progress_ctx = nullptr; + + if (r < 0) { + derr << "failed to copy remote image to local image: " << cpp_strerror(r) + << dendl; + handle_replay_complete(r, "failed to copy remote image"); + return; + } + + unlink_peer(); +} + +template +void Replayer::handle_copy_image_progress(uint64_t offset, uint64_t total) { + dout(10) << "offset=" << offset << ", total=" << total << dendl; + + // TODO +} + +template +void Replayer::unlink_peer() { + dout(10) << dendl; + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_unlink_peer>(this); + auto req = librbd::mirror::snapshot::UnlinkPeerRequest::create( + m_state_builder->remote_image_ctx, m_remote_snap_id_end, + m_remote_mirror_peer_uuid, ctx); + req->send(); +} + +template +void Replayer::handle_unlink_peer(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0 && r != -ENOENT) { + derr << "failed to unlink local peer from remote image: " << cpp_strerror(r) + << dendl; + handle_replay_complete(r, "failed to unlink local peer from remote image"); + return; + } + + update_non_primary_snapshot(true); +} + +template +void Replayer::update_non_primary_snapshot(bool complete) { + dout(10) << dendl; + + if (complete) { + m_local_mirror_snap_ns.complete = true; + } + + librados::ObjectWriteOperation op; + librbd::cls_client::mirror_image_snapshot_set_copy_progress( + &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete, + m_local_mirror_snap_ns.last_copied_object_number); + + auto ctx = new LambdaContext([this, complete](int r) { + handle_update_non_primary_snapshot(complete, r); + }); + auto aio_comp = create_rados_callback(ctx); + int r = m_state_builder->local_image_ctx->md_ctx.aio_operate( + m_state_builder->local_image_ctx->header_oid, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); +} + +template +void Replayer::handle_update_non_primary_snapshot(bool complete, int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to update local snapshot progress: " << cpp_strerror(r) + << dendl; + handle_replay_complete(r, "failed to update local snapshot progress"); + return; + } + + { + std::unique_lock locker{m_lock}; + notify_status_updated(); + } + + notify_image_update(); +} + +template +void Replayer::notify_image_update() { + dout(10) << dendl; + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_notify_image_update>(this); + m_state_builder->local_image_ctx->notify_update(ctx); +} + +template +void Replayer::handle_notify_image_update(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to notify local image update: " << cpp_strerror(r) << dendl; + } + + if (is_replay_interrupted()) { + return; + } + + refresh_local_image(); +} + +template +void Replayer::register_update_watcher() { + dout(10) << dendl; + + m_update_watch_ctx = new C_UpdateWatchCtx(this); + int r = m_state_builder->remote_image_ctx->state->register_update_watcher( + m_update_watch_ctx, &m_update_watcher_handle); + auto ctx = create_context_callback< + Replayer, &Replayer::handle_register_update_watcher>(this); + m_threads->work_queue->queue(ctx, r); +} + + +template +void Replayer::handle_register_update_watcher(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to register update watcher: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to register remote image update watcher"); + m_state = STATE_COMPLETE; + + delete m_update_watch_ctx; + m_update_watch_ctx = nullptr; + } else { + m_state = STATE_REPLAYING; + } + + Context* on_init = nullptr; + std::swap(on_init, m_on_init_shutdown); + on_init->complete(r); + + // delay initial snapshot scan until after we have alerted + // image replayer that we have initialized in case an error + // occurs + if (r >= 0) { + { + std::unique_lock locker{m_lock}; + notify_status_updated(); + } + + refresh_local_image(); + } +} + +template +void Replayer::unregister_update_watcher() { + dout(10) << dendl; + + auto ctx = create_context_callback< + Replayer, + &Replayer::handle_unregister_update_watcher>(this); + m_state_builder->remote_image_ctx->state->unregister_update_watcher( + m_update_watcher_handle, ctx); +} + +template +void Replayer::handle_unregister_update_watcher(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to unregister update watcher: " << cpp_strerror(r) << dendl; + handle_replay_complete( + r, "failed to unregister remote image update watcher"); + } + + delete m_update_watch_ctx; + m_update_watch_ctx = nullptr; + + wait_for_in_flight_ops(); +} + +template +void Replayer::wait_for_in_flight_ops() { + dout(10) << dendl; + + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_in_flight_ops>(this)); + m_in_flight_op_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_in_flight_ops(int r) { + dout(10) << "r=" << r << dendl; + + Context* on_shutdown = nullptr; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_on_init_shutdown != nullptr); + std::swap(on_shutdown, m_on_init_shutdown); + } + on_shutdown->complete(m_error_code); +} + +template +void Replayer::handle_remote_image_update_notify() { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + if (m_state == STATE_REPLAYING) { + dout(15) << "flagging snapshot rescan required" << dendl; + m_remote_image_updated = true; + } else if (m_state == STATE_IDLE) { + m_state = STATE_REPLAYING; + locker.unlock(); + + dout(15) << "restarting idle replayer" << dendl; + refresh_local_image(); + } +} + +template +void Replayer::handle_replay_complete(int r, + const std::string& description) { + std::unique_lock locker{m_lock}; + if (m_error_code == 0) { + m_error_code = r; + m_error_description = description; + } + + if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) { + return; + } + + m_state = STATE_COMPLETE; + notify_status_updated(); +} + +template +void Replayer::notify_status_updated() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + auto ctx = new C_TrackedOp(this, new LambdaContext( + [this](int) { + m_replayer_listener->handle_notification(); + })); + m_threads->work_queue->queue(ctx, 0); +} + +template +bool Replayer::is_replay_interrupted() { + std::unique_lock locker{m_lock}; + return is_replay_interrupted(&locker); +} + +template +bool Replayer::is_replay_interrupted(std::unique_lock* locker) { + if (m_state == STATE_COMPLETE) { + locker->unlock(); + + dout(10) << "resuming pending shut down" << dendl; + unregister_update_watcher(); + return true; + } + return false; +} + } // namespace snapshot } // namespace image_replayer } // namespace mirror diff --git a/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.h b/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.h index e9be6aa121c5e..fd30bbdcce334 100644 --- a/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.h +++ b/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.h @@ -6,6 +6,9 @@ #include "tools/rbd_mirror/image_replayer/Replayer.h" #include "common/ceph_mutex.h" +#include "common/AsyncOpTracker.h" +#include "cls/rbd/cls_rbd_types.h" +#include "librbd/mirror/snapshot/Types.h" #include #include @@ -66,7 +69,7 @@ public: bool is_replaying() const override { std::unique_lock locker{m_lock}; - return (m_state == STATE_REPLAYING); + return (m_state == STATE_REPLAYING || m_state == STATE_IDLE); } bool is_resync_requested() const override { @@ -77,25 +80,80 @@ public: int get_error_code() const override { std::unique_lock locker(m_lock); - // TODO - return 0; + return m_error_code; } std::string get_error_description() const override { std::unique_lock locker(m_lock); - // TODO - return ""; + return m_error_description; } private: - // TODO /** * @verbatim * - * + * + * | + * v + * REGISTER_UPDATE_WATCHER + * | + * v (skip if not needed) + * REFRESH_LOCAL_IMAGE <------------------------------\ + * | | + * v (skip if not needed) | + * REFRESH_REMOTE_IMAGE | + * | | + * | | + * | (interrupted sync) | + * |\--------------------------------------------\ | + * | | | + * | (new snapshot) | | + * |\--------------> COPY_SNAPSHOTS | | + * | | | | + * | v | | + * | GET_IMAGE_STATE | | + * | | | | + * | v | | + * | CREATE_NON_PRIMARY_SNAPSHOT | | + * | | | | + * | |/----------------------/ | + * | | | + * | v | + * | COPY_IMAGE | + * | | | + * | v | + * | UNLINK_PEER | + * | | | + * | v | + * | UPDATE_NON_PRIMARY_SNAPSHOT | + * | | | + * | v | + * | NOTIFY_IMAGE_UPDATE | + * | | | + * | v | + * | NOTIFY_LISTENER | + * | | | + * | \------------------------/| + * | | + * | (remote demoted) | + * \---------------> NOTIFY_LISTENER | + * | | | + * |/--------------------/ | + * | | + * | (update notification) | + * --------------------------------------------/ * | * v - * + * + * | + * v + * UNREGISTER_UPDATE_WATCHER + * | + * v + * WAIT_FOR_IN_FLIGHT_OPS + * | + * v + * * * @endverbatim */ @@ -103,9 +161,14 @@ private: enum State { STATE_INIT, STATE_REPLAYING, + STATE_IDLE, STATE_COMPLETE }; + struct C_UpdateWatchCtx; + struct C_TrackedOp; + struct ProgressContext; + Threads* m_threads; std::string m_local_mirror_uuid; PoolMetaCache* m_pool_meta_cache; @@ -115,6 +178,79 @@ private: mutable ceph::mutex m_lock; State m_state = STATE_INIT; + + Context* m_on_init_shutdown = nullptr; + + int m_error_code = 0; + std::string m_error_description; + + C_UpdateWatchCtx* m_update_watch_ctx; + uint64_t m_update_watcher_handle = 0; + + AsyncOpTracker m_in_flight_op_tracker; + + uint64_t m_local_snap_id_start = 0; + uint64_t m_local_snap_id_end = CEPH_NOSNAP; + cls::rbd::MirrorSnapshotNamespace m_local_mirror_snap_ns; + + std::string m_remote_mirror_peer_uuid; + uint64_t m_remote_snap_id_start = 0; + uint64_t m_remote_snap_id_end = CEPH_NOSNAP; + cls::rbd::MirrorSnapshotNamespace m_remote_mirror_snap_ns; + + librbd::mirror::snapshot::ImageState m_image_state; + ProgressContext* m_progress_ctx = nullptr; + + bool m_remote_image_updated = false; + + void refresh_local_image(); + void handle_refresh_local_image(int r); + + void refresh_remote_image(); + void handle_refresh_remote_image(int r); + + void scan_local_mirror_snapshots(); + void scan_remote_mirror_snapshots(); + + void copy_snapshots(); + void handle_copy_snapshots(int r); + + void get_image_state(); + void handle_get_image_state(int r); + + void create_non_primary_snapshot(); + void handle_create_non_primary_snapshot(int r); + + void copy_image(); + void handle_copy_image(int r); + void handle_copy_image_progress(uint64_t offset, uint64_t total); + + void unlink_peer(); + void handle_unlink_peer(int r); + + void update_non_primary_snapshot(bool complete); + void handle_update_non_primary_snapshot(bool complete, int r); + + void notify_image_update(); + void handle_notify_image_update(int r); + + void register_update_watcher(); + void handle_register_update_watcher(int r); + + void unregister_update_watcher(); + void handle_unregister_update_watcher(int r); + + void wait_for_in_flight_ops(); + void handle_wait_for_in_flight_ops(int r); + + void handle_remote_image_update_notify(); + + void handle_replay_complete(int r, const std::string& description); + void notify_status_updated(); + + bool is_replay_interrupted(); + bool is_replay_interrupted(std::unique_lock* lock); + }; } // namespace snapshot -- 2.39.5