]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: separate journal event decoding and processing
authorJason Dillaman <dillaman@redhat.com>
Fri, 8 Jul 2016 18:37:14 +0000 (14:37 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 17 Aug 2016 17:22:04 +0000 (13:22 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 57cd75e8058b84b5dce38f3d8f4b7b4138ac6c9a)

src/librbd/Journal.cc
src/librbd/journal/Replay.cc
src/librbd/journal/Replay.h
src/test/librbd/journal/test_mock_Replay.cc
src/test/librbd/test_mock_Journal.cc
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc

index 96f9321e614d730d131f6ac7a315fed369935099..96f6969ceb8e02b61f4c12d2ec1c93aa63522bc1 100644 (file)
@@ -1330,6 +1330,7 @@ void Journal<I>::handle_get_tags(int r) {
 
 template <typename I>
 void Journal<I>::handle_replay_ready() {
+  CephContext *cct = m_image_ctx.cct;
   ReplayEntry replay_entry;
   {
     Mutex::Locker locker(m_lock);
@@ -1337,7 +1338,6 @@ void Journal<I>::handle_replay_ready() {
       return;
     }
 
-    CephContext *cct = m_image_ctx.cct;
     ldout(cct, 20) << this << " " << __func__ << dendl;
     if (!m_journaler->try_pop_front(&replay_entry)) {
       return;
@@ -1350,11 +1350,20 @@ void Journal<I>::handle_replay_ready() {
 
   bufferlist data = replay_entry.get_data();
   bufferlist::iterator it = data.begin();
+
+  journal::EventEntry event_entry;
+  int r = m_journal_replay->decode(&it, &event_entry);
+  if (r < 0) {
+    lderr(cct) << this << " " << __func__
+               << ": failed to decode journal event entry" << dendl;
+    handle_replay_process_safe(replay_entry, r);
+    return;
+  }
+
   Context *on_ready = create_context_callback<
     Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
   Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
-
-  m_journal_replay->process(&it, on_ready, on_commit);
+  m_journal_replay->process(event_entry, on_ready, on_commit);
 }
 
 template <typename I>
index ce647ed60b178c668d32996a63ca8cef9d204d45..857edb2ff45a30049fabf4c4efeea8fd85e79cc2 100644 (file)
@@ -152,23 +152,24 @@ Replay<I>::~Replay() {
 }
 
 template <typename I>
-void Replay<I>::process(bufferlist::iterator *it, Context *on_ready,
-                        Context *on_safe) {
+int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
+  try {
+    ::decode(*event_entry, *it);
+  } catch (const buffer::error &err) {
+    return -EBADMSG;
+  }
+  return 0;
+}
+
+template <typename I>
+void Replay<I>::process(const EventEntry &event_entry,
+                        Context *on_ready, Context *on_safe) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": "
                  << "on_ready=" << on_ready << ", on_safe=" << on_safe << dendl;
 
   on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
 
-  journal::EventEntry event_entry;
-  try {
-    ::decode(event_entry, *it);
-  } catch (const buffer::error &err) {
-    lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
-    on_ready->complete(-EINVAL);
-    return;
-  }
-
   RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
   boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                        event_entry.event);
index 9ee6cd6dea27b447d7aabb33398edccf83991630..648b5318cfe67603c99c45e4ad48d9ca77f064d2 100644 (file)
@@ -33,7 +33,9 @@ public:
   Replay(ImageCtxT &image_ctx);
   ~Replay();
 
-  void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe);
+  int decode(bufferlist::iterator *it, EventEntry *event_entry);
+  void process(const EventEntry &event_entry,
+               Context *on_ready, Context *on_safe);
 
   void shut_down(bool cancel_ops, Context *on_finish);
   void flush(Context *on_finish);
index 639ae5a306bfddcb336873a16f3c33b481658fd7..ca12bdda9cd2288395d9b5e2fa9a1bb968b6488f 100644 (file)
@@ -212,7 +212,11 @@ public:
   void when_process(MockJournalReplay &mock_journal_replay,
                     bufferlist::iterator *it, Context *on_ready,
                     Context *on_safe) {
-    mock_journal_replay.process(it, on_ready, on_safe);
+    EventEntry event_entry;
+    int r = mock_journal_replay.decode(it, &event_entry);
+    ASSERT_EQ(0, r);
+
+    mock_journal_replay.process(event_entry, on_ready, on_safe);
   }
 
   void when_complete(MockReplayImageCtx &mock_image_ctx, AioCompletion *aio_comp,
index d6a0f7451ac373714d6f7ab67f342fa0e76a88ce..77adac0738c246c44d646821d497ea8fde7bb6bd 100644 (file)
@@ -56,7 +56,8 @@ struct MockReplay {
   }
 
   MOCK_METHOD2(shut_down, void(bool cancel_ops, Context *));
-  MOCK_METHOD3(process, void(bufferlist::iterator*, Context *, Context *));
+  MOCK_METHOD2(decode, int(bufferlist::iterator*, EventEntry *));
+  MOCK_METHOD3(process, void(const EventEntry&, Context *, Context *));
   MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
 };
 
@@ -71,9 +72,13 @@ public:
     MockReplay::get_instance().shut_down(cancel_ops, on_finish);
   }
 
-  void process(bufferlist::iterator *it, Context *on_ready,
+  int decode(bufferlist::iterator *it, EventEntry *event_entry) {
+    return MockReplay::get_instance().decode(it, event_entry);
+  }
+
+  void process(const EventEntry& event_entry, Context *on_ready,
                Context *on_commit) {
-    MockReplay::get_instance().process(it, on_ready, on_commit);
+    MockReplay::get_instance().process(event_entry, on_ready, on_commit);
   }
 
   void replay_op_ready(uint64_t op_tid, Context *on_resume) {
@@ -231,6 +236,8 @@ public:
   }
 
   void expect_replay_process(MockJournalReplay &mock_journal_replay) {
+    EXPECT_CALL(mock_journal_replay, decode(_, _))
+                  .WillOnce(Return(0));
     EXPECT_CALL(mock_journal_replay, process(_, _, _))
                   .WillOnce(DoAll(WithArg<1>(CompleteContext(0, NULL)),
                                   WithArg<2>(Invoke(this, &TestMockJournal::save_commit_context))));
@@ -592,6 +599,58 @@ TEST_F(TestMockJournal, FlushReplayError) {
   ASSERT_EQ(0, when_close(mock_journal));
 }
 
+TEST_F(TestMockJournal, CorruptEntry) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockJournalImageCtx mock_image_ctx(*ictx);
+  MockJournal mock_journal(mock_image_ctx);
+  expect_op_work_queue(mock_image_ctx);
+
+  InSequence seq;
+
+  ::journal::MockJournaler mock_journaler;
+  expect_construct_journaler(mock_journaler);
+  expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
+  expect_get_journaler_cached_client(mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_start_replay(
+    mock_image_ctx, mock_journaler, {
+      std::bind(&invoke_replay_ready, _1),
+      std::bind(&invoke_replay_complete, _1, 0)
+    });
+
+  ::journal::MockReplayEntry mock_replay_entry;
+  MockJournalReplay mock_journal_replay;
+  expect_try_pop_front(mock_journaler, true, mock_replay_entry);
+  EXPECT_CALL(mock_journal_replay, decode(_, _)).WillOnce(Return(-EBADMSG));
+  expect_stop_replay(mock_journaler);
+  expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+  expect_shut_down_journaler(mock_journaler);
+
+  // replay failure should result in replay-restart
+  expect_construct_journaler(mock_journaler);
+  expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
+  expect_get_journaler_cached_client(mock_journaler, 0);
+  expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+  expect_start_replay(
+    mock_image_ctx, mock_journaler, {
+      std::bind(&invoke_replay_complete, _1, 0)
+    });
+  expect_stop_replay(mock_journaler);
+  expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
+  expect_start_append(mock_journaler);
+  ASSERT_EQ(0, when_open(mock_journal));
+
+  expect_stop_append(mock_journaler, -EINVAL);
+  expect_shut_down_journaler(mock_journaler);
+  ASSERT_EQ(-EINVAL, when_close(mock_journal));
+}
+
 TEST_F(TestMockJournal, StopError) {
   REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
 
index 92fae2616b92c23d05e8c27e2d478edd46cd9501..c14ac4a1f3e257ea8653f06b5c251850f94b7add 100644 (file)
@@ -34,7 +34,8 @@ namespace journal {
 
 template<>
 struct Replay<MockTestImageCtx> {
-  MOCK_METHOD3(process, void(bufferlist::iterator *, Context *, Context *));
+  MOCK_METHOD2(decode, int(bufferlist::iterator *, EventEntry *));
+  MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *));
   MOCK_METHOD1(flush, void(Context*));
   MOCK_METHOD2(shut_down, void(bool, Context*));
 };
index bbb1cbe5384d3d7c65d348d34c14d6efdb893bbf..25f7209465e60948067ab3e0bd71f01566410d0f 100644 (file)
@@ -996,10 +996,18 @@ void ImageReplayer<I>::process_entry() {
   bufferlist data = m_replay_entry.get_data();
   bufferlist::iterator it = data.begin();
 
+  librbd::journal::EventEntry event_entry;
+  int r = m_local_replay->decode(&it, &event_entry);
+  if (r < 0) {
+    derr << "failed to decode journal event" << dendl;
+    handle_replay_complete(r, "failed to decode journal event");
+    return;
+  }
+
   Context *on_ready = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
   Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
-  m_local_replay->process(&it, on_ready, on_commit);
+  m_local_replay->process(event_entry, on_ready, on_commit);
 }
 
 template <typename I>