From: Kotresh HR Date: Wed, 14 Jan 2026 09:56:25 +0000 (+0530) Subject: tools/cephfs_mirror: Add m_sync_data queue X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eddd735f41f29645af5fcedb17f54466869397a2;p=ceph-ci.git tools/cephfs_mirror: Add m_sync_data queue Add data sync queue for each SyncMechanism. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index cd392fe9cd1..4392fe588b9 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -1305,12 +1305,42 @@ PeerReplayer::SyncMechanism::SyncMechanism(MountRef local, MountRef remote, FHan m_fh(fh), m_peer(peer), m_current(current), - m_prev(prev) { + m_prev(prev), + sdq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::SyncMechanism" + stringify(peer.uuid))) { } PeerReplayer::SyncMechanism::~SyncMechanism() { } +void PeerReplayer::SyncMechanism::push_dataq_entry(SyncEntry e) { + dout(10) << ": snapshot data replayer dataq pushed" << " syncm=" << this + << " epath=" << e.epath << dendl; + std::unique_lock lock(sdq_lock); + m_sync_dataq.push(std::move(e)); + sdq_cv.notify_all(); +} + +bool PeerReplayer::SyncMechanism::pop_dataq_entry(SyncEntry &out_entry) { + std::unique_lock lock(sdq_lock); + dout(20) << ": snapshot data replayer waiting on m_sync_dataq, syncm=" << this << dendl; + sdq_cv.wait(lock, [this]{ return !m_sync_dataq.empty() || m_sync_crawl_finished;}); + dout(20) << ": snapshot data replayer woke up to process m_syncm_dataq, syncm=" << this << dendl; + if (m_sync_dataq.empty() && m_sync_crawl_finished) + return false; // no more work + + out_entry = std::move(m_sync_dataq.front()); + m_sync_dataq.pop(); + dout(10) << ": snapshot data replayer dataq popped" << " syncm=" << this + << " epath=" << out_entry.epath << dendl; + return true; +} + +void PeerReplayer::SyncMechanism::mark_crawl_finished() { + std::unique_lock lock(sdq_lock); + m_sync_crawl_finished = true; + sdq_cv.notify_all(); +} + int PeerReplayer::SyncMechanism::get_changed_blocks(const std::string &epath, const struct ceph_statx &stx, bool sync_check, const std::function &callback) { @@ -2165,7 +2195,11 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { // TODO pre_sync and open handles - // TODO Wait and fetch files from syncm data queue and sync + // Wait on data sync queue for entries to process + SyncEntry entry; + while (syncm->pop_dataq_entry(entry)) { + //TODO Process entry + } // Dequeue syncm object after processing { diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 2b51d735b96..e4a0f8c30ff 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -195,6 +195,10 @@ private: virtual void finish_sync() = 0; + void push_dataq_entry(PeerReplayer::SyncEntry e); + bool pop_dataq_entry(PeerReplayer::SyncEntry &out); + void mark_crawl_finished(); + protected: MountRef m_local; MountRef m_remote; @@ -203,6 +207,11 @@ private: Snapshot m_current; boost::optional m_prev; std::stack m_sync_stack; + + ceph::mutex sdq_lock; + ceph::condition_variable sdq_cv; + std::queue m_sync_dataq; + bool m_sync_crawl_finished = false; }; class RemoteSync : public SyncMechanism {