]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Add SyncMechanism Queue
authorKotresh HR <khiremat@redhat.com>
Wed, 14 Jan 2026 08:47:07 +0000 (14:17 +0530)
committerKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 20:12:39 +0000 (01:42 +0530)
Add a queue of shared_ptr of type SyncMechanism.
Since it's shared_ptr, the queue can hold both
shared_ptr to both RemoteSync and SnapDiffSync objects.
Each SyncMechanism holds the queue for the SyncEntry
items to be synced using the data sync threads.

The SyncMechanism queue needs to be shared_ptr because
all the data sync threads needs to access the object
of SyncMechanism to process the SyncEntry Queue.

This patch sets up the building blocks for the same.

Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 9bd076b5cefaf28b9c33fc859a7a0c53c89f7200..cd392fe9cd1a4859b01096cd4e94b1771e13c191 100644 (file)
@@ -178,7 +178,8 @@ PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
     m_local_mount(mount),
     m_service_daemon(service_daemon),
     m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
-    m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+    m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))),
+    smq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::smq" + stringify(peer.uuid))) {
   // reset sync stats sent via service daemon
   m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                  SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
@@ -365,6 +366,14 @@ void PeerReplayer::remove_directory(string_view dir_root) {
   m_cond.notify_all();
 }
 
+void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) {
+  dout(20) << ": Enqueue syncm object=" << item << dendl;
+  std::lock_guard lock(smq_lock);
+  syncm_q.push(item);
+  smq_cv.notify_all();
+}
+
+
 boost::optional<std::string> PeerReplayer::pick_directory() {
   dout(20) << dendl;
 
@@ -809,7 +818,7 @@ close_local_fd:
   return r == 0 ? 0 : r;
 }
 
-int PeerReplayer::remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+int PeerReplayer::remote_file_op(std::shared_ptr<SyncMechanism>& syncm, const std::string &dir_root,
                                  const std::string &epath, const struct ceph_statx &stx,
                                  bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync) {
   dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
@@ -1787,13 +1796,14 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
     return r;
   }
 
-  SyncMechanism *syncm;
+  std::shared_ptr<SyncMechanism> syncm;
   if (fh.p_mnt == m_local_mount) {
-    syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh,
-                             m_peer, current, prev);
+    syncm = std::make_shared<SnapDiffSync>(dir_root, m_local_mount, m_remote_mount,
+                                          &fh, m_peer, current, prev);
+
   } else {
-    syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh,
-                           m_peer, current, boost::none);
+    syncm = std::make_shared<RemoteSync>(m_local_mount, m_remote_mount, &fh,
+                                             m_peer, current, boost::none);
   }
 
   r = syncm->init_sync();
@@ -1801,10 +1811,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
     derr << ": failed to initialize sync mechanism" << dendl;
     ceph_close(m_local_mount, fh.c_fd);
     ceph_close(fh.p_mnt, fh.p_fd);
-    delete syncm;
     return r;
   }
 
+  enqueue_syncm(syncm);
+
   // starting from this point we shouldn't care about manual closing of fh.c_fd,
   // it will be closed automatically when bound tdirp is closed.
   while (true) {
@@ -1863,7 +1874,6 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
   }
 
   syncm->finish_sync();
-  delete syncm;
 
   dout(20) << " cur:" << fh.c_fd
            << " prev:" << fh.p_fd
@@ -2144,12 +2154,29 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
       break;
     }
 
-    // TODO Wait and fetch syncm from SyncMechanism Queue
+    std::shared_ptr<SyncMechanism> syncm;
+    {
+      std::unique_lock lock(smq_lock);
+      dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
+      smq_cv.wait(lock, [this]{return !syncm_q.empty();});
+      syncm = syncm_q.front();
+      dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
+    }
 
     // TODO pre_sync and open handles
 
     // TODO Wait and fetch files from syncm data queue and sync
 
+    // Dequeue syncm object after processing
+    {
+      std::unique_lock lock(smq_lock);
+      if (!syncm_q.empty() && syncm_q.front() == syncm) {
+        dout(20) << ": Dequeue syncm object=" << syncm << dendl;
+        syncm_q.pop();
+        smq_cv.notify_all();
+      }
+    }
+
     //lock again to satify m_cond
     locker.lock();
   }
index e6b3018e0c0f168326405e3e39bc473ec27ccc2d..2b51d735b9644d0eb26662cf9648ebebcfd62e5f 100644 (file)
@@ -396,6 +396,10 @@ private:
 
   SnapshotDataReplayers m_data_replayers;
 
+  ceph::mutex smq_lock;
+  ceph::condition_variable smq_cv;
+  std::queue<std::shared_ptr<SyncMechanism>> syncm_q;
+
   ServiceDaemonStats m_service_daemon_stats;
 
   PerfCounters *m_perf_counters;
@@ -441,12 +445,15 @@ private:
   int do_sync_snaps(const std::string &dir_root);
 
   int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
-  int remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+  int remote_file_op(std::shared_ptr<SyncMechanism>& syncm, const std::string &dir_root,
                      const std::string &epath, const struct ceph_statx &stx,
                      bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync);
   int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
                      const FHandles &fh, uint64_t num_blocks, struct cblock *b);
   int sync_perms(const std::string& path);
+
+  // add syncm to syncm_q
+  void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
 };
 
 } // namespace mirror