]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Handle shutdown/blocklist at syncm_q wait
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 14:20:51 +0000 (19:50 +0530)
committerKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 15:34:08 +0000 (21:04 +0530)
1. Convert smq_cv.wait to timed wait as blocklist doesn't have
   predicate to evaluate. Evaluate is_shutdown() as predicate.
   When either of the two is true, set corresponding error and
   backoff flag in all the syncm objects. The last thread data
   sync thread would wake up all the crawler threads. This is
   necessary to wake up the crawler threads whose data queue
   is not picked by any datasync threads.
2. In shutdown(), change the order of join, join datasync threads
   first. The idea is kill datasync threads first before crawler
   threads as datasync threads are extension of crawler threads
   and othewise might cause issues. Also wake up smq_cv wait for
   shutdown.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 88c78799f8a14cdf3b3bb9b1b599e35e2910cef0..ca539659623c1f6ef1e5979049c78d8fa4417b6c 100644 (file)
@@ -320,19 +320,29 @@ void PeerReplayer::shutdown() {
     dout(1) << ": shutdown is already in progress - return"<< dendl;
     return;
   }
+
+  // wake up all datasync threads waiting on syncm_q
+  {
+    std::unique_lock smq_l1(smq_lock);
+    smq_cv.notify_all(); // wake up syncm_q wait
+  }
+
+  // Join data sync threads first
+  for (auto &replayer : m_data_replayers) {
+    replayer->join();
+  }
+  m_data_replayers.clear();
+
+  // Wake up crawler thread shutdown wait after datasync thread die
   {
     std::scoped_lock lock(m_lock);
-    m_cond.notify_all(); //wake up shutdown wait
+    m_cond.notify_all();
   }
 
   for (auto &replayer : m_replayers) {
     replayer->join();
   }
   m_replayers.clear();
-  for (auto &replayer : m_data_replayers) {
-    replayer->join();
-  }
-  m_data_replayers.clear();
 
   ceph_unmount(m_remote_mount);
   ceph_release(m_remote_mount);
@@ -2251,32 +2261,66 @@ std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm_and_m
   return nullptr;
 }
 
+void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) {
+  // caller holds the smq_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+  for (auto& syncm : syncm_q) {
+    std::unique_lock sdq_lock(syncm->get_sdq_lock());
+    syncm->set_datasync_error_unlocked(err);
+    syncm->mark_backoff_unlocked();
+    if (get_active_datasync_threads() == 1) { //Last thread
+      // To wake up crawler thread whose dataq process is not started
+      syncm->sdq_cv_notify_all_unlocked();
+    }
+  }
+  // To wake up other datasync threads to speed up exit
+  smq_cv.notify_all();
+}
+
 void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
   dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
 
-  /* The entire snapshot is synced outside the lock. The m_lock and m_cond pair
-   * is used for these threads along with crawler threads to work well with all
-   * terminal conditions like shutdown.
+  /* The m_stopping is made atomic and m_lock is no longer required for state
+   * change or access. Hence data sync thread can get rid of waiting for is_stopping
+   * using m_lock
    */
-  std::unique_lock locker(m_lock);
   while (true) {
-    m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
-    if (is_stopping()) {
-      dout(5) << ": exiting snapshot data replayer=" << data_replayer << dendl;
-      break;
-    }
-    // do not check if client is blocklisted under lock
-    locker.unlock();
-    if (m_fs_mirror->is_blocklisted()) {
-      dout(5) << ": exiting snapshot data replayer=" << data_replayer << " as client is blocklisted" << dendl;
-      break;
-    }
-
+    bool shutdown = false;
+    bool blocklist = false;
     std::shared_ptr<SyncMechanism> syncm;
     {
       std::unique_lock lock(smq_lock);
       dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
-      smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm_and_mark(); return syncm != nullptr; });
+      while (true) {
+        bool ready = smq_cv.wait_for(lock, 2s, [this, &syncm] {
+                syncm = pick_next_syncm_and_mark();
+                return is_stopping() || syncm != nullptr;
+                });
+        // immediate shutdown - predicate is true
+        if (is_stopping()) {
+          dout(5) << ": exiting snapshot data replayer=" << data_replayer
+                  << " as mirroring is shutting down" << dendl;
+          shutdown = true;
+          mark_and_notify_syncms_to_backoff(-EINPROGRESS);
+          break;
+        }
+        // work available - predicate is true
+        if (ready)
+          break;
+        // Timed wake up path - blocklist check
+        if (m_fs_mirror->is_blocklisted()) {
+          dout(5) << ": exiting snapshot data replayer=" << data_replayer
+                  << " as client is blocklisted" << dendl;
+          blocklist = true;
+          mark_and_notify_syncms_to_backoff(-EBLOCKLISTED);
+          break;
+        }
+        // otherwise timeout occured, nothing to do - loop again
+      }
+
+      if (shutdown || blocklist) {
+        break; //exit
+      }
       // syncm is gauranteed to be non-null because of the predicate used in above wait.
       dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
     }
@@ -2365,9 +2409,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
       }
     }
     syncm->dec_in_flight();
-    //lock again to satify m_cond
-    locker.lock();
-  }
+  } // outer while
 }
 
 void PeerReplayer::peer_status(Formatter *f) {
index 066d77310b10a83b8121335c0085f84c47aaaa05..32f4e74de13a534908163b1a95b3e53b1cfa77bc 100644 (file)
@@ -235,6 +235,13 @@ private:
       m_datasync_error = true;
       m_datasync_errno = err;
     }
+    void set_datasync_error_unlocked(int err) {
+      m_datasync_error = true;
+      m_datasync_errno = err;
+    }
+    void mark_backoff_unlocked() {
+      m_backoff = true;
+    }
     bool get_datasync_error_unlocked() {
       return m_datasync_error;
     }
@@ -303,6 +310,7 @@ private:
     bool m_sync_done = false;
     bool m_datasync_error = false;
     int m_datasync_errno = 0;
+    bool m_backoff = false;
   };
 
   class RemoteSync : public SyncMechanism {
@@ -513,6 +521,7 @@ private:
   int get_active_datasync_threads() const {
     return m_active_datasync_threads.load(std::memory_order_relaxed);
   }
+  void mark_and_notify_syncms_to_backoff(int err);
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);