cancel_delayed_preprocess_task();
cancel_flush_local_replay_task();
- shut_down_local_journal_replay();
+ wait_for_flush();
}
template <typename I>
}
template <typename I>
-void Replayer<I>::shut_down_local_journal_replay() {
+void Replayer<I>::wait_for_flush() {
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ // ensure that we don't have two concurrent local journal replay shut downs
+ dout(10) << dendl;
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_wait_for_flush>(this));
+ m_flush_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_flush(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ shut_down_local_journal_replay();
+}
+
+template <typename I>
+void Replayer<I>::shut_down_local_journal_replay() {
+ std::unique_lock locker{m_lock};
+
if (m_local_journal_replay == nullptr) {
wait_for_event_replay();
return;
}
+ // It's required to stop the local journal replay state machine prior to
+ // waiting for the events to complete. This is to ensure that IO is properly
+ // flushed (it might be batched), wait for any running ops to complete, and
+ // to cancel any ops waiting for their associated OnFinish events.
dout(10) << dendl;
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
template <typename I>
void Replayer<I>::replay_flush() {
dout(10) << dendl;
+ m_flush_tracker.start_op();
// shut down the replay to flush all IO and ops and create a new
// replayer to handle the new tag epoch
template <typename I>
void Replayer<I>::handle_replay_flush(int r) {
dout(10) << "r=" << r << dendl;
+ m_flush_tracker.finish_op();
+
if (r < 0) {
derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
handle_replay_complete(r, "replay flush encountered an error");
* REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
* | *
* v *
+ * WAIT_FOR_FLUSH *
+ * | *
+ * v *
* SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
* | *
* v *
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
+ AsyncOpTracker m_flush_tracker;
+
AsyncOpTracker m_event_replay_tracker;
Context *m_delayed_preprocess_task = nullptr;
bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
+ void wait_for_flush();
+ void handle_wait_for_flush(int r);
+
void shut_down_local_journal_replay();
void handle_shut_down_local_journal_replay(int r);