template <typename I>
void Journal<I>::handle_replay_ready() {
+ CephContext *cct = m_image_ctx.cct;
ReplayEntry replay_entry;
{
Mutex::Locker locker(m_lock);
return;
}
- CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
if (!m_journaler->try_pop_front(&replay_entry)) {
return;
bufferlist data = replay_entry.get_data();
bufferlist::iterator it = data.begin();
+
+ journal::EventEntry event_entry;
+ int r = m_journal_replay->decode(&it, &event_entry);
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__
+ << ": failed to decode journal event entry" << dendl;
+ handle_replay_process_safe(replay_entry, r);
+ return;
+ }
+
Context *on_ready = create_context_callback<
Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
-
- m_journal_replay->process(&it, on_ready, on_commit);
+ m_journal_replay->process(event_entry, on_ready, on_commit);
}
template <typename I>
}
template <typename I>
-void Replay<I>::process(bufferlist::iterator *it, Context *on_ready,
- Context *on_safe) {
+int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
+ try {
+ ::decode(*event_entry, *it);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+template <typename I>
+void Replay<I>::process(const EventEntry &event_entry,
+ Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "on_ready=" << on_ready << ", on_safe=" << on_safe << dendl;
on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
- journal::EventEntry event_entry;
- try {
- ::decode(event_entry, *it);
- } catch (const buffer::error &err) {
- lderr(cct) << "failed to decode event entry: " << err.what() << dendl;
- on_ready->complete(-EINVAL);
- return;
- }
-
RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
event_entry.event);
}
MOCK_METHOD2(shut_down, void(bool cancel_ops, Context *));
- MOCK_METHOD3(process, void(bufferlist::iterator*, Context *, Context *));
+ MOCK_METHOD2(decode, int(bufferlist::iterator*, EventEntry *));
+ MOCK_METHOD3(process, void(const EventEntry&, Context *, Context *));
MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
};
MockReplay::get_instance().shut_down(cancel_ops, on_finish);
}
- void process(bufferlist::iterator *it, Context *on_ready,
+ int decode(bufferlist::iterator *it, EventEntry *event_entry) {
+ return MockReplay::get_instance().decode(it, event_entry);
+ }
+
+ void process(const EventEntry& event_entry, Context *on_ready,
Context *on_commit) {
- MockReplay::get_instance().process(it, on_ready, on_commit);
+ MockReplay::get_instance().process(event_entry, on_ready, on_commit);
}
void replay_op_ready(uint64_t op_tid, Context *on_resume) {
}
void expect_replay_process(MockJournalReplay &mock_journal_replay) {
+ EXPECT_CALL(mock_journal_replay, decode(_, _))
+ .WillOnce(Return(0));
EXPECT_CALL(mock_journal_replay, process(_, _, _))
.WillOnce(DoAll(WithArg<1>(CompleteContext(0, NULL)),
WithArg<2>(Invoke(this, &TestMockJournal::save_commit_context))));
ASSERT_EQ(0, when_close(mock_journal));
}
+TEST_F(TestMockJournal, CorruptEntry) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+
+ ::journal::MockJournaler mock_journaler;
+ expect_construct_journaler(mock_journaler);
+ expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
+ expect_get_journaler_cached_client(mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_start_replay(
+ mock_image_ctx, mock_journaler, {
+ std::bind(&invoke_replay_ready, _1),
+ std::bind(&invoke_replay_complete, _1, 0)
+ });
+
+ ::journal::MockReplayEntry mock_replay_entry;
+ MockJournalReplay mock_journal_replay;
+ expect_try_pop_front(mock_journaler, true, mock_replay_entry);
+ EXPECT_CALL(mock_journal_replay, decode(_, _)).WillOnce(Return(-EBADMSG));
+ expect_stop_replay(mock_journaler);
+ expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+ expect_shut_down_journaler(mock_journaler);
+
+ // replay failure should result in replay-restart
+ expect_construct_journaler(mock_journaler);
+ expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
+ expect_get_journaler_cached_client(mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_start_replay(
+ mock_image_ctx, mock_journaler, {
+ std::bind(&invoke_replay_complete, _1, 0)
+ });
+ expect_stop_replay(mock_journaler);
+ expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
+ expect_start_append(mock_journaler);
+ ASSERT_EQ(0, when_open(mock_journal));
+
+ expect_stop_append(mock_journaler, -EINVAL);
+ expect_shut_down_journaler(mock_journaler);
+ ASSERT_EQ(-EINVAL, when_close(mock_journal));
+}
+
TEST_F(TestMockJournal, StopError) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);