]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Monitor num of active datasync threads
authorKotresh HR <khiremat@redhat.com>
Sun, 15 Feb 2026 06:39:45 +0000 (12:09 +0530)
committerKotresh HR <khiremat@redhat.com>
Tue, 17 Feb 2026 20:10:51 +0000 (01:40 +0530)
Introduce an atomic counter in PeerReplayer to track the number of
active SnapshotDataSyncThread instances.

The counter is incremented when a datasync thread enters its entry()
function and decremented automatically on exit via a small RAII guard
(SnapshotDataSyncThreadGuard). This ensures accurate accounting even in the
presence of early returns or future refactoring.

This change helps with the handling of shutdown and blocklist scenarios.
At the time of shutdown or blocklisting, datasync threads may still
be processing multiple jobs across different SyncMechanism instances.
It is therefore essential that only the final exiting datasync thread
performs the notifications for all relevant waiters, including the
syncm data queue, syncm queue, and m_cond.

This approach ensures orderly teardown by keeping crawler threads
active until all datasync threads have completed execution.
Terminating crawler threads prematurely—before datasync threads have
exited—can lead to inconsistencies, as crawler threads deregister the
mirroring directory while datasync threads may still be accessing it.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.h

index 3bb16867ffbf0a3fe645049abbfa9060ad1fa5e1..2c94235dbed41cccd4f5b56435c273b4e3df0702 100644 (file)
@@ -95,6 +95,24 @@ private:
     PeerReplayer *m_peer_replayer;
   };
 
+  class SnapshotDataSyncThreadGuard { // RAII scope guard: accounts for one active datasync thread
+  public:
+    explicit SnapshotDataSyncThreadGuard(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) { // non-owning back-pointer; the replayer must outlive the guard
+      m_peer_replayer->m_active_datasync_threads.fetch_add(1, std::memory_order_relaxed); // count this thread in on construction
+    }
+
+    ~SnapshotDataSyncThreadGuard() { // decrement happens on any exit path from the guarded scope
+      m_peer_replayer->m_active_datasync_threads.fetch_sub(1, std::memory_order_relaxed); // NOTE(review): relaxed is fine for a pure count; confirm no waiter needs release/acquire ordering with work done before exit
+    }
+
+    SnapshotDataSyncThreadGuard(const SnapshotDataSyncThreadGuard&) = delete; // non-copyable: a copy would double-count the thread
+    SnapshotDataSyncThreadGuard& operator=(const SnapshotDataSyncThreadGuard&) = delete;
+
+  private:
+    PeerReplayer* m_peer_replayer; // replayer whose m_active_datasync_threads we adjust
+  };
+
   class SnapshotDataSyncThread : public Thread {
   public:
     SnapshotDataSyncThread(PeerReplayer *peer_replayer)
@@ -102,6 +120,7 @@ private:
     }
 
     void *entry() override { // thread entry point; runs the peer replayer's datasync loop
+      SnapshotDataSyncThreadGuard guard(m_peer_replayer); // RAII: counts this thread as active until entry() returns
       m_peer_replayer->run_datasync(this);
       return 0; // null exit status (Thread interface returns void*)
     }
@@ -477,6 +496,7 @@ private:
   SnapshotReplayers m_replayers;
 
   SnapshotDataReplayers m_data_replayers;
+  std::atomic<int> m_active_datasync_threads{0};
 
   ceph::mutex smq_lock;
   ceph::condition_variable smq_cv;
@@ -491,6 +511,9 @@ private:
   void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
   bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj);
   std::shared_ptr<SyncMechanism> pick_next_syncm() const;
+  int get_active_datasync_threads() const { // current number of running datasync threads (snapshot; may be stale by the time the caller acts on it)
+    return m_active_datasync_threads.load(std::memory_order_relaxed); // relaxed atomic load of the counter maintained by SnapshotDataSyncThreadGuard
+  }
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);