From: Jason Dillaman Date: Fri, 8 Jul 2016 18:37:14 +0000 (-0400) Subject: librbd: separate journal event decoding and processing X-Git-Tag: v10.2.3~50^2~20 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=14db53587aa9a918174b616f006c38d1b4c99e58;p=ceph.git librbd: separate journal event decoding and processing Signed-off-by: Jason Dillaman (cherry picked from commit 57cd75e8058b84b5dce38f3d8f4b7b4138ac6c9a) --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 96f9321e614..96f6969ceb8 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -1330,6 +1330,7 @@ void Journal::handle_get_tags(int r) { template void Journal::handle_replay_ready() { + CephContext *cct = m_image_ctx.cct; ReplayEntry replay_entry; { Mutex::Locker locker(m_lock); @@ -1337,7 +1338,6 @@ void Journal::handle_replay_ready() { return; } - CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; if (!m_journaler->try_pop_front(&replay_entry)) { return; @@ -1350,11 +1350,20 @@ void Journal::handle_replay_ready() { bufferlist data = replay_entry.get_data(); bufferlist::iterator it = data.begin(); + + journal::EventEntry event_entry; + int r = m_journal_replay->decode(&it, &event_entry); + if (r < 0) { + lderr(cct) << this << " " << __func__ + << ": failed to decode journal event entry" << dendl; + handle_replay_process_safe(replay_entry, r); + return; + } + Context *on_ready = create_context_callback< Journal, &Journal::handle_replay_process_ready>(this); Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry)); - - m_journal_replay->process(&it, on_ready, on_commit); + m_journal_replay->process(event_entry, on_ready, on_commit); } template diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index ce647ed60b1..857edb2ff45 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -152,23 +152,24 @@ Replay::~Replay() { } template -void Replay::process(bufferlist::iterator *it, Context *on_ready, - Context *on_safe) { +int Replay::decode(bufferlist::iterator *it, EventEntry *event_entry) { + try { + ::decode(*event_entry, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +template +void Replay::process(const EventEntry &event_entry, + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": " << "on_ready=" << on_ready << ", on_safe=" << on_safe << dendl; on_ready = util::create_async_context_callback(m_image_ctx, on_ready); - journal::EventEntry event_entry; - try { - ::decode(event_entry, *it); - } catch (const buffer::error &err) { - lderr(cct) << "failed to decode event entry: " << err.what() << dendl; - on_ready->complete(-EINVAL); - return; - } - RWLock::RLocker owner_lock(m_image_ctx.owner_lock); boost::apply_visitor(EventVisitor(this, on_ready, on_safe), event_entry.event); diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h index 9ee6cd6dea2..648b5318cfe 100644 --- a/src/librbd/journal/Replay.h +++ b/src/librbd/journal/Replay.h @@ -33,7 +33,9 @@ public: Replay(ImageCtxT &image_ctx); ~Replay(); - void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe); + int decode(bufferlist::iterator *it, EventEntry *event_entry); + void process(const EventEntry &event_entry, + Context *on_ready, Context *on_safe); void shut_down(bool cancel_ops, Context *on_finish); void flush(Context *on_finish); diff --git a/src/test/librbd/journal/test_mock_Replay.cc b/src/test/librbd/journal/test_mock_Replay.cc index 639ae5a306b..ca12bdda9cd 100644 --- a/src/test/librbd/journal/test_mock_Replay.cc +++ b/src/test/librbd/journal/test_mock_Replay.cc @@ -212,7 +212,11 @@ public: void when_process(MockJournalReplay &mock_journal_replay, bufferlist::iterator *it, Context *on_ready, Context *on_safe) { - mock_journal_replay.process(it, on_ready, on_safe); + EventEntry event_entry; + int r = mock_journal_replay.decode(it, &event_entry); + ASSERT_EQ(0, r); + + mock_journal_replay.process(event_entry, on_ready, on_safe); } void when_complete(MockReplayImageCtx &mock_image_ctx, AioCompletion *aio_comp, diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index d6a0f7451ac..77adac0738c 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -56,7 +56,8 @@ struct MockReplay { } MOCK_METHOD2(shut_down, void(bool cancel_ops, Context *)); - MOCK_METHOD3(process, void(bufferlist::iterator*, Context *, Context *)); + MOCK_METHOD2(decode, int(bufferlist::iterator*, EventEntry *)); + MOCK_METHOD3(process, void(const EventEntry&, Context *, Context *)); MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *)); }; @@ -71,9 +72,13 @@ public: MockReplay::get_instance().shut_down(cancel_ops, on_finish); } - void process(bufferlist::iterator *it, Context *on_ready, + int decode(bufferlist::iterator *it, EventEntry *event_entry) { + return MockReplay::get_instance().decode(it, event_entry); + } + + void process(const EventEntry& event_entry, Context *on_ready, Context *on_commit) { - MockReplay::get_instance().process(it, on_ready, on_commit); + MockReplay::get_instance().process(event_entry, on_ready, on_commit); } void replay_op_ready(uint64_t op_tid, Context *on_resume) { @@ -231,6 +236,8 @@ public: } void expect_replay_process(MockJournalReplay &mock_journal_replay) { + EXPECT_CALL(mock_journal_replay, decode(_, _)) + .WillOnce(Return(0)); EXPECT_CALL(mock_journal_replay, process(_, _, _)) .WillOnce(DoAll(WithArg<1>(CompleteContext(0, NULL)), WithArg<2>(Invoke(this, &TestMockJournal::save_commit_context)))); @@ -592,6 +599,58 @@ TEST_F(TestMockJournal, FlushReplayError) { ASSERT_EQ(0, when_close(mock_journal)); } +TEST_F(TestMockJournal, CorruptEntry) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + expect_op_work_queue(mock_image_ctx); + + InSequence seq; + + ::journal::MockJournaler mock_journaler; + expect_construct_journaler(mock_journaler); + expect_init_journaler(mock_journaler, 0); + expect_get_max_append_size(mock_journaler, 1 << 16); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_start_replay( + mock_image_ctx, mock_journaler, { + std::bind(&invoke_replay_ready, _1), + std::bind(&invoke_replay_complete, _1, 0) + }); + + ::journal::MockReplayEntry mock_replay_entry; + MockJournalReplay mock_journal_replay; + expect_try_pop_front(mock_journaler, true, mock_replay_entry); + EXPECT_CALL(mock_journal_replay, decode(_, _)).WillOnce(Return(-EBADMSG)); + expect_stop_replay(mock_journaler); + expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true); + expect_shut_down_journaler(mock_journaler); + + // replay failure should result in replay-restart + expect_construct_journaler(mock_journaler); + expect_init_journaler(mock_journaler, 0); + expect_get_max_append_size(mock_journaler, 1 << 16); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); + expect_start_replay( + mock_image_ctx, mock_journaler, { + std::bind(&invoke_replay_complete, _1, 0) + }); + expect_stop_replay(mock_journaler); + expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); + expect_start_append(mock_journaler); + ASSERT_EQ(0, when_open(mock_journal)); + + expect_stop_append(mock_journaler, -EINVAL); + expect_shut_down_journaler(mock_journaler); + ASSERT_EQ(-EINVAL, when_close(mock_journal)); +} + TEST_F(TestMockJournal, StopError) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index 92fae2616b9..c14ac4a1f3e 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -34,7 +34,8 @@ namespace journal { template<> struct Replay { - MOCK_METHOD3(process, void(bufferlist::iterator *, Context *, Context *)); + MOCK_METHOD2(decode, int(bufferlist::iterator *, EventEntry *)); + MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *)); MOCK_METHOD1(flush, void(Context*)); MOCK_METHOD2(shut_down, void(bool, Context*)); }; diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index bbb1cbe5384..25f7209465e 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -996,10 +996,18 @@ void ImageReplayer::process_entry() { bufferlist data = m_replay_entry.get_data(); bufferlist::iterator it = data.begin(); + librbd::journal::EventEntry event_entry; + int r = m_local_replay->decode(&it, &event_entry); + if (r < 0) { + derr << "failed to decode journal event" << dendl; + handle_replay_complete(r, "failed to decode journal event"); + return; + } + Context *on_ready = create_context_callback< ImageReplayer, &ImageReplayer::handle_process_entry_ready>(this); Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry)); - m_local_replay->process(&it, on_ready, on_commit); + m_local_replay->process(event_entry, on_ready, on_commit); } template