From: Kotresh HR Date: Sat, 21 Feb 2026 14:06:39 +0000 (+0530) Subject: tools/cephfs_mirror: Monitor num of active datasync threads X-Git-Tag: testing/wip-vshankar-testing-20260224.100235^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f9014ba3a403e3b50756ecfb7f424a06ba139857;p=ceph-ci.git tools/cephfs_mirror: Monitor num of active datasync threads Introduce an atomic counter in PeerReplayer to track the number of active SnapshotDataSyncThread instances. The counter is incremented when a datasync thread enters its entry() function and decremented automatically on exit via a small RAII guard (SnapshotDataSyncThreadGuard). This ensures accurate accounting even in the presence of early returns or future refactoring. This change helps in the handling of shutdown and blocklist scenarios. At the time of shutdown or blocklisting, datasync threads may still be processing multiple jobs across different SyncMechanism instances. It is therefore essential that only the final exiting datasync thread performs the notifications for all relevant waiters, including the syncm data queue, syncm queue, and m_cond. This approach ensures orderly teardown by keeping crawler threads active until all datasync threads have completed execution. Terminating crawler threads prematurely—before datasync threads have exited—can lead to inconsistencies, as crawler threads deregister the mirroring directory while datasync threads may still be accessing it. 
Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 06873df50e0..bcab8e3018f 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -95,6 +95,24 @@ private: PeerReplayer *m_peer_replayer; }; + class SnapshotDataSyncThreadGuard { + public: + explicit SnapshotDataSyncThreadGuard(PeerReplayer *peer_replayer) + : m_peer_replayer(peer_replayer) { + m_peer_replayer->m_active_datasync_threads.fetch_add(1, std::memory_order_relaxed); + } + + ~SnapshotDataSyncThreadGuard() { + m_peer_replayer->m_active_datasync_threads.fetch_sub(1, std::memory_order_relaxed); + } + + SnapshotDataSyncThreadGuard(const SnapshotDataSyncThreadGuard&) = delete; + SnapshotDataSyncThreadGuard& operator=(const SnapshotDataSyncThreadGuard&) = delete; + + private: + PeerReplayer* m_peer_replayer; + }; + class SnapshotDataSyncThread : public Thread { public: SnapshotDataSyncThread(PeerReplayer *peer_replayer) @@ -102,6 +120,7 @@ private: } void *entry() override { + SnapshotDataSyncThreadGuard guard(m_peer_replayer); //active thread counter m_peer_replayer->run_datasync(this); return 0; } @@ -475,6 +494,7 @@ private: SnapshotReplayers m_replayers; SnapshotDataReplayers m_data_replayers; + std::atomic<int> m_active_datasync_threads{0}; ceph::mutex smq_lock; ceph::condition_variable smq_cv; @@ -489,6 +509,9 @@ private: void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj); bool is_syncm_active(const std::shared_ptr<SyncMechanism>& syncm_obj); std::shared_ptr<SyncMechanism> pick_next_syncm_and_mark(); + int get_active_datasync_threads() const { + return m_active_datasync_threads.load(std::memory_order_relaxed); + } boost::optional<std::string> pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);