git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Monitor num of active datasync threads
author: Kotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 14:06:39 +0000 (19:36 +0530)
committer: Kotresh HR <khiremat@redhat.com>
Sun, 22 Feb 2026 18:56:35 +0000 (00:26 +0530)
Introduce an atomic counter in PeerReplayer to track the number of
active SnapshotDataSyncThread instances.

The counter is incremented when a datasync thread enters its entry()
function and decremented automatically on exit via a small RAII guard
(DataSyncThreadGuard). This ensures accurate accounting even in the
presence of early returns or future refactoring.

This change helps in handling shutdown and blocklist scenarios.
At the time of shutdown or blocklisting, datasync threads may still
be processing multiple jobs across different SyncMechanism instances.
It is therefore essential that only the final exiting datasync thread
performs the notifications for all relevant waiters, including the
syncm data queue, syncm queue, and m_cond.

This approach ensures orderly teardown by keeping crawler threads
active until all datasync threads have completed execution.
Terminating crawler threads prematurely—before datasync threads have
exited—can lead to inconsistencies, as crawler threads deregister the
mirroring directory while datasync threads may still be accessing it.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.h

index 06873df50e07e894b59b58b095a1db81043214eb..bcab8e3018fc78df6343e40941eb6b65d6584cba 100644 (file)
@@ -95,6 +95,24 @@ private:
     PeerReplayer *m_peer_replayer;
   };
 
+  class SnapshotDataSyncThreadGuard { // RAII: accounts for one active datasync thread
+  public:
+    explicit SnapshotDataSyncThreadGuard(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) {
+      m_peer_replayer->m_active_datasync_threads.fetch_add(1, std::memory_order_relaxed); // increment on construction
+    }
+
+    ~SnapshotDataSyncThreadGuard() {
+      m_peer_replayer->m_active_datasync_threads.fetch_sub(1, std::memory_order_relaxed); // decrement on any exit path
+    }
+
+    SnapshotDataSyncThreadGuard(const SnapshotDataSyncThreadGuard&) = delete; // non-copyable: one guard per thread
+    SnapshotDataSyncThreadGuard& operator=(const SnapshotDataSyncThreadGuard&) = delete;
+
+  private:
+    PeerReplayer* m_peer_replayer; // non-owning back-pointer to the replayer holding the counter
+  };
+
   class SnapshotDataSyncThread : public Thread {
   public:
     SnapshotDataSyncThread(PeerReplayer *peer_replayer)
@@ -102,6 +120,7 @@ private:
     }
 
     void *entry() override {
+      SnapshotDataSyncThreadGuard guard(m_peer_replayer); // RAII: counted in m_active_datasync_threads for the thread's lifetime
       m_peer_replayer->run_datasync(this); // delegate to the replayer's datasync loop
       return 0;
     }
@@ -475,6 +494,7 @@ private:
   SnapshotReplayers m_replayers;
 
   SnapshotDataReplayers m_data_replayers;
+  std::atomic<int> m_active_datasync_threads{0}; // number of datasync threads currently inside entry()
 
   ceph::mutex smq_lock;
   ceph::condition_variable smq_cv;
@@ -489,6 +509,9 @@ private:
   void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
   bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj);
   std::shared_ptr<SyncMechanism> pick_next_syncm_and_mark();
+  int get_active_datasync_threads() const { // snapshot of the active datasync thread count
+    return m_active_datasync_threads.load(std::memory_order_relaxed); // relaxed: value may be immediately stale
+  }
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);