From 4df913d10b2dd0444db806fccb2812547efa1b56 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Mon, 11 Jul 2016 15:32:45 -0400 Subject: [PATCH] rbd-mirror: preprocess journal events prior to applying Fixes: http://tracker.ceph.com/issues/16622 Signed-off-by: Jason Dillaman --- .../rbd_mirror/test_mock_ImageReplayer.cc | 24 +++++++++ src/tools/rbd_mirror/ImageReplayer.cc | 52 ++++++++++++++++--- src/tools/rbd_mirror/ImageReplayer.h | 9 ++++ 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index c14ac4a1f3e25..5f651af522387 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -6,6 +6,7 @@ #include "tools/rbd_mirror/ImageReplayer.h" #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h" #include "tools/rbd_mirror/ImageSyncThrottler.h" #include "test/journal/mock/MockJournaler.h" #include "test/librbd/mock/MockImageCtx.h" @@ -116,6 +117,28 @@ struct CloseImageRequest { MOCK_METHOD0(send, void()); }; +template<> +struct EventPreprocessor { + static EventPreprocessor *s_instance; + + static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx, + ::journal::MockJournalerProxy &remote_journaler, + const std::string &local_mirror_uuid, + librbd::journal::MirrorPeerClientMeta *client_meta, + ContextWQ *work_queue) { + assert(s_instance != nullptr); + return s_instance; + } + + EventPreprocessor() { + assert(s_instance == nullptr); + s_instance = this; + } + + MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &)); + MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *)); +}; + template<> struct ReplayStatusFormatter { static ReplayStatusFormatter* s_instance; @@ -136,6 +159,7 @@ struct ReplayStatusFormatter { BootstrapRequest* BootstrapRequest::s_instance = nullptr; CloseImageRequest* CloseImageRequest::s_instance = nullptr; +EventPreprocessor* EventPreprocessor::s_instance = nullptr; ReplayStatusFormatter* ReplayStatusFormatter::s_instance = nullptr; } // namespace image_replayer diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index f12f3b93e0047..8eb4e0760a5ef 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -23,6 +23,7 @@ #include "Threads.h" #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h" #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h" #define dout_subsys ceph_subsys_rbd_mirror @@ -296,6 +297,7 @@ ImageReplayer::ImageReplayer(Threads *threads, template ImageReplayer::~ImageReplayer() { + assert(m_event_preprocessor == nullptr); assert(m_replay_status_formatter == nullptr); assert(m_local_image_ctx == nullptr); assert(m_local_replay == nullptr); @@ -539,6 +541,9 @@ void ImageReplayer::handle_start_replay(int r) { std::swap(m_on_start_finish, on_finish); } + m_event_preprocessor = EventPreprocessor::create( + *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid, + &m_client_meta, m_threads->work_queue); m_replay_status_formatter = ReplayStatusFormatter::create(m_remote_journaler, m_local_mirror_uuid); @@ -703,7 +708,7 @@ void ImageReplayer::handle_replay_ready() } if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) { - process_entry(); + preprocess_entry(); return; } @@ -978,29 +983,56 @@ void ImageReplayer::handle_allocate_local_tag(int r) { return; } - process_entry(); + preprocess_entry(); } template -void ImageReplayer::process_entry() { - dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() +void ImageReplayer::preprocess_entry() { + dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() << dendl; bufferlist data = m_replay_entry.get_data(); bufferlist::iterator it = data.begin(); - - librbd::journal::EventEntry event_entry; - int r = m_local_replay->decode(&it, &event_entry); + int r = m_local_replay->decode(&it, &m_event_entry); if (r < 0) { derr << "failed to decode journal event" << dendl; handle_replay_complete(r, "failed to decode journal event"); return; } + if (!m_event_preprocessor->is_required(m_event_entry)) { + process_entry(); + return; + } + + Context *ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_preprocess_entry>(this); + m_event_preprocessor->preprocess(&m_event_entry, ctx); +} + +template +void ImageReplayer::handle_preprocess_entry(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to preprocess journal event" << dendl; + handle_replay_complete(r, "failed to preprocess journal event"); + return; + } + + process_entry(); +} + +template +void ImageReplayer::process_entry() { + dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + Context *on_ready = create_context_callback< ImageReplayer, &ImageReplayer::handle_process_entry_ready>(this); Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry)); - m_local_replay->process(event_entry, on_ready, on_commit); + m_local_replay->process(m_event_entry, on_ready, on_commit); + m_event_entry = {}; } template @@ -1305,6 +1337,10 @@ void ImageReplayer::shut_down(int r, Context *on_start) { m_local_journal->stop_external_replay(); m_local_journal = nullptr; m_local_replay = nullptr; + + delete m_event_preprocessor; + m_event_preprocessor = nullptr; + ctx->complete(0); }); ctx = new FunctionContext([this, ctx](int r) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 17196a300425f..d3dcbc1e4347a 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -45,6 +45,7 @@ namespace mirror { struct Threads; namespace image_replayer { template class BootstrapRequest; } +namespace image_replayer { template class EventPreprocessor; } namespace image_replayer { template class ReplayStatusFormatter; } /** @@ -165,6 +166,9 @@ protected: * | ALLOCATE_LOCAL_TAG * * * * * * * | | | * * | v (error) * + * | PREPROCESS_ENTRY * * * * * * * + * | | | * + * | v (error) * * | PROCESS_ENTRY * * * * * * * * * * | | | * * | \---------------------/ * @@ -228,6 +232,7 @@ private: int m_last_r = 0; std::string m_state_desc; BootstrapProgressContext m_progress_cxt; + image_replayer::EventPreprocessor *m_event_preprocessor = nullptr; image_replayer::ReplayStatusFormatter *m_replay_status_formatter = nullptr; librados::IoCtx m_local_ioctx, m_remote_ioctx; @@ -263,6 +268,7 @@ private: uint64_t m_replay_tag_tid = 0; cls::journal::Tag m_replay_tag; librbd::journal::TagData m_replay_tag_data; + librbd::journal::EventEntry m_event_entry; struct C_ReplayCommitted : public Context { ImageReplayer *replayer; @@ -322,6 +328,9 @@ private: void allocate_local_tag(); void handle_allocate_local_tag(int r); + void preprocess_entry(); + void handle_preprocess_entry(int r); + void process_entry(); void handle_process_entry_ready(int r); void handle_process_entry_safe(const ReplayEntry& replay_entry, int r); -- 2.39.5