]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Handle shutdown/blocklist/cancel at syncm dataq wait
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 15:40:08 +0000 (21:10 +0530)
committerKotresh HR <khiremat@redhat.com>
Sun, 22 Feb 2026 18:56:35 +0000 (00:26 +0530)
1. Add is_stopping() predicate at sdq_cv wait
2. Use the existing should_backoff() routine to validate
   shutdown/blocklsit/cancel errors and set corresponding errors.
3. Handle notify logic at the end
4. In shutdown(), notify all syncm's sdq_cv wait

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 00718ad55d5e2b361084d737754291e18f10d6d4..b77936a4e1ff965d74c87e558e03a7426488919a 100644 (file)
@@ -324,6 +324,10 @@ void PeerReplayer::shutdown() {
   // wake up all datasync threads waiting on syncm_q
   {
     std::unique_lock smq_l1(smq_lock);
+    for (auto& syncm : syncm_q) {
+      std::unique_lock sdq_l1(syncm->get_sdq_lock());
+      syncm->sdq_cv_notify_all_unlocked();
+    }
     smq_cv.notify_all(); // wake up syncm_q wait
   }
 
@@ -1337,9 +1341,36 @@ void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) {
 }
 
 bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
+  std::unique_lock smq_lock(m_peer_replayer.get_smq_lock());
   std::unique_lock lock(sdq_lock);
   dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
   sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;});
+  while (true) {
+    bool ready = sdq_cv.wait_for(lock, 2s, [this] {
+      return m_peer_replayer.is_stopping() ||
+             !m_sync_dataq.empty() ||
+             m_crawl_finished ||
+             m_datasync_error ||
+             m_crawl_error;
+    });
+
+    // check for shutdown/blocklist/cancel
+    int r = 0;
+    if (m_peer_replayer.should_backoff(m_dir_root, &r)) {
+      dout(0) << ": backing off, shutdown/blocklist/cancel  r=" << r << dendl;
+      if (r == -ECANCELED) {
+        set_datasync_error_unlocked(r);
+        dout(5) << ": snapshot data replayer, mirroring cancelled for syncm=" << this << dendl;
+      } else { //shutdown/blocklist
+        m_peer_replayer.mark_all_syncms_to_backoff_unlocked(r);
+      }
+      return false;
+    }
+    // predicate true
+    if (ready)
+      break;
+    // otherwise timeout occured, nothing to do - loop again
+  }
   dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
            << " crawl_finished=" << m_crawl_finished << dendl;
   if (m_datasync_error || m_crawl_error) {
@@ -2277,6 +2308,30 @@ void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) {
   smq_cv.notify_all();
 }
 
+void PeerReplayer::mark_all_syncms_to_backoff_unlocked(int err) {
+  // caller holds the smq_lock and sdq_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+  for (auto& syncm : syncm_q) {
+    ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock()));
+    syncm->set_datasync_error_unlocked(err);
+    syncm->mark_backoff_unlocked();
+  }
+}
+
+void PeerReplayer::notify_all_syncms_to_backoff() {
+  // caller holds the smq_lock and sdq_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(smq_lock));
+  if (get_active_datasync_threads() == 1) { //Last thread
+    for (auto& syncm : syncm_q) {
+      ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock()));
+      // To wake up crawler thread
+      syncm->sdq_cv_notify_all_unlocked();
+    }
+  }
+  // To wake up other datasync threads to speed up exit
+  smq_cv.notify_all();
+}
+
 void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
   dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
 
@@ -2387,6 +2442,15 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock smq_l1(smq_lock);
       std::unique_lock sdq_l1(syncm->get_sdq_lock());
+      // backoff ?
+      const bool syncm_backoff = syncm->get_backoff_unlocked();
+      int syncm_errno = syncm->get_datasync_errno_unlocked();
+      if (syncm_backoff && syncm_errno != -ECANCELED) {
+        notify_all_syncms_to_backoff(); //shutdown/blocklist
+        dout(5) << ": exiting snapshot data replayer=" << data_replayer
+                << " as client is blocklisted or mirroring is shutting down, error=" << syncm_errno << dendl;
+        break; // exit
+      }
       const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1;
       const bool crawl_finished = syncm->get_crawl_finished_unlocked();
       const bool sync_error =
index 2ba52ad57bb0664b15665c054f6849937abc9c4d..8472ad0f9afa7cb854c633586684e93bde691d95 100644 (file)
@@ -242,6 +242,9 @@ private:
     void mark_backoff_unlocked() {
       m_backoff = true;
     }
+    bool get_backoff_unlocked() {
+      return m_backoff;
+    }
     bool get_datasync_error_unlocked() {
       return m_datasync_error;
     }
@@ -249,6 +252,9 @@ private:
       std::unique_lock lock(sdq_lock);
       return m_datasync_errno;
     }
+    int get_datasync_errno_unlocked() {
+      return m_datasync_errno;
+    }
     bool get_crawl_error() {
       std::unique_lock lock(sdq_lock);
       return m_crawl_error;
@@ -521,6 +527,8 @@ private:
     return m_active_datasync_threads.load(std::memory_order_relaxed);
   }
   void mark_and_notify_syncms_to_backoff(int err);
+  void mark_all_syncms_to_backoff_unlocked(int err);
+  void notify_all_syncms_to_backoff();
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
@@ -568,6 +576,9 @@ private:
 
   // add syncm to syncm_q
   void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
+  ceph::mutex& get_smq_lock() {
+    return smq_lock;
+  }
 };
 
 } // namespace mirror