]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/cephfs_mirror: Add a pool of datasync threads
authorKotresh HR <khiremat@redhat.com>
Mon, 24 Nov 2025 14:43:04 +0000 (20:13 +0530)
committerKotresh HR <khiremat@redhat.com>
Sat, 21 Feb 2026 20:12:39 +0000 (01:42 +0530)
Fixes: https://tracker.ceph.com/issues/73452
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 83108b8bcf5d600db75dcc243cff9dc1c39f6dcc..9c3f9013a35762c647a9ff429ccc92e675be7bcb 100644 (file)
@@ -298,6 +298,17 @@ int PeerReplayer::init() {
     m_replayers.push_back(std::move(replayer));
   }
 
+  //TODO: Have a separate tuneable for data sync threads
+  nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_max_concurrent_directory_syncs");
+  dout(20) << ": spawning " << nr_replayers << " snapshot data replayer(s)" << dendl;
+  while (nr_replayers-- > 0) {
+    std::unique_ptr<SnapshotDataSyncThread> data_replayer(
+      new SnapshotDataSyncThread(this));
+    std::string name("d_replayer-" + stringify(nr_replayers));
+    data_replayer->create(name.c_str());
+    m_data_replayers.push_back(std::move(data_replayer));
+  }
   return 0;
 }
 
@@ -2107,6 +2118,22 @@ void PeerReplayer::run(SnapshotReplayerThread *replayer) {
   }
 }
 
+void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
+  dout(10) << ": snapshot datasync replayer=" << data_replayer << dendl;
+
+  // TODO Do we need separate m_lock/m_cond for synchornization or can you use the same?
+
+  while (true) {
+    // TODO is_stopping and is_blocklisted
+
+    // TODO Wait and fetch syncm from SyncMechanism Queue
+
+    // TODO pre_sync and open handles
+
+    // TODO Wait and fetch files from syncm data queue and sync
+  }
+}
+
 void PeerReplayer::peer_status(Formatter *f) {
   std::scoped_lock locker(m_lock);
   f->open_object_section("stats");
index c99d75abafcf799dfe61d95ae6743a4d6a1b4651..e6b3018e0c0f168326405e3e39bc473ec27ccc2d 100644 (file)
@@ -95,6 +95,21 @@ private:
     PeerReplayer *m_peer_replayer;
   };
 
+  class SnapshotDataSyncThread : public Thread {
+  public:
+    SnapshotDataSyncThread(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) {
+    }
+
+    void *entry() override {
+      m_peer_replayer->run_datasync(this);
+      return 0;
+    }
+
+  private:
+    PeerReplayer *m_peer_replayer;
+  };
+
   struct DirRegistry {
     int fd;
     bool canceled = false;
@@ -357,6 +372,7 @@ private:
   }
 
   typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+  typedef std::vector<std::unique_ptr<SnapshotDataSyncThread>> SnapshotDataReplayers;
 
   CephContext *m_cct;
   FSMirror *m_fs_mirror;
@@ -378,11 +394,14 @@ private:
   bool m_stopping = false;
   SnapshotReplayers m_replayers;
 
+  SnapshotDataReplayers m_data_replayers;
+
   ServiceDaemonStats m_service_daemon_stats;
 
   PerfCounters *m_perf_counters;
 
   void run(SnapshotReplayerThread *replayer);
+  void run_datasync(SnapshotDataSyncThread *data_replayer);
 
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);