From 7234f1dcd97eb485dff4d94c32772f843364f728 Mon Sep 17 00:00:00 2001 From: Kotresh HR Date: Wed, 14 Jan 2026 14:17:07 +0530 Subject: [PATCH] tools/cephfs_mirror: Add SyncMechanism Queue Add a queue of shared_ptr of type SyncMechanism. Since it's shared_ptr, the queue can hold both shared_ptr to both RemoteSync and SnapDiffSync objects. Each SyncMechanism holds the queue for the SyncEntry items to be synced using the data sync threads. The SyncMechanism queue needs to be shared_ptr because all the data sync threads needs to access the object of SyncMechanism to process the SyncEntry Queue. This patch sets up the building blocks for the same. Fixes: https://tracker.ceph.com/issues/73452 Signed-off-by: Kotresh HR --- src/tools/cephfs_mirror/PeerReplayer.cc | 47 +++++++++++++++++++------ src/tools/cephfs_mirror/PeerReplayer.h | 9 ++++- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 9bd076b5cef..cd392fe9cd1 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -178,7 +178,8 @@ PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror, m_local_mount(mount), m_service_daemon(service_daemon), m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), - m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) { + m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))), + smq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::smq" + stringify(peer.uuid))) { // reset sync stats sent via service daemon m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0); @@ -365,6 +366,14 @@ void PeerReplayer::remove_directory(string_view dir_root) { m_cond.notify_all(); } +void PeerReplayer::enqueue_syncm(const std::shared_ptr& item) { + dout(20) << ": Enqueue syncm object=" << item << dendl; + std::lock_guard lock(smq_lock); + syncm_q.push(item); + smq_cv.notify_all(); +} + + boost::optional PeerReplayer::pick_directory() { dout(20) << dendl; @@ -809,7 +818,7 @@ close_local_fd: return r == 0 ? 0 : r; } -int PeerReplayer::remote_file_op(SyncMechanism *syncm, const std::string &dir_root, +int PeerReplayer::remote_file_op(std::shared_ptr& syncm, const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync) { dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync @@ -1787,13 +1796,14 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu return r; } - SyncMechanism *syncm; + std::shared_ptr syncm; if (fh.p_mnt == m_local_mount) { - syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh, - m_peer, current, prev); + syncm = std::make_shared(dir_root, m_local_mount, m_remote_mount, + &fh, m_peer, current, prev); + } else { - syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh, - m_peer, current, boost::none); + syncm = std::make_shared(m_local_mount, m_remote_mount, &fh, + m_peer, current, boost::none); } r = syncm->init_sync(); @@ -1801,10 +1811,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu derr << ": failed to initialize sync mechanism" << dendl; ceph_close(m_local_mount, fh.c_fd); ceph_close(fh.p_mnt, fh.p_fd); - delete syncm; return r; } + enqueue_syncm(syncm); + // starting from this point we shouldn't care about manual closing of fh.c_fd, // it will be closed automatically when bound tdirp is closed. while (true) { @@ -1863,7 +1874,6 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu } syncm->finish_sync(); - delete syncm; dout(20) << " cur:" << fh.c_fd << " prev:" << fh.p_fd @@ -2144,12 +2154,29 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { break; } - // TODO Wait and fetch syncm from SyncMechanism Queue + std::shared_ptr syncm; + { + std::unique_lock lock(smq_lock); + dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl; + smq_cv.wait(lock, [this]{return !syncm_q.empty();}); + syncm = syncm_q.front(); + dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl; + } // TODO pre_sync and open handles // TODO Wait and fetch files from syncm data queue and sync + // Dequeue syncm object after processing + { + std::unique_lock lock(smq_lock); + if (!syncm_q.empty() && syncm_q.front() == syncm) { + dout(20) << ": Dequeue syncm object=" << syncm << dendl; + syncm_q.pop(); + smq_cv.notify_all(); + } + } + //lock again to satify m_cond locker.lock(); } diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index e6b3018e0c0..2b51d735b96 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -396,6 +396,10 @@ private: SnapshotDataReplayers m_data_replayers; + ceph::mutex smq_lock; + ceph::condition_variable smq_cv; + std::queue> syncm_q; + ServiceDaemonStats m_service_daemon_stats; PerfCounters *m_perf_counters; @@ -441,12 +445,15 @@ private: int do_sync_snaps(const std::string &dir_root); int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh); - int remote_file_op(SyncMechanism *syncm, const std::string &dir_root, + int remote_file_op(std::shared_ptr& syncm, const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync); int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, const FHandles &fh, uint64_t num_blocks, struct cblock *b); int sync_perms(const std::string& path); + + // add syncm to syncm_q + void enqueue_syncm(const std::shared_ptr& item); }; } // namespace mirror -- 2.47.3