assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
+ assert(m_on_replay_close_request == nullptr);
assert(m_wait_for_state_contexts.empty());
}
stop_recording();
}
+ // interrupt external replay if active
+ if (m_on_replay_close_request != nullptr) {
+ m_on_replay_close_request->complete(0);
+ m_on_replay_close_request = nullptr;
+ }
+
m_close_pending = true;
wait_for_steady_state(on_finish);
}
template <typename I>
void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
- Context *on_finish) {
+ Context *on_start,
+ Context *on_close_request) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
assert(m_journal_replay == nullptr);
+ assert(m_on_replay_close_request == nullptr);
+ m_on_replay_close_request = on_close_request;
- on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
- on_finish = new FunctionContext(
- [this, journal_replay, on_finish](int r) {
- handle_start_external_replay(r, journal_replay, on_finish);
+ on_start = util::create_async_context_callback(m_image_ctx, on_start);
+ on_start = new FunctionContext(
+ [this, journal_replay, on_start](int r) {
+ handle_start_external_replay(r, journal_replay, on_start);
});
// safely flush all in-flight events before starting external replay
m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
- on_finish));
+ on_start));
}
template <typename I>
lderr(cct) << "failed to stop recording: " << cpp_strerror(r) << dendl;
*journal_replay = nullptr;
+ if (m_on_replay_close_request != nullptr) {
+ m_on_replay_close_request->complete(r);
+ m_on_replay_close_request = nullptr;
+ }
+
// get back to a sane-state
start_append();
on_finish->complete(r);
assert(m_journal_replay != nullptr);
assert(m_state == STATE_REPLAYING);
+ if (m_on_replay_close_request != nullptr) {
+ m_on_replay_close_request->complete(-ECANCELED);
+ m_on_replay_close_request = nullptr;
+ }
+
delete m_journal_replay;
m_journal_replay = nullptr;
+ if (m_close_pending) {
+ destroy_journaler(0);
+ return;
+ }
+
start_append();
}
}
void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
- Context *on_finish);
+ Context *on_start, Context *on_close_request);
void stop_external_replay();
private:
bool m_blocking_writes;
journal::Replay<ImageCtxT> *m_journal_replay;
+ Context *m_on_replay_close_request = nullptr;
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
expect_shut_down_journaler(mock_journaler);
}
+TEST_F(TestMockJournal, ExternalReplay) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler);
+ BOOST_SCOPE_EXIT_ALL(&) {
+ close_journal(mock_journal, mock_journaler);
+ };
+
+ InSequence seq;
+ expect_stop_append(mock_journaler, 0);
+ expect_start_append(mock_journaler);
+ expect_shut_down_journaler(mock_journaler);
+
+ C_SaferCond start_ctx;
+ C_SaferCond close_request_ctx;
+
+ journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
+ mock_journal.start_external_replay(&journal_replay, &start_ctx,
+ &close_request_ctx);
+ ASSERT_EQ(0, start_ctx.wait());
+
+ mock_journal.stop_external_replay();
+ ASSERT_EQ(-ECANCELED, close_request_ctx.wait());
+}
+
+TEST_F(TestMockJournal, ExternalReplayFailure) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler);
+ BOOST_SCOPE_EXIT_ALL(&) {
+ close_journal(mock_journal, mock_journaler);
+ };
+
+ InSequence seq;
+ expect_stop_append(mock_journaler, -EINVAL);
+ expect_start_append(mock_journaler);
+ expect_shut_down_journaler(mock_journaler);
+
+ C_SaferCond start_ctx;
+ C_SaferCond close_request_ctx;
+
+ journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
+ mock_journal.start_external_replay(&journal_replay, &start_ctx,
+ &close_request_ctx);
+ ASSERT_EQ(-EINVAL, start_ctx.wait());
+ ASSERT_EQ(-EINVAL, close_request_ctx.wait());
+}
+
+TEST_F(TestMockJournal, ExternalReplayCloseRequest) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler);
+
+ InSequence seq;
+ expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
+
+ C_SaferCond start_ctx;
+ C_SaferCond close_request_ctx;
+
+ journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
+ mock_journal.start_external_replay(&journal_replay, &start_ctx,
+ &close_request_ctx);
+ ASSERT_EQ(0, start_ctx.wait());
+
+ C_SaferCond close_ctx;
+ mock_journal.close(&close_ctx);
+
+ ASSERT_EQ(0, close_request_ctx.wait());
+ mock_journal.stop_external_replay();
+
+ ASSERT_EQ(0, close_ctx.wait());
+}
+
+
} // namespace librbd
};
struct MockTestJournal : public MockJournal {
- MOCK_METHOD2(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
- Context *on_finish));
+ MOCK_METHOD3(start_external_replay, void(journal::Replay<MockTestImageCtx> **,
+ Context *on_finish,
+ Context *on_close_request));
MOCK_METHOD0(stop_external_replay, void());
};
if (m_local_image_ctx->journal != nullptr) {
m_local_journal = m_local_image_ctx->journal;
- Context *ctx = create_context_callback<
+ Context *start_ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
- m_local_journal->start_external_replay(&m_local_replay, ctx);
+ Context *stop_ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_stop_replay_request>(this);
+ m_local_journal->start_external_replay(&m_local_replay, start_ctx,
+ stop_ctx);
return;
}
}
on_replay_interrupted();
}
+template <typename I>
+void ImageReplayer<I>::handle_stop_replay_request(int r) {
+ if (r < 0) {
+ // error starting or we requested the stop -- ignore
+ return;
+ }
+
+ // journal close has been requested, stop replay so the journal
+ // can be closed (since it will wait on replay to finish)
+ dout(20) << dendl;
+ on_stop_journal_replay();
+}
+
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
// might be invoked multiple times while stopping
return;
}
+ m_stop_requested = true;
m_state = STATE_STOPPING;
}
void start_replay();
void handle_start_replay(int r);
+ void handle_stop_replay_request(int r);
void replay_flush();
void handle_replay_flush(int r);