#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
MOCK_METHOD0(send, void());
};
+template<>
+struct EventPreprocessor<librbd::MockTestImageCtx> {
+ static EventPreprocessor *s_instance;
+
+ static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
+ ::journal::MockJournalerProxy &remote_journaler,
+ const std::string &local_mirror_uuid,
+ librbd::journal::MirrorPeerClientMeta *client_meta,
+ ContextWQ *work_queue) {
+ assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ EventPreprocessor() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
+ MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
+};
+
template<>
struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
static ReplayStatusFormatter* s_instance;
BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace image_replayer
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
#define dout_subsys ceph_subsys_rbd_mirror
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
+ assert(m_event_preprocessor == nullptr);
assert(m_replay_status_formatter == nullptr);
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
std::swap(m_on_start_finish, on_finish);
}
+ m_event_preprocessor = EventPreprocessor<I>::create(
+ *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
+ &m_client_meta, m_threads->work_queue);
m_replay_status_formatter =
ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
}
if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
- process_entry();
+ preprocess_entry();
return;
}
return;
}
- process_entry();
+ preprocess_entry();
}
template <typename I>
-void ImageReplayer<I>::process_entry() {
- dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+void ImageReplayer<I>::preprocess_entry() {
+ dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
<< dendl;
bufferlist data = m_replay_entry.get_data();
bufferlist::iterator it = data.begin();
-
- librbd::journal::EventEntry event_entry;
- int r = m_local_replay->decode(&it, &event_entry);
+ int r = m_local_replay->decode(&it, &m_event_entry);
if (r < 0) {
derr << "failed to decode journal event" << dendl;
handle_replay_complete(r, "failed to decode journal event");
return;
}
+ if (!m_event_preprocessor->is_required(m_event_entry)) {
+ process_entry();
+ return;
+ }
+
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry>(this);
+ m_event_preprocessor->preprocess(&m_event_entry, ctx);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_preprocess_entry(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "failed to preprocess journal event" << dendl;
+ handle_replay_complete(r, "failed to preprocess journal event");
+ return;
+ }
+
+ process_entry();
+}
+
+template <typename I>
+void ImageReplayer<I>::process_entry() {
+ dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+ << dendl;
+
Context *on_ready = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
- m_local_replay->process(event_entry, on_ready, on_commit);
+ m_local_replay->process(m_event_entry, on_ready, on_commit);
+ m_event_entry = {};
}
template <typename I>
m_local_journal->stop_external_replay();
m_local_journal = nullptr;
m_local_replay = nullptr;
+
+ delete m_event_preprocessor;
+ m_event_preprocessor = nullptr;
+
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
struct Threads;
namespace image_replayer { template <typename> class BootstrapRequest; }
+namespace image_replayer { template <typename> class EventPreprocessor; }
namespace image_replayer { template <typename> class ReplayStatusFormatter; }
/**
* | ALLOCATE_LOCAL_TAG * * * * * *
* | | | *
* | v (error) *
+ * | PREPROCESS_ENTRY * * * * * * *
+ * | | | *
+ * | v (error) *
* | PROCESS_ENTRY * * * * * * * * *
* | | | *
* | \---------------------/ *
int m_last_r = 0;
std::string m_state_desc;
BootstrapProgressContext m_progress_cxt;
+ image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
nullptr;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
uint64_t m_replay_tag_tid = 0;
cls::journal::Tag m_replay_tag;
librbd::journal::TagData m_replay_tag_data;
+ librbd::journal::EventEntry m_event_entry;
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
void allocate_local_tag();
void handle_allocate_local_tag(int r);
+ void preprocess_entry();
+ void handle_preprocess_entry(int r);
+
void process_entry();
void handle_process_entry_ready(int r);
void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);