]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Add m_sync_data queue
authorKotresh HR <khiremat@redhat.com>
Wed, 14 Jan 2026 09:56:25 +0000 (15:26 +0530)
committerKotresh HR <khiremat@redhat.com>
Tue, 17 Feb 2026 20:10:50 +0000 (01:40 +0530)
Add data sync queue for each SyncMechanism.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index cd392fe9cd1a4859b01096cd4e94b1771e13c191..4392fe588b967a4fbaa433ef272c5f13a45a80e4 100644 (file)
@@ -1305,12 +1305,42 @@ PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHan
       m_fh(fh),
       m_peer(peer),
       m_current(current),
-      m_prev(prev) {
+      m_prev(prev),
+      sdq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::SyncMechanism" + stringify(peer.uuid))) {
   }
 
 PeerReplayer::SyncMechanism::~SyncMechanism() {
 }
 
+void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) {
+  dout(10) << ": snapshot data replayer dataq pushed" << " syncm=" << this
+          << " epath=" << e.epath << dendl;
+  std::unique_lock lock(sdq_lock);
+  m_sync_dataq.push(std::move(e));
+  sdq_cv.notify_all();
+}
+
+bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) {
+  std::unique_lock lock(sdq_lock);
+  dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl;
+  sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_sync_crawl_finished;});
+  dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl;
+  if (m_sync_dataq.empty() && m_sync_crawl_finished)
+     return false; // no more work
+
+  out_entry = std::move(m_sync_dataq.front());
+  m_sync_dataq.pop();
+  dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this
+          << " epath=" << out_entry.epath << dendl;
+  return true;
+}
+
+void PeerReplayer::SyncMechanism::mark_crawl_finished() {
+  std::unique_lock lock(sdq_lock);
+  m_sync_crawl_finished = true;
+  sdq_cv.notify_all();
+}
+
 int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath,
                                                     const struct ceph_statx &stx, bool sync_check,
                                                     const std::function<int (uint64_t, struct cblock *)> &callback) {
@@ -2165,7 +2195,11 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
 
     // TODO pre_sync and open handles
 
-    // TODO Wait and fetch files from syncm data queue and sync
+    // Wait on data sync queue for entries to process
+    SyncEntry entry;
+    while (syncm->pop_dataq_entry(entry)) {
+      //TODO Process entry
+    }
 
     // Dequeue syncm object after processing
     {
index 2b51d735b9644d0eb26662cf9648ebebcfd62e5f..e4a0f8c30ffc9328eb9e6bbc408a0b03cab09be6 100644 (file)
@@ -195,6 +195,10 @@ private:
 
     virtual void finish_sync() = 0;
 
+    void push_dataq_entry(PeerReplayer::SyncEntry e);
+    bool pop_dataq_entry(PeerReplayer::SyncEntry &out);
+    void mark_crawl_finished();
+
   protected:
     MountRef m_local;
     MountRef m_remote;
@@ -203,6 +207,11 @@ private:
     Snapshot m_current;
     boost::optional<Snapshot> m_prev;
     std::stack<PeerReplayer::SyncEntry> m_sync_stack;
+
+    ceph::mutex sdq_lock;
+    ceph::condition_variable sdq_cv;
+    std::queue<PeerReplayer::SyncEntry> m_sync_dataq;
+    bool m_sync_crawl_finished = false;
   };
 
   class RemoteSync : public SyncMechanism {