]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Efficient use of data sync threads
authorKotresh HR <khiremat@redhat.com>
Wed, 14 Jan 2026 13:07:13 +0000 (18:37 +0530)
committerKotresh HR <khiremat@redhat.com>
Wed, 4 Feb 2026 08:53:14 +0000 (14:23 +0530)
The job queue is something like below for data sync threads.

  |syncm1|---------|syncm2|------...---|syncmn|
     |                |                   |
   |m_sync_dataq|   |m_sync_dataq|    |m_sync_dataq|

There is global queue of SyncMechanism objects(syncm). Each syncm
object represents a single snapshot being synced and each syncm
object owns m_sync_dataq representing list of files in the snapshot
to be synced.

The data sync threads should consume the next syncm job
if the present syncm has no pending work. This can evidently
happen if the last file being synced in the present syncm
job is a large file from it's syncm_dataq. In this case, one
data sync thread is busy syncing the large file, the rest of
data sync threads just wait for it to finish to avoid busy loop.
Instead, the idle data sync threads could start consuming the next
syncm job.

This brings in a change to data structure.
 - syncm_q has to be std::deque instead of std::queue as syncm in the
   middle can finish syncing first and that needs to be removed before
   the front

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 8cb3d612bf67dfa39e18e579cdeed0805f6d1e1d..3030d6a881c91424da4ec4ae43ba18c2c257da2b 100644 (file)
@@ -368,7 +368,7 @@ void PeerReplayer::remove_directory(string_view dir_root) {
 void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) {
   dout(20) << ": Enqueue syncm object=" << item << dendl;
   std::lock_guard lock(smq_lock);
-  syncm_q.push(item);
+  syncm_q.push_back(item);
   smq_cv.notify_all();
 }
 
@@ -1327,9 +1327,8 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
           << " crawl_finished=" << m_sync_crawl_finished << dendl;
   if (m_sync_dataq.empty() && m_sync_crawl_finished) {
-     dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
-              << " inflight processing threads to finish!!!" << dendl;
-     sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
+     dout(20) << ": snapshot data replayer - finished processing syncm=" << this
+              << " Proceed with next syncm job " << dendl;
      return false; // no more work
   }
 
@@ -1341,6 +1340,13 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   return true;
 }
 
+bool PeerReplayer::SyncMechanism::has_pending_work() const {
+  std::unique_lock lock(sdq_lock);
+  if (m_sync_dataq.empty() && m_sync_crawl_finished)
+    return false;
+  return true;
+}
+
 void PeerReplayer::SyncMechanism::mark_crawl_finished() {
   std::unique_lock lock(sdq_lock);
   m_sync_crawl_finished = true;
@@ -2176,6 +2182,31 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) {
   }
 }
 
+void PeerReplayer::remove_syncm(const std::shared_ptr<PeerReplayer::SyncMechanism>& syncm_obj)
+{
+    // caller holds lock
+    auto it = std::find(syncm_q.begin(), syncm_q.end(), syncm_obj);
+    if (it != syncm_q.end()) {
+        syncm_q.erase(it);
+    }
+}
+
+/* The data sync threads should consume the next syncm job if the present syncm has no
+ * pending work. This can evidently happen if the last file being synced in the present
+ * syncm job is a large file. In this case, one data sync thread is busy syncing the
+ * large file, the rest of data sync threads could start consuming the next syncm job
+ * instead of being idle waiting for the last file to be synced from present syncm job.
+ */
+std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm() const {
+  // caller holds lock
+  for (auto& syncm : syncm_q) {
+    if (syncm->has_pending_work()) {
+      return syncm;
+    }
+  }
+  return nullptr;
+}
+
 void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
   dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
 
@@ -2201,8 +2232,9 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock lock(smq_lock);
       dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
-      smq_cv.wait(lock, [this]{return !syncm_q.empty();});
-      syncm = syncm_q.front();
+      smq_cv.wait(lock, [this] { return !syncm_q.empty() && pick_next_syncm();});
+      // syncm is gauranteed to be non-null because of the predicate used in above wait.
+      syncm = pick_next_syncm();
       dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
     }
 
@@ -2254,13 +2286,18 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock smq_l1(smq_lock);
       std::unique_lock sdq_l1(syncm->get_sdq_lock());
-      if (!syncm_q.empty() && syncm_q.front() == syncm
-          && syncm->get_in_flight_unlocked() == 0
-          && syncm->get_crawl_finished_unlocked() == true) {
+      if (!syncm_q.empty() &&
+          syncm->get_in_flight_unlocked() == 0 &&
+          syncm->get_crawl_finished_unlocked() == true) {
         dout(20) << ": Dequeue syncm object=" << syncm << dendl;
         syncm->set_snapshot_unlocked();
         syncm->sdq_cv_notify_all_unlocked(); // To wake up crawler thread waiting to take snapshot
-        syncm_q.pop();
+        if (syncm_q.front() == syncm) {
+          syncm_q.pop_front();
+        } else { // if syncms in the middle finishes first
+          remove_syncm(syncm);
+        }
+        dout(20) << ": syncm_q after removal " << syncm_q << dendl;
         smq_cv.notify_all();
       }
     }
index c91c010e8e774149f9ed7d90a62fae2de821abee..e695f6f87d3af2731e1f130a0b2d40cfd1237c12 100644 (file)
@@ -206,6 +206,7 @@ private:
 
     void push_dataq_entry(PeerReplayer::SyncEntry e);
     bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
+    bool has_pending_work() const;
     void mark_crawl_finished();
     bool get_crawl_finished_unlocked() {
       return m_sync_crawl_finished;
@@ -213,14 +214,6 @@ private:
     void dec_in_flight() {
       std::unique_lock lock(sdq_lock);
       --m_in_flight;
-      /* If the crawler is done (m_sync_crawl_finished = true) and m_sync_dataq
-       * is empty, threads will block until other pending threads which are syncing
-       * the entries picked up from queue are completed. So make sure to wake them
-       * up when the processing is complete. This is to avoid the busy loop of jobless
-       * data sync threads.
-       */
-      if (m_in_flight == 0)
-        sdq_cv.notify_all();
     }
     int get_in_flight_unlocked() {
       return m_in_flight;
@@ -255,7 +248,7 @@ private:
     boost::optional<Snapshot> m_prev;
     std::stack<PeerReplayer::SyncEntry> m_sync_stack;
 
-    ceph::mutex sdq_lock;
+    mutable ceph::mutex sdq_lock;
     ceph::condition_variable sdq_cv;
     std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
     int m_in_flight = 0;
@@ -458,7 +451,7 @@ private:
 
   ceph::mutex smq_lock;
   ceph::condition_variable smq_cv;
-  std::queue<std::shared_ptr<SyncMechanism>> syncm_q;
+  std::deque<std::shared_ptr<SyncMechanism>> syncm_q;
 
   ServiceDaemonStats m_service_daemon_stats;
 
@@ -466,6 +459,8 @@ private:
 
   void run(SnapshotReplayerThread *replayer);
   void run_datasync(SnapshotDataSyncThread *data_replayer);
+  void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
+  std::shared_ptr<SyncMechanism> pick_next_syncm() const;
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);