]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Fix data sync threads completion logic
authorKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 08:18:15 +0000 (13:48 +0530)
committerKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 20:12:39 +0000 (01:42 +0530)
We need to exactly know when all data threads completes
the processing of a syncm. If a few threads finishes the
job, they all need to wait for the in processing threads
of that syncm to complete. Otherwise the finished threads
would be busy loop until in processing threads finishes.

And only after all threads finishes processing, the crawler
thread can be notified to take the snapshot.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index faccb5ed30ba46cb57425e0ec7c322eebd9d403c..2ceab740e9ea01e4519d683653f508619a7c48f0 100644 (file)
@@ -1323,12 +1323,18 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   std::unique_lock lock(sdq_lock);
   dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
   sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_crawl_finished;});
-  dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl;
-  if (m_sync_dataq.empty() && m_crawl_finished)
-     return false; // no more work
+  dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
+           << " crawl_finished=" << m_crawl_finished << dendl;
+  if (m_sync_dataq.empty() && m_crawl_finished) {
+    dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
+             << " inflight processing threads to finish!!!" << dendl;
+    sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
+    return false; // no more work
+  }
 
   out_entry = std::move(m_sync_dataq.front());
   m_sync_dataq.pop();
+  m_in_flight++;
   dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
           << " epath=" << out_entry.epath << dendl;
   return true;
@@ -2207,12 +2213,16 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     SyncEntry entry;
     while (syncm->pop_dataq_entry(entry)) {
       //TODO Process entry
+      syncm->dec_in_flight();
     }
 
     // Dequeue syncm object after processing
     {
-      std::unique_lock lock(smq_lock);
-      if (!syncm_q.empty() && syncm_q.front() == syncm) {
+      std::unique_lock smq_l1(smq_lock);
+      std::unique_lock sdq_l1(syncm->get_sdq_lock());
+      if (!syncm_q.empty() && syncm_q.front() == syncm
+          && syncm->get_in_flight_unlocked() == 0
+          && syncm->get_crawl_finished_unlocked() == true) {
         dout(20) << ": Dequeue syncm object=" << syncm << dendl;
         syncm_q.pop();
         smq_cv.notify_all();
index 34a27dc73aa46348e4432afe1ee3d6c513dd71b9..b95316abbca887ce3e5657b39f8c4e96fc5f9032 100644 (file)
@@ -198,6 +198,27 @@ private:
     void push_dataq_entry(PeerReplayer::SyncEntry e);
     bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
     void mark_crawl_finished();
+    bool get_crawl_finished_unlocked() {
+      return m_crawl_finished;
+    }
+    void dec_in_flight() {
+      std::unique_lock lock(sdq_lock);
+      --m_in_flight;
+      /* If the crawler is done (m_crawl_finished = true) and m_sync_dataq
+       * is empty, threads will block until other pending threads which are syncing
+       * the entries picked up from queue are completed. So make sure to wake them
+       * up when the processing is complete. This is to avoid the busy loop of jobless
+       * data sync threads.
+       */
+      if (m_in_flight == 0)
+        sdq_cv.notify_all();
+    }
+    int get_in_flight_unlocked() {
+      return m_in_flight;
+    }
+    ceph::mutex& get_sdq_lock() {
+      return sdq_lock;
+    }
 
     int remote_mkdir(const std::string &epath, const struct ceph_statx &stx);
   protected:
@@ -212,6 +233,7 @@ private:
     ceph::mutex sdq_lock;
     ceph::condition_variable sdq_cv;
     std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
+    int m_in_flight = 0;
     bool m_crawl_finished = false;
   };