From: Kotresh HR Date: Sat, 21 Feb 2026 15:40:08 +0000 (+0530) Subject: tools/cephfs_mirror: Handle shutdown/blocklist/cancel at syncm dataq wait X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=99a36ea87213ffda9302a37f3330ead4b99a8ee5;p=ceph-ci.git tools/cephfs_mirror: Handle shutdown/blocklist/cancel at syncm dataq wait 1. Add is_stopping() predicate at sdq_cv wait 2. Use the existing should_backoff() routine to validate shutdown/blocklsit/cancel errors and set corresponding errors. 3. Handle notify logic at the end 4. In shutdown(), notify all syncm's sdq_cv wait Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index ca539659623..884ec615093 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -324,6 +324,10 @@ void PeerReplayer::shutdown() { // wake up all datasync threads waiting on syncm_q { std::unique_lock smq_l1(smq_lock); + for (auto& syncm : syncm_q) { + std::unique_lock sdq_l1(syncm->get_sdq_lock()); + syncm->sdq_cv_notify_all_unlocked(); + } smq_cv.notify_all(); // wake up syncm_q wait } @@ -1337,9 +1341,35 @@ void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) { } bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { + std::unique_lock smq_lock(m_peer_replayer.get_smq_lock()); std::unique_lock lock(sdq_lock); dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl; sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished || m_datasync_error || m_crawl_error;}); + while (true) { + bool ready = sdq_cv.wait_for(lock, 2s, [this] { + return m_peer_replayer.is_stopping() || + !m_sync_dataq.empty() || + m_crawl_finished || + m_datasync_error || + m_crawl_error; + }); + + // check for shutdown/blocklist/cancel + int r = 0; + if (m_peer_replayer.should_backoff(m_dir_root, &r)) { + dout(0) << ": backing off, shutdown/blocklist/cancel r=" << r << dendl; + if (r == -ECANCELED) { + set_datasync_error_unlocked(r); + } else { //shutdown/blocklist + m_peer_replayer.mark_all_syncms_to_backoff_unlocked(r); + } + return false; + } + // predicate true + if (ready) + break; + // otherwise timeout occured, nothing to do - loop again + } dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << " crawl_finished=" << m_crawl_finished << dendl; if (m_datasync_error || m_crawl_error) { @@ -2277,6 +2307,30 @@ void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) { smq_cv.notify_all(); } +void PeerReplayer::mark_all_syncms_to_backoff_unlocked(int err) { + // caller holds the smq_lock and sdq_lock + ceph_assert(ceph_mutex_is_locked_by_me(smq_lock)); + for (auto& syncm : syncm_q) { + ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock())); + syncm->set_datasync_error_unlocked(err); + syncm->mark_backoff_unlocked(); + } +} + +void PeerReplayer::notify_all_syncms_to_backoff() { + // caller holds the smq_lock and sdq_lock + ceph_assert(ceph_mutex_is_locked_by_me(smq_lock)); + if (get_active_datasync_threads() == 1) { //Last thread + for (auto& syncm : syncm_q) { + ceph_assert(ceph_mutex_is_locked_by_me(syncm->get_sdq_lock())); + // To wake up crawler thread + syncm->sdq_cv_notify_all_unlocked(); + } + } + // To wake up other datasync threads to speed up exit + smq_cv.notify_all(); +} + void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl; @@ -2387,6 +2441,13 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock smq_l1(smq_lock); std::unique_lock sdq_l1(syncm->get_sdq_lock()); + // backoff ? + const bool syncm_backoff = syncm->get_backoff_unlocked(); + int syncm_errno = syncm->get_datasync_errno_unlocked(); + if (syncm_backoff && syncm_errno != -ECANCELED) { + notify_all_syncms_to_backoff(); //shutdown/blocklist + break; // exit + } const bool last_in_flight_syncm = syncm->get_in_flight_unlocked() == 1; const bool crawl_finished = syncm->get_crawl_finished_unlocked(); const bool sync_error = diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 32f4e74de13..bbc2e3c9f82 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -242,6 +242,9 @@ private: void mark_backoff_unlocked() { m_backoff = true; } + bool get_backoff_unlocked() { + return m_backoff; + } bool get_datasync_error_unlocked() { return m_datasync_error; } @@ -249,6 +252,9 @@ private: std::unique_lock lock(sdq_lock); return m_datasync_errno; } + int get_datasync_errno_unlocked() { + return m_datasync_errno; + } bool get_crawl_error() { std::unique_lock lock(sdq_lock); return m_crawl_error; @@ -522,6 +528,8 @@ private: return m_active_datasync_threads.load(std::memory_order_relaxed); } void mark_and_notify_syncms_to_backoff(int err); + void mark_all_syncms_to_backoff_unlocked(int err); + void notify_all_syncms_to_backoff(); boost::optional pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); @@ -569,6 +577,9 @@ private: // add syncm to syncm_q void enqueue_syncm(const std::shared_ptr& item); + ceph::mutex& get_smq_lock() { + return smq_lock; + } }; } // namespace mirror