AioCompletion *flush_comp = nullptr;
OpTids cancel_op_tids;
+ Contexts op_finish_events;
on_finish = util::create_async_context_callback(
m_image_ctx, on_finish);
flush_comp = create_aio_flush_completion(nullptr, nullptr);;
}
- // cancel ops that are waiting to start
- if (cancel_ops) {
- for (auto &op_event_pair : m_op_events) {
- const OpEvent &op_event = op_event_pair.second;
+ for (auto &op_event_pair : m_op_events) {
+ OpEvent &op_event = op_event_pair.second;
+ if (cancel_ops) {
+ // cancel ops that are waiting to start (waiting for
+ // OpFinishEvent or waiting for ready)
if (op_event.on_start_ready == nullptr) {
cancel_op_tids.push_back(op_event_pair.first);
}
+ } else if (op_event.on_op_finish_event != nullptr) {
+ // start ops waiting for OpFinishEvent
+ Context *on_op_finish_event = nullptr;
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+ m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
+ } else {
+ // waiting for op ready
+ assert(op_event.on_start_ready != nullptr);
+ op_event_pair.second.finish_on_ready = true;
}
}
on_start_ready->complete(0);
// cancel has been requested -- send error to paused state machine
- if (m_flush_ctx != nullptr) {
+ if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
return;
}
[on_resume](int r) {
on_resume->complete(r);
});
+
+ // shut down request -- don't expect OpFinishEvent
+ if (op_event.finish_on_ready) {
+ m_image_ctx.op_work_queue->queue(on_resume, 0);
+ }
}
template <typename I>
OpEvent op_event;
Context *on_flush = nullptr;
+ bool shutting_down = false;
{
Mutex::Locker locker(m_lock);
auto op_it = m_op_events.find(op_tid);
op_event = std::move(op_it->second);
m_op_events.erase(op_it);
+ shutting_down = (m_flush_ctx != nullptr);
if (m_op_events.empty() &&
(m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
on_flush = m_flush_ctx;
} else {
// event kicked off by OpFinishEvent
assert((op_event.on_finish_ready != nullptr &&
- op_event.on_finish_safe != nullptr) || r == -ERESTART);
+ op_event.on_finish_safe != nullptr) || shutting_down);
}
// skipped upon error -- so clean up if non-null
expect_op_work_queue(mock_image_ctx);
InSequence seq;
- C_SaferCond on_ready;
- C_SaferCond on_safe;
- when_process(mock_journal_replay, EventEntry{SnapRemoveEvent(123, "snap")},
- &on_ready, &on_safe);
+ Context *on_snap_create_finish = nullptr;
+ expect_snap_create(mock_image_ctx, &on_snap_create_finish, "snap", 123);
- ASSERT_EQ(0, on_ready.wait());
+ Context *on_snap_remove_finish = nullptr;
+ expect_snap_remove(mock_image_ctx, &on_snap_remove_finish, "snap");
+
+ C_SaferCond on_snap_remove_ready;
+ C_SaferCond on_snap_remove_safe;
+ when_process(mock_journal_replay, EventEntry{SnapRemoveEvent(122, "snap")},
+ &on_snap_remove_ready, &on_snap_remove_safe);
+ ASSERT_EQ(0, on_snap_remove_ready.wait());
+
+ C_SaferCond on_snap_create_ready;
+ C_SaferCond on_snap_create_safe;
+ when_process(mock_journal_replay, EventEntry{SnapCreateEvent(123, "snap")},
+ &on_snap_create_ready, &on_snap_create_safe);
+
+ C_SaferCond on_shut_down;
+ mock_journal_replay.shut_down(false, &on_shut_down);
+
+ wait_for_op_invoked(&on_snap_remove_finish, 0);
+ ASSERT_EQ(0, on_snap_remove_safe.wait());
+
+ C_SaferCond on_snap_create_resume;
+ when_replay_op_ready(mock_journal_replay, 123, &on_snap_create_resume);
+ ASSERT_EQ(0, on_snap_create_resume.wait());
+
+ on_snap_create_finish->complete(0);
+ ASSERT_EQ(0, on_snap_create_ready.wait());
+ ASSERT_EQ(0, on_snap_create_safe.wait());
+
+ ASSERT_EQ(0, on_shut_down.wait());
+}
+
+TEST_F(TestMockJournalReplay, MissingOpFinishEventCancelOps) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockJournalReplay mock_journal_replay(mock_image_ctx);
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ Context *on_snap_create_finish = nullptr;
+ expect_snap_create(mock_image_ctx, &on_snap_create_finish, "snap", 123);
+
+ C_SaferCond on_snap_remove_ready;
+ C_SaferCond on_snap_remove_safe;
+ when_process(mock_journal_replay, EventEntry{SnapRemoveEvent(122, "snap")},
+ &on_snap_remove_ready, &on_snap_remove_safe);
+ ASSERT_EQ(0, on_snap_remove_ready.wait());
+
+ C_SaferCond on_snap_create_ready;
+ C_SaferCond on_snap_create_safe;
+ when_process(mock_journal_replay, EventEntry{SnapCreateEvent(123, "snap")},
+ &on_snap_create_ready, &on_snap_create_safe);
+
+ C_SaferCond on_resume;
+ when_replay_op_ready(mock_journal_replay, 123, &on_resume);
+ ASSERT_EQ(0, on_snap_create_ready.wait());
ASSERT_EQ(0, when_shut_down(mock_journal_replay, true));
- ASSERT_EQ(-ERESTART, on_safe.wait());
+ ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait());
+ ASSERT_EQ(-ERESTART, on_snap_create_safe.wait());
}
TEST_F(TestMockJournalReplay, UnknownOpFinishEvent) {