]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Efficient use of data sync threads
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 08:34:44 +0000 (14:04 +0530)
committerKotresh HR <khiremat@redhat.com>
Sun, 22 Feb 2026 18:56:35 +0000 (00:26 +0530)
The job queue is something like below for data sync threads.

  |syncm1|---------|syncm2|------...---|syncmn|
     |                |                   |
   |m_sync_dataq|   |m_sync_dataq|    |m_sync_dataq|

There is global queue of SyncMechanism objects(syncm). Each syncm
object represents a single snapshot being synced and each syncm
object owns m_sync_dataq representing list of files in the snapshot
to be synced.

The data sync threads should consume the next syncm job
if the present syncm has no pending work. This can evidently
happen if the last file being synced in the present syncm
job is a large file from it's syncm_dataq. In this case, one
data sync thread is busy syncing the large file, the rest of
data sync threads just wait for it to finish to avoid busy loop.
Instead, the idle data sync threads could start consuming the next
syncm job.

This brings in a change to data structure.
 - syncm_q has to be std::deque instead of std::queue as syncm in the
   middle can finish syncing first and that needs to be removed before
   the front

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 6f97c3c6cda99eb58c61da27c81ea691c39b2e6f..502ea1bf659f12660ad38a9e4d83f81aa87895a3 100644 (file)
@@ -368,7 +368,7 @@ void PeerReplayer::remove_directory(string_view dir_root) {
 void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) {
   dout(20) << ": Enqueue syncm object=" << item << dendl;
   std::lock_guard lock(smq_lock);
-  syncm_q.push(item);
+  syncm_q.push_back(item);
   smq_cv.notify_all();
 }
 
@@ -1327,9 +1327,8 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
            << " crawl_finished=" << m_crawl_finished << dendl;
   if (m_sync_dataq.empty() && m_crawl_finished) {
-    dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
-             << " inflight processing threads to finish!!!" << dendl;
-    sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
+    dout(20) << ": snapshot data replayer - finished processing syncm=" << this
+             << " Proceed with next syncm job " << dendl;
     return false; // no more work
   }
 
@@ -1341,6 +1340,13 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   return true;
 }
 
+bool PeerReplayer::SyncMechanism::has_pending_work() const {
+  std::unique_lock lock(sdq_lock);
+  if (m_sync_dataq.empty() && m_crawl_finished)
+    return false;
+  return true;
+}
+
 void PeerReplayer::SyncMechanism::mark_crawl_finished() {
   std::unique_lock lock(sdq_lock);
   m_crawl_finished = true;
@@ -2176,6 +2182,31 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) {
   }
 }
 
+void PeerReplayer::remove_syncm(const std::shared_ptr<PeerReplayer::SyncMechanism>& syncm_obj)
+{
+    // caller holds lock
+    auto it = std::find(syncm_q.begin(), syncm_q.end(), syncm_obj);
+    if (it != syncm_q.end()) {
+        syncm_q.erase(it);
+    }
+}
+
+/* The data sync threads should consume the next syncm job if the present syncm has no
+ * pending work. This can evidently happen if the last file being synced in the present
+ * syncm job is a large file. In this case, one data sync thread is busy syncing the
+ * large file, the rest of data sync threads could start consuming the next syncm job
+ * instead of being idle waiting for the last file to be synced from present syncm job.
+ */
+std::shared_ptr<PeerReplayer::SyncMechanism> PeerReplayer::pick_next_syncm() const {
+  // caller holds lock
+  for (auto& syncm : syncm_q) {
+    if (syncm->has_pending_work()) {
+      return syncm;
+    }
+  }
+  return nullptr;
+}
+
 void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
   dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
 
@@ -2201,8 +2232,8 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock lock(smq_lock);
       dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
-      smq_cv.wait(lock, [this]{return !syncm_q.empty();});
-      syncm = syncm_q.front();
+      smq_cv.wait(lock, [this, &syncm] { syncm = pick_next_syncm(); return syncm != nullptr; });
+      // syncm is gauranteed to be non-null because of the predicate used in above wait.
       dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
     }
 
@@ -2254,12 +2285,17 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     {
       std::unique_lock smq_l1(smq_lock);
       std::unique_lock sdq_l1(syncm->get_sdq_lock());
-      if (!syncm_q.empty() && syncm_q.front() == syncm
-          && syncm->get_in_flight_unlocked() == 0
-          && syncm->get_crawl_finished_unlocked() == true) {
+      if (!syncm_q.empty() &&
+          syncm->get_in_flight_unlocked() == 0 &&
+          syncm->get_crawl_finished_unlocked() == true) {
         dout(20) << ": Dequeue syncm object=" << syncm << dendl;
         syncm->set_sync_finished_and_notify_unlocked(); // To wake up crawler thread waiting to take snapshot
-        syncm_q.pop();
+        if (syncm_q.front() == syncm) {
+          syncm_q.pop_front();
+        } else { // if syncms in the middle finishes first
+          remove_syncm(syncm);
+        }
+        dout(20) << ": syncm_q after removal " << syncm_q << dendl;
         smq_cv.notify_all();
       }
     }
index b6fe1b8a74d59647c190bd6a5ac29d888748dfb6..901fe094f33e781bacf81a0bb205b50d562e1856 100644 (file)
@@ -206,6 +206,7 @@ private:
 
     void push_dataq_entry(PeerReplayer::SyncEntry e);
     bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
+    bool has_pending_work() const;
     void mark_crawl_finished();
     bool get_crawl_finished_unlocked() {
       return m_crawl_finished;
@@ -213,14 +214,6 @@ private:
     void dec_in_flight() {
       std::unique_lock lock(sdq_lock);
       --m_in_flight;
-      /* If the crawler is done (m_crawl_finished = true) and m_sync_dataq
-       * is empty, threads will block until other pending threads which are syncing
-       * the entries picked up from queue are completed. So make sure to wake them
-       * up when the processing is complete. This is to avoid the busy loop of jobless
-       * data sync threads.
-       */
-      if (m_in_flight == 0)
-        sdq_cv.notify_all();
     }
     int get_in_flight_unlocked() {
       return m_in_flight;
@@ -256,7 +249,7 @@ private:
     boost::optional<Snapshot> m_prev;
     std::stack<PeerReplayer::SyncEntry> m_sync_stack;
 
-    ceph::mutex sdq_lock;
+    mutable ceph::mutex sdq_lock;
     ceph::condition_variable sdq_cv;
     std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
     int m_in_flight = 0;
@@ -459,7 +452,7 @@ private:
 
   ceph::mutex smq_lock;
   ceph::condition_variable smq_cv;
-  std::queue<std::shared_ptr<SyncMechanism>> syncm_q;
+  std::deque<std::shared_ptr<SyncMechanism>> syncm_q;
 
   ServiceDaemonStats m_service_daemon_stats;
 
@@ -467,6 +460,8 @@ private:
 
   void run(SnapshotReplayerThread *replayer);
   void run_datasync(SnapshotDataSyncThread *data_replayer);
+  void remove_syncm(const std::shared_ptr<SyncMechanism>& syncm_obj);
+  std::shared_ptr<SyncMechanism> pick_next_syncm() const;
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);