]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: preprocess journal events prior to applying 10249/head
authorJason Dillaman <dillaman@redhat.com>
Mon, 11 Jul 2016 19:32:45 +0000 (15:32 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 19 Jul 2016 13:28:36 +0000 (09:28 -0400)
Fixes: http://tracker.ceph.com/issues/16622
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h

index c14ac4a1f3e257ea8653f06b5c251850f94b7add..5f651af522387465b9db5d907984574aee6f0f5c 100644 (file)
@@ -6,6 +6,7 @@
 #include "tools/rbd_mirror/ImageReplayer.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/ImageSyncThrottler.h"
 #include "test/journal/mock/MockJournaler.h"
 #include "test/librbd/mock/MockImageCtx.h"
@@ -116,6 +117,28 @@ struct CloseImageRequest<librbd::MockTestImageCtx> {
   MOCK_METHOD0(send, void());
 };
 
+template<>
+struct EventPreprocessor<librbd::MockTestImageCtx> {
+  static EventPreprocessor *s_instance;
+
+  static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
+                                   ::journal::MockJournalerProxy &remote_journaler,
+                                   const std::string &local_mirror_uuid,
+                                   librbd::journal::MirrorPeerClientMeta *client_meta,
+                                   ContextWQ *work_queue) {
+    assert(s_instance != nullptr);
+    return s_instance;
+  }
+
+  EventPreprocessor() {
+    assert(s_instance == nullptr);
+    s_instance = this;
+  }
+
+  MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
+  MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
+};
+
 template<>
 struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
   static ReplayStatusFormatter* s_instance;
@@ -136,6 +159,7 @@ struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
 
 BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
 CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
+EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
 ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 } // namespace image_replayer
index f12f3b93e0047e65f70af479c64c9c631d75d549..8eb4e0760a5ef919d9e9bc26b3c6bccc7f4a40e3 100644 (file)
@@ -23,6 +23,7 @@
 #include "Threads.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -296,6 +297,7 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
 template <typename I>
 ImageReplayer<I>::~ImageReplayer()
 {
+  assert(m_event_preprocessor == nullptr);
   assert(m_replay_status_formatter == nullptr);
   assert(m_local_image_ctx == nullptr);
   assert(m_local_replay == nullptr);
@@ -539,6 +541,9 @@ void ImageReplayer<I>::handle_start_replay(int r) {
     std::swap(m_on_start_finish, on_finish);
   }
 
+  m_event_preprocessor = EventPreprocessor<I>::create(
+    *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
+    &m_client_meta, m_threads->work_queue);
   m_replay_status_formatter =
     ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
 
@@ -703,7 +708,7 @@ void ImageReplayer<I>::handle_replay_ready()
   }
 
   if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
-    process_entry();
+    preprocess_entry();
     return;
   }
 
@@ -978,29 +983,56 @@ void ImageReplayer<I>::handle_allocate_local_tag(int r) {
     return;
   }
 
-  process_entry();
+  preprocess_entry();
 }
 
 template <typename I>
-void ImageReplayer<I>::process_entry() {
-  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+void ImageReplayer<I>::preprocess_entry() {
+  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
            << dendl;
 
   bufferlist data = m_replay_entry.get_data();
   bufferlist::iterator it = data.begin();
-
-  librbd::journal::EventEntry event_entry;
-  int r = m_local_replay->decode(&it, &event_entry);
+  int r = m_local_replay->decode(&it, &m_event_entry);
   if (r < 0) {
     derr << "failed to decode journal event" << dendl;
     handle_replay_complete(r, "failed to decode journal event");
     return;
   }
 
+  if (!m_event_preprocessor->is_required(m_event_entry)) {
+    process_entry();
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry>(this);
+  m_event_preprocessor->preprocess(&m_event_entry, ctx);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_preprocess_entry(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to preprocess journal event" << dendl;
+    handle_replay_complete(r, "failed to preprocess journal event");
+    return;
+  }
+
+  process_entry();
+}
+
+template <typename I>
+void ImageReplayer<I>::process_entry() {
+  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
   Context *on_ready = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
   Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
-  m_local_replay->process(event_entry, on_ready, on_commit);
+  m_local_replay->process(m_event_entry, on_ready, on_commit);
+  m_event_entry = {};
 }
 
 template <typename I>
@@ -1305,6 +1337,10 @@ void ImageReplayer<I>::shut_down(int r, Context *on_start) {
         m_local_journal->stop_external_replay();
         m_local_journal = nullptr;
         m_local_replay = nullptr;
+
+        delete m_event_preprocessor;
+        m_event_preprocessor = nullptr;
+
         ctx->complete(0);
       });
     ctx = new FunctionContext([this, ctx](int r) {
index 17196a300425fce7a0fdd088f312ac7a116ccbf7..d3dcbc1e4347a978d31058ebbc3dfb361078089a 100644 (file)
@@ -45,6 +45,7 @@ namespace mirror {
 struct Threads;
 
 namespace image_replayer { template <typename> class BootstrapRequest; }
+namespace image_replayer { template <typename> class EventPreprocessor; }
 namespace image_replayer { template <typename> class ReplayStatusFormatter; }
 
 /**
@@ -165,6 +166,9 @@ protected:
    *    |                     ALLOCATE_LOCAL_TAG  * * * * * *
    *    |                         |                     |   *
    *    |                         v                 (error) *
+   *    |                     PREPROCESS_ENTRY  * * * * * * *
+   *    |                         |                     |   *
+   *    |                         v                 (error) *
    *    |                     PROCESS_ENTRY * * * * * * * * *
    *    |                         |                     |   *
    *    |                         \---------------------/   *
@@ -228,6 +232,7 @@ private:
   int m_last_r = 0;
   std::string m_state_desc;
   BootstrapProgressContext m_progress_cxt;
+  image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
   image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
     nullptr;
   librados::IoCtx m_local_ioctx, m_remote_ioctx;
@@ -263,6 +268,7 @@ private:
   uint64_t m_replay_tag_tid = 0;
   cls::journal::Tag m_replay_tag;
   librbd::journal::TagData m_replay_tag_data;
+  librbd::journal::EventEntry m_event_entry;
 
   struct C_ReplayCommitted : public Context {
     ImageReplayer *replayer;
@@ -322,6 +328,9 @@ private:
   void allocate_local_tag();
   void handle_allocate_local_tag(int r);
 
+  void preprocess_entry();
+  void handle_preprocess_entry(int r);
+
   void process_entry();
   void handle_process_entry_ready(int r);
   void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);