transition_state(STATE_FLUSHING_RESTART, r);
m_lock.Unlock();
- m_journal_replay->shut_down(create_context_callback<
+ m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
} else {
transition_state(STATE_FLUSHING_REPLAY, 0);
m_lock.Unlock();
- m_journal_replay->shut_down(create_context_callback<
+ m_journal_replay->shut_down(false, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_replay>(this));
}
}
m_journaler->stop_replay();
transition_state(STATE_FLUSHING_RESTART, r);
- m_journal_replay->shut_down(create_context_callback<
+ m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
return;
} else if (m_state == STATE_FLUSHING_REPLAY) {
}
template <typename I>
-void Replay<I>::shut_down(Context *on_finish) {
+void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
}
// cancel ops that are waiting to start
- for (auto &op_event_pair : m_op_events) {
- const OpEvent &op_event = op_event_pair.second;
- if (op_event.on_start_ready == nullptr) {
- cancel_op_tids.push_back(op_event_pair.first);
+ if (cancel_ops) {
+ for (auto &op_event_pair : m_op_events) {
+ const OpEvent &op_event = op_event_pair.second;
+ if (op_event.on_start_ready == nullptr) {
+ cancel_op_tids.push_back(op_event_pair.first);
+ }
}
}
void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe);
- void shut_down(Context *on_finish);
+ void shut_down(bool cancel_ops, Context *on_finish);
void flush(Context *on_finish);
void replay_op_ready(uint64_t op_tid, Context *on_resume);
return ctx.wait();
}
- int when_shut_down(MockJournalReplay &mock_journal_replay) {
+ int when_shut_down(MockJournalReplay &mock_journal_replay, bool cancel_ops) {
C_SaferCond ctx;
- mock_journal_replay.shut_down(&ctx);
+ mock_journal_replay.shut_down(cancel_ops, &ctx);
return ctx.wait();
}
ASSERT_EQ(0, on_ready.wait());
expect_aio_flush(mock_image_ctx, mock_aio_image_request, 0);
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
ASSERT_EQ(0, on_safe.wait());
}
ASSERT_EQ(0, on_ready.wait());
expect_aio_flush(mock_image_ctx, mock_aio_image_request, 0);
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
ASSERT_EQ(0, on_safe.wait());
}
when_complete(mock_image_ctx, aio_comp, 0);
ASSERT_EQ(0, on_safe.wait());
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
ASSERT_EQ(0, on_ready.wait());
}
ASSERT_EQ(-EINVAL, on_safe.wait());
expect_aio_flush(mock_image_ctx, mock_aio_image_request, 0);
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
ASSERT_EQ(0, on_ready.wait());
}
ASSERT_EQ(0, on_safe.wait());
}
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
}
TEST_F(TestMockJournalReplay, PauseIO) {
ASSERT_EQ(0, on_safe.wait());
}
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
}
TEST_F(TestMockJournalReplay, Flush) {
ASSERT_EQ(0, on_ready.wait());
- ASSERT_EQ(0, when_shut_down(mock_journal_replay));
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, true));
ASSERT_EQ(-ERESTART, on_safe.wait());
}
s_instance = this;
}
- MOCK_METHOD1(shut_down, void(Context *));
+ MOCK_METHOD2(shut_down, void(bool cancel_ops, Context *));
MOCK_METHOD3(process, void(bufferlist::iterator*, Context *, Context *));
MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
};
return new Replay();
}
- void shut_down(Context *on_finish) {
- MockReplay::get_instance().shut_down(on_finish);
+ void shut_down(bool cancel_ops, Context *on_finish) {
+ MockReplay::get_instance().shut_down(cancel_ops, on_finish);
}
void process(bufferlist::iterator *it, Context *on_ready,
}
void expect_shut_down_replay(MockImageCtx &mock_image_ctx,
- MockJournalReplay &mock_journal_replay, int r) {
- EXPECT_CALL(mock_journal_replay, shut_down(_))
- .WillOnce(Invoke([this, &mock_image_ctx, r](Context *on_flush) {
- this->commit_replay(mock_image_ctx, on_flush, r);}));
+ MockJournalReplay &mock_journal_replay, int r,
+ bool cancel_ops = false) {
+ EXPECT_CALL(mock_journal_replay, shut_down(cancel_ops, _))
+ .WillOnce(WithArg<1>(Invoke([this, &mock_image_ctx, r](Context *on_flush) {
+ this->commit_replay(mock_image_ctx, on_flush, r);})));
}
void expect_get_data(::journal::MockReplayEntry &mock_replay_entry) {
MockJournalReplay mock_journal_replay;
expect_stop_replay(mock_journaler);
- expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
+ expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
// replay failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_try_pop_front(mock_journaler, false, mock_replay_entry);
expect_stop_replay(mock_journaler);
- expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
+ expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_stop_replay(mock_journaler);
Context *on_flush = nullptr;
- EXPECT_CALL(mock_journal_replay, shut_down(_))
- .WillOnce(DoAll(SaveArg<0>(&on_flush),
+ EXPECT_CALL(mock_journal_replay, shut_down(false, _))
+ .WillOnce(DoAll(SaveArg<1>(&on_flush),
InvokeWithoutArgs(this, &TestMockJournal::wake_up)));
// replay write-to-disk failure should result in replay-restart
if (m_local_replay) {
Mutex::Locker locker(m_lock);
- shut_down_journal_replay();
+ shut_down_journal_replay(true);
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
}
m_state = STATE_STOPPING;
}
- shut_down_journal_replay();
+ shut_down_journal_replay(false);
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
return r;
}
-void ImageReplayer::shut_down_journal_replay()
+void ImageReplayer::shut_down_journal_replay(bool cancel_ops)
{
C_SaferCond cond;
- m_local_replay->shut_down(&cond);
+ m_local_replay->shut_down(cancel_ops, &cond);
int r = cond.wait();
if (r < 0) {
derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
std::string *image_id);
int copy();
- void shut_down_journal_replay();
+ void shut_down_journal_replay(bool cancel_ops);
friend std::ostream &operator<<(std::ostream &os,
const ImageReplayer &replayer);