]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Fix data sync threads completion logic
authorKotresh HR <khiremat@redhat.com>
Wed, 14 Jan 2026 12:05:33 +0000 (17:35 +0530)
committerKotresh HR <khiremat@redhat.com>
Tue, 17 Feb 2026 20:10:50 +0000 (01:40 +0530)
We need to exactly know when all data threads completes
the processing of a syncm. If a few threads finishes the
job, they all need to wait for the in processing threads
of that syncm to complete. Otherwise the finished threads
would be busy loop until in processing threads finishes.

And only after all threads finishes processing, the crawler
thread can be notified to take the snapshot.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index c24fbd0d5f6f86000fd4eebf0cb3c7b405a5738d..bf144106e80ba3872aeed22753063f25e00201ff 100644 (file)
@@ -1323,12 +1323,18 @@ bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
   std::unique_lock lock(sdq_lock);
   dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
   sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_sync_crawl_finished;});
-  dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl;
-  if (m_sync_dataq.empty() && m_sync_crawl_finished)
+  dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this
+          << " crawl_finished=" << m_sync_crawl_finished << dendl;
+  if (m_sync_dataq.empty() && m_sync_crawl_finished) {
+     dout(20) << ": snapshot data replayer dataq_empty and crawl finished - waiting for other"
+              << " inflight processing threads to finish!!!" << dendl;
+     sdq_cv.wait(lock, [this]{ return m_in_flight == 0;});
      return false; // no more work
+  }
 
   out_entry = std::move(m_sync_dataq.front());
   m_sync_dataq.pop();
+  m_in_flight++;
   dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
           << " epath=" << out_entry.epath << dendl;
   return true;
@@ -2207,12 +2213,16 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
     SyncEntry entry;
     while (syncm->pop_dataq_entry(entry)) {
       //TODO Process entry
+      syncm->dec_in_flight();
     }
 
     // Dequeue syncm object after processing
     {
-      std::unique_lock lock(smq_lock);
-      if (!syncm_q.empty() && syncm_q.front() == syncm) {
+      std::unique_lock smq_l1(smq_lock);
+      std::unique_lock sdq_l1(syncm->get_sdq_lock());
+      if (!syncm_q.empty() && syncm_q.front() == syncm
+          && syncm->get_in_flight_unlocked() == 0
+          && syncm->get_crawl_finished_unlocked() == true) {
         dout(20) << ": Dequeue syncm object=" << syncm << dendl;
         syncm_q.pop();
         smq_cv.notify_all();
index 80962cc9e18d67d0ae39476ee24b994afcfde73f..7e4a6a426bf851520ec8671ae61caa14769eb30f 100644 (file)
@@ -198,6 +198,27 @@ private:
     void push_dataq_entry(PeerReplayer::SyncEntry e);
     bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
     void mark_crawl_finished();
+    bool get_crawl_finished_unlocked() {
+      return m_sync_crawl_finished;
+    }
+    void dec_in_flight() {
+      std::unique_lock lock(sdq_lock);
+      --m_in_flight;
+      /* If the crawler is done (m_sync_crawl_finished = true) and m_sync_dataq
+       * is empty, threads will block until other pending threads which are syncing
+       * the entries picked up from queue are completed. So make sure to wake them
+       * up when the processing is complete. This is to avoid the busy loop of jobless
+       * data sync threads.
+       */
+      if (m_in_flight == 0)
+        sdq_cv.notify_all();
+    }
+    int get_in_flight_unlocked() {
+      return m_in_flight;
+    }
+    ceph::mutex& get_sdq_lock() {
+      return sdq_lock;
+    }
 
     int remote_mkdir(const std::string &epath, const struct ceph_statx &stx);
   protected:
@@ -212,6 +233,7 @@ private:
     ceph::mutex sdq_lock;
     ceph::condition_variable sdq_cv;
     std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
+    int m_in_flight = 0;
     bool m_sync_crawl_finished = false;
   };