From: Kotresh HR Date: Sat, 21 Feb 2026 08:34:44 +0000 (+0530) Subject: tools/cephfs_mirror: Efficient use of data sync threads X-Git-Tag: testing/wip-vshankar-testing-20260224.100235^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=28d23381b9e1741b7137034ed70b829e3e1af68a;p=ceph-ci.git tools/cephfs_mirror: Efficient use of data sync threads The job queue is something like below for data sync threads. |syncm1|---------|syncm2|------...---|syncmn| | | | |m_sync_dataq| |m_sync_dataq| |m_sync_dataq| There is a global queue of SyncMechanism objects (syncm). Each syncm object represents a single snapshot being synced and each syncm object owns m_sync_dataq representing the list of files in the snapshot to be synced. The data sync threads should consume the next syncm job if the present syncm has no pending work. This can evidently happen if the last file being synced in the present syncm job is a large file from its m_sync_dataq. In this case, one data sync thread is busy syncing the large file; the rest of the data sync threads just wait for it to finish to avoid a busy loop. Instead, the idle data sync threads could start consuming the next syncm job. This brings in a change to the data structure. 
- syncm_q has to be std::deque instead of std::queue as a syncm in the middle can finish syncing first and that needs to be removed before the front Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 6f97c3c6cda..502ea1bf659 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -368,7 +368,7 @@ void PeerReplayer::remove_directory(string_view dir_root) { void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) { dout(20) << ": Enqueue syncm object=" << item << dendl; std::lock_guard lock(smq_lock); - syncm_q.push(item); + syncm_q.push_back(item); smq_cv.notify_all(); } @@ -1327,9 +1327,8 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << " crawl_finished=" << m_crawl_finished << dendl; if (m_sync_dataq.empty() && m_crawl_finished) { - dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other" - << " inflight processing threads to finish!!!" 
<< dendl; - sdq_cv.wait(lock, [this]{ return m_in_flight == 0;}); + dout(20) << ": snapshot data replayer - finished processing syncm=" << this + << " Proceed with next syncm job " << dendl; return false; // no more work } @@ -1341,6 +1340,13 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { return true; } +bool PeerReplayer::SyncMechanism::has_pending_work() const { + std::unique_lock lock(sdq_lock); + if (m_sync_dataq.empty() && m_crawl_finished) + return false; + return true; +} + void PeerReplayer::SyncMechanism::mark_crawl_finished() { std::unique_lock lock(sdq_lock); m_crawl_finished = true; @@ -2176,6 +2182,31 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) { } } +void PeerReplayer::remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj) +{ + // caller holds lock + auto it = std::find(syncm_q.begin(), syncm_q.end(), syncm_obj); + if (it != syncm_q.end()) { + syncm_q.erase(it); + } +} + +/* The data sync threads should consume the next syncm job if the present syncm has no + * pending work. This can evidently happen if the last file being synced in the present + * syncm job is a large file. In this case, one data sync thread is busy syncing the + * large file; the rest of the data sync threads could start consuming the next syncm job + * instead of being idle waiting for the last file to be synced from the present syncm job. 
+ */ +std::shared_ptr<SyncMechanism> PeerReplayer::pick_next_syncm() const { + // caller holds lock + for (auto& syncm : syncm_q) { + if (syncm->has_pending_work()) { + return syncm; + } + } + return nullptr; +} + void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl; @@ -2201,8 +2232,8 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock lock(smq_lock); dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl; - smq_cv.wait(lock, [this]{return !syncm_q.empty();}); - syncm = syncm_q.front(); + smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; }); + // syncm is guaranteed to be non-null because of the predicate used in the above wait. dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl; } @@ -2254,12 +2285,17 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { { std::unique_lock smq_l1(smq_lock); std::unique_lock sdq_l1(syncm->get_sdq_lock()); - if (!syncm_q.empty() && syncm_q.front() == syncm - && syncm->get_in_flight_unlocked() == 0 - && syncm->get_crawl_finished_unlocked() == true) { + if (!syncm_q.empty() && + syncm->get_in_flight_unlocked() == 0 && + syncm->get_crawl_finished_unlocked() == true) { dout(20) << ": Dequeue syncm object=" << syncm << dendl; syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot - syncm_q.pop(); + if (syncm_q.front() == syncm) { + syncm_q.pop_front(); + } else { // if a syncm in the middle finishes first + remove_syncm(syncm); + } + dout(20) << ": syncm_q after removal " << syncm_q << dendl; smq_cv.notify_all(); } } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index b6fe1b8a74d..901fe094f33 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -206,6 +206,7 @@ private: void 
push_dataq_entry(PeerReplayer::SyncEntry e); bool pop_dataq_entry(PeerReplayer::SyncEntry &out); + bool has_pending_work() const; void mark_crawl_finished(); bool get_crawl_finished_unlocked() { return m_crawl_finished; @@ -213,14 +214,6 @@ private: void dec_in_flight() { std::unique_lock lock(sdq_lock); --m_in_flight; - /* If the crawler is done (m_crawl_finished = true) and m_sync_dataq - * is empty, threads will block until other pending threads which are syncing - * the entries picked up from queue are completed. So make sure to wake them - * up when the processing is complete. This is to avoid the busy loop of jobless - * data sync threads. - */ - if (m_in_flight == 0) - sdq_cv.notify_all(); } int get_in_flight_unlocked() { return m_in_flight; @@ -256,7 +249,7 @@ private: boost::optional<Snapshot> m_prev; std::stack<SyncEntry> m_sync_stack; - ceph::mutex sdq_lock; + mutable ceph::mutex sdq_lock; ceph::condition_variable sdq_cv; std::queue<SyncEntry> m_sync_dataq; int m_in_flight = 0; @@ -459,7 +452,7 @@ private: ceph::mutex smq_lock; ceph::condition_variable smq_cv; - std::queue<std::shared_ptr<SyncMechanism>> syncm_q; + std::deque<std::shared_ptr<SyncMechanism>> syncm_q; ServiceDaemonStats m_service_daemon_stats; @@ -467,6 +460,8 @@ private: void run(SnapshotReplayerThread *replayer); void run_datasync(SnapshotDataSyncThread *data_replayer); + void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj); + std::shared_ptr<SyncMechanism> pick_next_syncm() const; boost::optional<std::string> pick_directory(); int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);