]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: track journal replay flush requests to prevent race 35348/head
authorJason Dillaman <dillaman@redhat.com>
Wed, 3 Jun 2020 13:40:32 +0000 (09:40 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 3 Jun 2020 13:40:32 +0000 (09:40 -0400)
If a journal replay flush is in-progress when the ImageReplayer is stopped,
it can race and result in an assertion failure due to two attempted shutdowns
of the same journal replay state machine.

Fixes: https://tracker.ceph.com/issues/45409
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/image_replayer/journal/Replayer.cc
src/tools/rbd_mirror/image_replayer/journal/Replayer.h

index f1552ac68ca591743fdb2ec9d406875e6428a9ef..3af886fdf3abd9f21acac3676746509e7dd67b30 100644 (file)
@@ -226,7 +226,7 @@ void Replayer<I>::shut_down(Context* on_finish) {
 
   cancel_delayed_preprocess_task();
   cancel_flush_local_replay_task();
-  shut_down_local_journal_replay();
+  wait_for_flush();
 }
 
 template <typename I>
@@ -405,14 +405,37 @@ bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
 }
 
 template <typename I>
-void Replayer<I>::shut_down_local_journal_replay() {
+void Replayer<I>::wait_for_flush() {
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
+  // ensure that we don't have two concurrent local journal replay shut downs
+  dout(10) << dendl;
+  auto ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<
+      Replayer<I>, &Replayer<I>::handle_wait_for_flush>(this));
+  m_flush_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_flush(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  shut_down_local_journal_replay();
+}
+
+template <typename I>
+void Replayer<I>::shut_down_local_journal_replay() {
+  std::unique_lock locker{m_lock};
+
   if (m_local_journal_replay == nullptr) {
     wait_for_event_replay();
     return;
   }
 
+  // It's required to stop the local journal replay state machine prior to
+  // waiting for the events to complete. This is to ensure that IO is properly
+  // flushed (it might be batched), wait for any running ops to complete, and
+  // to cancel any ops waiting for their associated OnFinish events.
   dout(10) << dendl;
   auto ctx = create_context_callback<
     Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
@@ -799,6 +822,7 @@ void Replayer<I>::handle_replay_ready(
 template <typename I>
 void Replayer<I>::replay_flush() {
   dout(10) << dendl;
+  m_flush_tracker.start_op();
 
   // shut down the replay to flush all IO and ops and create a new
   // replayer to handle the new tag epoch
@@ -831,6 +855,8 @@ void Replayer<I>::handle_replay_flush_shut_down(int r) {
 template <typename I>
 void Replayer<I>::handle_replay_flush(int r) {
   dout(10) << "r=" << r << dendl;
+  m_flush_tracker.finish_op();
+
   if (r < 0) {
     derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
     handle_replay_complete(r, "replay flush encountered an error");
index f5d59a07f8a4733d1f9e864dab4be2e815aea1fe..b279aa9a297e19308cb2bfef72f04cb96313ad78 100644 (file)
@@ -145,6 +145,9 @@ private:
    * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *   *
    *    |                                                       *
    *    v                                                       *
+   * WAIT_FOR_FLUSH                                             *
+   *    |                                                       *
+   *    v                                                       *
    * SHUT_DOWN_LOCAL_JOURNAL_REPLAY                             *
    *    |                                                       *
    *    v                                                       *
@@ -214,6 +217,8 @@ private:
   librbd::journal::TagData m_replay_tag_data;
   librbd::journal::EventEntry m_event_entry;
 
+  AsyncOpTracker m_flush_tracker;
+
   AsyncOpTracker m_event_replay_tracker;
   Context *m_delayed_preprocess_task = nullptr;
 
@@ -240,6 +245,9 @@ private:
 
   bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
 
+  void wait_for_flush();
+  void handle_wait_for_flush(int r);
+
   void shut_down_local_journal_replay();
   void handle_shut_down_local_journal_replay(int r);