From 6054dcfde3cecdf3dc0e55de39f5473b5773c797 Mon Sep 17 00:00:00 2001 From: Kotresh HR Date: Sun, 15 Feb 2026 14:49:59 +0530 Subject: [PATCH] tools/cephfs_mirror: Handle shutdown/blocklist/cancel at syncm dataq wait 1. Add is_stopping() predicate at sdq_cv wait 2. Use the existing should_backoff() routine to validate shutdown/blocklist/cancel errors and set corresponding errors. 3. Handle notify logic at the end 4. In shutdown(), notify all syncm's sdq_cv wait Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- src/tools/cephfs_mirror/PeerReplayer.cc | 66 ++++++++++++++++++++++++- src/tools/cephfs_mirror/PeerReplayer.h | 11 +++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 026d8a4512a..c2d25be9a8b 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -324,6 +324,10 @@ void PeerReplayer::shutdown() { // wake up all datasync threads waiting on syncm_q { std::unique_lock smq_l1(smq_lock); + for (auto& syncm : syncm_q) { + std::unique_lock sdq_l1(syncm->get_sdq_lock()); + syncm->sdq_cv_notify_all_unlocked(); + } smq_cv.notify_all(); // wake up syncm_q wait } @@ -1343,9 +1347,37 @@ void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) { } bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { + std::unique_lock smq_lock(m_peer_replayer.get_smq_lock()); std::unique_lock lock(sdq_lock); dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl; - sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_sync_crawl_finished || m_datasync_error || m_sync_crawl_error;}); + while (true) { + bool ready = sdq_cv.wait_for( + lock, + 2s, + [this] { + return m_peer_replayer.is_stopping() || + !m_sync_dataq.empty() || + m_sync_crawl_finished || + m_datasync_error || + m_sync_crawl_error; + }); + + // check for shutdown/blocklist/cancel + int r = 
0; + if (m_peer_replayer.should_backoff(m_dir_root, &r)) { + dout(0) << ": backing off, shutdown/blocklist/cancel r=" << r << dendl; + if (r == -ECANCELED) { + set_datasync_error_unlocked(r); + } else { //shutdown/blocklist + m_peer_replayer.mark_all_syncms_to_backoff_unlocked(r); + } + return false; + } + // predicate true + if (ready) + break; + // otherwise timeout occurred, nothing to do - loop again + } dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << " crawl_finished=" << m_sync_crawl_finished << dendl; if (m_datasync_error || m_sync_crawl_error) { @@ -2282,6 +2314,30 @@ void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) { smq_cv.notify_all(); } +void PeerReplayer::mark_all_syncms_to_backoff_unlocked(int err) { + // caller holds the smq_lock and sdq_lock + ceph_assert(ceph_mutex_is_locked_by_me(smq_lock)); + for (auto& syncm : syncm_q) { + ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock())); + syncm->set_datasync_error_unlocked(err); + syncm->mark_backoff_unlocked(); + } +} + +void PeerReplayer::notify_all_syncms_to_backoff() { + // caller holds the smq_lock and sdq_lock + ceph_assert(ceph_mutex_is_locked_by_me(smq_lock)); + if (get_active_datasync_threads() == 1) { //Last thread + for (auto& syncm : syncm_q) { + ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock())); + // To wake up crawler thread + syncm->sdq_cv_notify_all_unlocked(); + } + } + // To wake up other datasync threads to speed up exit + smq_cv.notify_all(); +} + void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl; @@ -2392,6 +2448,14 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock smq_l1(smq_lock); std::unique_lock sdq_l1(syncm->get_sdq_lock()); + // backoff ? 
+ const bool syncm_backoff = syncm->get_backoff_unlocked(); + int syncm_errno = syncm->get_datasync_errno_unlocked(); + if (syncm_backoff && syncm_errno != -ECANCELED) { + notify_all_syncms_to_backoff(); //shutdown/blocklist + break; // exit + } + const bool no_in_flight_syncm_jobs = syncm->get_in_flight_unlocked() == 0; const bool crawl_finished = syncm->get_crawl_finished_unlocked(); const bool sync_error = diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index c25c0c120a0..afd79316cd3 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -248,6 +248,9 @@ private: void mark_backoff_unlocked() { m_backoff = true; } + bool get_backoff_unlocked() { + return m_backoff; + } bool get_datasync_error_unlocked() { return m_datasync_error; } @@ -255,6 +258,9 @@ private: std::unique_lock lock(sdq_lock); return m_datasync_errno; } + int get_datasync_errno_unlocked() { + return m_datasync_errno; + } bool get_crawl_error() { std::unique_lock lock(sdq_lock); return m_sync_crawl_error; @@ -523,6 +529,8 @@ private: return m_active_datasync_threads.load(std::memory_order_relaxed); } void mark_and_notify_syncms_to_backoff(int err); + void mark_all_syncms_to_backoff_unlocked(int err); + void notify_all_syncms_to_backoff(); boost::optional pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); @@ -570,6 +578,9 @@ private: // add syncm to syncm_q void enqueue_syncm(const std::shared_ptr& item); + ceph::mutex& get_smq_lock() { + return smq_lock; + } }; } // namespace mirror -- 2.47.3