From 6547e06a7b8456154c373d5238da0a77b35093ca Mon Sep 17 00:00:00 2001 From: Kotresh HR Date: Sun, 22 Feb 2026 23:40:32 +0530 Subject: [PATCH] tools/cephfs_mirror: Handle shutdown/blocklist at syncm_q wait 1. Convert smq_cv.wait to timed wait as blocklist doesn't have predicate to evaluate. Evaluate is_shutdown() as predicate. When either of the two is true, set corresponding error and backoff flag in all the syncm objects. The last thread data sync thread would wake up all the crawler threads. This is necessary to wake up the crawler threads whose data queue is not picked by any datasync threads. 2. In shutdown(), change the order of join, join datasync threads first. The idea is kill datasync threads first before crawler threads as datasync threads are extension of crawler threads and othewise might cause issues. Also wake up smq_cv wait for shutdown. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- src/tools/cephfs_mirror/PeerReplayer.cc | 92 ++++++++++++++++++------- src/tools/cephfs_mirror/PeerReplayer.h | 9 +++ 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 4338806549b..00718ad55d5 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -320,19 +320,29 @@ void PeerReplayer::shutdown() { dout(1) << ": shutdown is already in progress - return"<< dendl; return; } + + // wake up all datasync threads waiting on syncm_q + { + std::unique_lock smq_l1(smq_lock); + smq_cv.notify_all(); // wake up syncm_q wait + } + + // Join data sync threads first + for (auto &replayer : m_data_replayers) { + replayer->join(); + } + m_data_replayers.clear(); + + // Wake up crawler thread shutdown wait after datasync thread die { std::scoped_lock lock(m_lock); - m_cond.notify_all(); //wake up shutdown wait + m_cond.notify_all(); } for (auto &replayer : m_replayers) { replayer->join(); } m_replayers.clear(); - for (auto &replayer : m_data_replayers) { - replayer->join(); - } - m_data_replayers.clear(); ceph_unmount(m_remote_mount); ceph_release(m_remote_mount); @@ -2251,32 +2261,66 @@ std::shared_ptr PeerReplayer::pick_next_syncm_and_m return nullptr; } +void PeerReplayer::mark_and_notify_syncms_to_backoff(int err) { + // caller holds the smq_lock + ceph_assert(ceph_mutex_is_locked_by_me(smq_lock)); + for (auto& syncm : syncm_q) { + std::unique_lock sdq_lock(syncm->get_sdq_lock()); + syncm->set_datasync_error_unlocked(err); + syncm->mark_backoff_unlocked(); + if (get_active_datasync_threads() == 1) { //Last thread + // To wake up crawler thread whose dataq process is not started + syncm->sdq_cv_notify_all_unlocked(); + } + } + // To wake up other datasync threads to speed up exit + smq_cv.notify_all(); +} + void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl; - /* The entire snapshot is synced outside the lock. The m_lock and m_cond pair - * is used for these threads along with crawler threads to work well with all - * terminal conditions like shutdown. + /* The m_stopping is made atomic and m_lock is no longer required for state + * change or access. Hence data sync thread can get rid of waiting for is_stopping + * using m_lock */ - std::unique_lock locker(m_lock); while (true) { - m_cond.wait_for(locker, 1s, [this]{return is_stopping();}); - if (is_stopping()) { - dout(5) << ": exiting snapshot data replayer=" << data_replayer << dendl; - break; - } - // do not check if client is blocklisted under lock - locker.unlock(); - if (m_fs_mirror->is_blocklisted()) { - dout(5) << ": exiting snapshot data replayer=" << data_replayer << " as client is blocklisted" << dendl; - break; - } - + bool shutdown = false; + bool blocklist = false; std::shared_ptr syncm; { std::unique_lock lock(smq_lock); dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl; - smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm_and_mark(); return syncm != nullptr; }); + while (true) { + bool ready = smq_cv.wait_for(lock, 2s, [this, &syncm] { + syncm = pick_next_syncm_and_mark(); + return is_stopping() || syncm != nullptr; + }); + // immediate shutdown - predicate is true + if (is_stopping()) { + dout(5) << ": exiting snapshot data replayer=" << data_replayer + << " as mirroring is shutting down" << dendl; + shutdown = true; + mark_and_notify_syncms_to_backoff(-EINPROGRESS); + break; + } + // work available - predicate is true + if (ready) + break; + // Timed wake up path - blocklist check + if (m_fs_mirror->is_blocklisted()) { + dout(5) << ": exiting snapshot data replayer=" << data_replayer + << " as client is blocklisted" << dendl; + blocklist = true; + mark_and_notify_syncms_to_backoff(-EBLOCKLISTED); + break; + } + // otherwise timeout occured, nothing to do - loop again + } + + if (shutdown || blocklist) { + break; //exit + } // syncm is gauranteed to be non-null because of the predicate used in above wait. dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl; } @@ -2366,9 +2410,7 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { // Decrement should be within this locked block where the comparison happens, moving outside faults inc/dec logic syncm->dec_in_flight_unlocked(); } - //lock again to satify m_cond - locker.lock(); - } + } // outer while } void PeerReplayer::peer_status(Formatter *f) { diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index bcab8e3018f..2ba52ad57bb 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -235,6 +235,13 @@ private: m_datasync_error = true; m_datasync_errno = err; } + void set_datasync_error_unlocked(int err) { + m_datasync_error = true; + m_datasync_errno = err; + } + void mark_backoff_unlocked() { + m_backoff = true; + } bool get_datasync_error_unlocked() { return m_datasync_error; } @@ -302,6 +309,7 @@ private: bool m_sync_done = false; bool m_datasync_error = false; int m_datasync_errno = 0; + bool m_backoff = false; }; class RemoteSync : public SyncMechanism { @@ -512,6 +520,7 @@ private: int get_active_datasync_threads() const { return m_active_datasync_threads.load(std::memory_order_relaxed); } + void mark_and_notify_syncms_to_backoff(int err); boost::optional pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); -- 2.47.3