]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/cephfs_mirror: Add ETA metrics
authorKotresh HR <khiremat@redhat.com>
Fri, 23 Jan 2026 07:26:57 +0000 (12:56 +0530)
committerKotresh HR <khiremat@redhat.com>
Wed, 4 Feb 2026 08:53:14 +0000 (14:23 +0530)
Signed-off-by: Kotresh HR <khiremat@redhat.com>
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h

index 8fe82379bb68fcb1456a2f1076e450429cb01be3..eef0237a4e987c18aacbc815a42dc4dba5d0e2c4 100644 (file)
@@ -685,6 +685,15 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string
   void *ptr;
   struct iovec iov[NR_IOVECS];
 
+  using clock = std::chrono::steady_clock;
+  using seconds = std::chrono::duration<double>;
+  uint64_t bytes_read = 0;
+  uint64_t bytes_written = 0;
+  uint64_t discovered_delta = 0;
+  seconds read_time{0};
+  seconds write_time{0};
+
+  inc_files_started(dir_root);
   int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
   if (r < 0) {
     derr << ": failed to open local file path=" << epath << ": "
@@ -712,6 +721,8 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string
   while (num_blocks > 0) {
     auto offset = b->offset;
     auto len = b->len;
+    discovered_delta += b->len;
+    inc_delta_bytes(dir_root, discovered_delta);
 
     dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: ["
              << offset << "~" << len << "]" << dendl;
@@ -747,12 +758,16 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string
         }
       }
 
+      auto rs = clock::now();
       r = ceph_preadv(m_local_mount, l_fd, iov, num_buffers, offset);
+      auto re = clock::now();
+      read_time += seconds(re - rs);
       if (r < 0) {
         derr << ": failed to read local file path=" << epath << ": "
              << cpp_strerror(r) << dendl;
         break;
       }
+      bytes_read += r;
       dout(10) << ": read: " << r << " bytes" << dendl;
       if (r == 0) {
         break;
@@ -766,12 +781,17 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string
       }
 
       dout(10) << ": writing to offset: " << offset << dendl;
+      auto ws = clock::now();
       r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, offset);
+      auto we = clock::now();
+      write_time += seconds(we - ws);
       if (r < 0) {
         derr << ": failed to write remote file path=" << epath << ": "
              << cpp_strerror(r) << dendl;
         break;
       }
+      bytes_written += r;
+      inc_actual_sync_bytes(dir_root, r);
 
       offset += r;
     }
@@ -780,6 +800,9 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root,  const std::string
     ++b;
   }
 
+  //io accounting for eta
+  add_io(dir_root, bytes_read, bytes_written, read_time.count(), write_time.count());
+
   if (num_blocks == 0 && r >= 0) { // handle blocklist case
     dout(20) << ": truncating epath=" << epath << " to " << stx.stx_size << " bytes"
              << dendl;
@@ -1664,8 +1687,19 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
   dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath
            << ", sync_check=" << sync_check << dendl;
 
+  using clock = std::chrono::steady_clock;
+  using seconds = std::chrono::duration<double>;
+  seconds blockdiff_time{0};
+  uint64_t bd_synced_bytes = 0;
+
   if (!sync_check) {
-    return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+    auto bd_s = clock::now();
+    int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+    auto bd_e = clock::now();
+    blockdiff_time = seconds(bd_s - bd_e);
+    bd_synced_bytes = stx.stx_size;
+    m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+    return r;
   }
 
   ceph_file_blockdiff_info info;
@@ -1678,10 +1712,17 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
 
   if (r < 0) {
     dout(20) << ": new file epath=" << epath << dendl;
-    return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+    auto bd_s = clock::now();
+    int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+    auto bd_e = clock::now();
+    blockdiff_time = seconds(bd_s - bd_e);
+    bd_synced_bytes = stx.stx_size;
+    m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+    return r;
   }
 
   r = 1;
+  auto bd_s = clock::now();
   while (true) {
     ceph_file_blockdiff_changedblocks blocks;
     r = ceph_file_blockdiff(&info, &blocks);
@@ -1692,6 +1733,13 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
 
     int rr = r;
     if (blocks.num_blocks) {
+      auto bd_num_blocks = blocks.num_blocks;
+      auto bd_cblock = blocks.b;
+      while(bd_num_blocks > 0) {
+        bd_synced_bytes += bd_cblock->len;
+       --bd_num_blocks;
+       bd_cblock++;
+      }
       r = callback(blocks.num_blocks, blocks.b);
       ceph_free_file_blockdiff_buffer(&blocks);
       if (r < 0) {
@@ -1705,6 +1753,10 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath,
     }
     // else fetch next changed blocks
   }
+  // blockdiff bandwidth
+  auto bd_e = clock::now();
+  blockdiff_time = seconds(bd_e - bd_s);
+  m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
 
   ceph_file_blockdiff_finish(&info);
   return r;
@@ -1909,10 +1961,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
   if (fh.p_mnt == m_local_mount) {
     syncm = std::make_shared<SnapDiffSync>(*this, dir_root, m_local_mount, m_remote_mount,
                                           &fh, m_peer, current, prev);
-
+    set_snapdiff(dir_root, true); //for stats
   } else {
     syncm = std::make_shared<RemoteSync>(*this, dir_root, m_local_mount, m_remote_mount,
                                          &fh, m_peer, current, boost::none);
+    set_snapdiff(dir_root, false); //for stats
   }
 
   r = syncm->init_sync();
@@ -2379,6 +2432,88 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) {
   }
 }
 
+std::string format_eta(double eta_sec) {
+  if (eta_sec < 0) {
+    return "calculating...";
+  }
+  uint64_t sec = static_cast<uint64_t>(eta_sec + 0.5); // round
+  uint64_t days = sec / 86400;
+  sec %= 86400;
+  uint64_t hours = sec / 3600;
+  sec %= 3600;
+  uint64_t minutes = sec / 60;
+  sec %= 60;
+
+  std::ostringstream out;
+  if (days > 0)
+    out << days << "d ";
+  if (hours > 0 || days > 0)
+    out << hours << "h ";
+  if (minutes > 0 || hours > 0 || days > 0)
+    out << minutes << "m ";
+  out << sec << "s";
+  return out.str();
+}
+
+double PeerReplayer::compute_eta(PeerReplayer::SnapSyncStat& sync_stat) {
+  // mlock is held by the caller
+
+  static constexpr uint64_t MIN_FILES_SAMPLE = 25;
+  static constexpr uint64_t MIN_BYTES_SAMPLE = 1ULL * 16 * 1024 * 1024; // 16MiB
+
+  double read_time = sync_stat.read_time_sec;
+  double write_time = sync_stat.write_time_sec;
+  double bytes_read = sync_stat.bytes_read;
+  double bytes_written = sync_stat.bytes_written;
+  uint64_t files_started = sync_stat.files_started;
+  double read_bw = bytes_read/read_time;
+  double write_bw = bytes_written/write_time;
+  uint64_t discovered_delta_bytes = sync_stat.discovered_delta_bytes;
+  uint64_t synced_bytes = sync_stat.actual_sync_bytes;
+  uint64_t file_synced_bytes = sync_stat.sync_bytes;
+  uint64_t total_files = sync_stat.total_files;
+
+  if (read_time == 0 || write_time == 0)
+    return -1.0; //Calculating
+  if (files_started < MIN_FILES_SAMPLE)
+    return -1.0; //Calculating
+  if (file_synced_bytes < MIN_BYTES_SAMPLE)
+    return -1.0; //Calculating
+
+
+  if (sync_stat.snapdiff) {
+    /* blockdiff :
+     *   - total number of files with delta to be synced is known
+     *   - delta of each file is unknown, the avg delta per file is estimated
+     *   - effective bandwidth should accommodate blockdiff time
+     *   - effective bandwidth is based cumulative synced bytes and time taken, so
+     *     it considers total average past performance and hence it is mostly pessimistic
+     */
+    double avg_delta_per_file = (double)discovered_delta_bytes / (double)files_started;
+    double estimated_total_delta = avg_delta_per_file * total_files;
+    double effective_bw = (double) sync_stat.bd_sync_bytes / sync_stat.blockdiff_time_sec;
+
+    uint64_t remaining =
+        estimated_total_delta > synced_bytes
+        ? static_cast<uint64_t>(estimated_total_delta - synced_bytes)
+        : 0;
+
+    if (effective_bw <= 0)
+      return -1.0;
+    return remaining / effective_bw;
+  } else {
+    /* full sync :
+     *   -  effective bandwidth is mostly dependant on read and write, again
+     *      it's cumulative, so prediction is based on average past performace.
+     */
+    uint64_t remaining = sync_stat.total_bytes - synced_bytes;
+    double effective_bw = std::min(read_bw, write_bw);
+    if (effective_bw <= 0)
+      return -1.0; //Calculating
+    return remaining / effective_bw;
+  }
+}
+
 void PeerReplayer::peer_status(Formatter *f) {
   std::scoped_lock locker(m_lock);
   f->open_object_section("stats");
@@ -2393,6 +2528,10 @@ void PeerReplayer::peer_status(Formatter *f) {
       f->dump_string("state", "idle");
     } else {
       f->dump_string("state", "syncing");
+      if (sync_stat.snapdiff)
+        f->dump_string("sync-mode", "delta");
+      else
+        f->dump_string("sync-mode", "full");
       f->open_object_section("current_syncing_snap");
       f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first);
       f->dump_string("name", (*sync_stat.current_syncing_snap).second);
@@ -2428,6 +2567,7 @@ void PeerReplayer::peer_status(Formatter *f) {
         f->dump_string("sync_percent", os.str());
       }
       f->close_section(); //files
+      f->dump_string("eta", format_eta(compute_eta(sync_stat)));
       f->close_section(); //current_syncing_snap
     }
     if (sync_stat.last_synced_snap) {
index ad56a2a724ffce992a214e8e80f484670459b27e..94622f41fd6bee70ec712b2520ab90d19c473fe4 100644 (file)
@@ -355,7 +355,19 @@ private:
     uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync.
     uint64_t total_bytes = 0; //total bytes counter, independently for each directory sync.
     uint64_t sync_files = 0; //sync files counter, independently for each directory sync.
+    // files_started will be ahead of sync_files as it's incremented just after delta discovery
+    uint64_t files_started = 0; //files picked up for sync counter, independently for each directory sync.
     uint64_t total_files = 0; //total files counter, independently for each directory sync.
+    bool snapdiff = false; // RemoteSync or Snapdiff
+    // actual io accounting
+    uint64_t bytes_read = 0; //actual bytes read counter, independently for each directory sync.
+    uint64_t bytes_written = 0; //actual bytes written counter, independently for each directory sync.
+    double read_time_sec = 0.0; //actual read time in seconds counter, independently for each directroy sync.
+    double write_time_sec = 0.0; //actual write time in seconds counter, independently for each directroy sync.
+    uint64_t actual_sync_bytes = 0; //actual bytes synced using delta, counter, independently for each directory sync.
+    uint64_t discovered_delta_bytes = 0; //discovered delta bytes counter, independently for each directory sync.
+    uint64_t bd_sync_bytes = 0; //actual bytes synced using SnapDiff/blockdiff counter
+    double blockdiff_time_sec = 0.0; //actual sync time using SnapDiff/blockdiff counter
   };
 
   void _inc_failed_count(const std::string &dir_root) {
@@ -397,9 +409,15 @@ private:
     _set_last_synced_snap(dir_root, snap_id, snap_name);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.sync_bytes = 0;
+    sync_stat.actual_sync_bytes = 0;
+    sync_stat.bd_sync_bytes = 0;
     sync_stat.total_bytes = 0;
+    sync_stat.discovered_delta_bytes = 0;
     sync_stat.sync_files = 0;
     sync_stat.total_files = 0;
+    sync_stat.files_started = 0;
+    sync_stat.bd_sync_bytes = 0;
+    sync_stat.blockdiff_time_sec = 0;
   }
   void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
                                 const std::string &snap_name) {
@@ -433,11 +451,46 @@ private:
     sync_stat.last_sync_files = sync_stat.sync_files;
     ++sync_stat.synced_snap_count;
   }
+  void set_snapdiff(const std::string &dir_root, bool snapdiff) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.snapdiff = snapdiff;
+  }
+  void set_blockdiff_bw(const std::string &dir_root, const uint64_t bd_syncbytes, const double bd_time) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.bd_sync_bytes += bd_syncbytes;
+    sync_stat.blockdiff_time_sec += bd_time;
+  }
+  void add_io(const std::string &dir_root, const uint64_t& br, const uint64_t bw,
+              const double rt, const double wt) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.bytes_read += br;
+    sync_stat.bytes_written += bw;
+    sync_stat.read_time_sec += rt;
+    sync_stat.write_time_sec += wt;
+  }
+  void inc_delta_bytes(const std::string &dir_root, const uint64_t& b) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.discovered_delta_bytes += b;
+  }
   void inc_sync_bytes(const std::string &dir_root, const uint64_t& b) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
     sync_stat.sync_bytes += b;
   }
+  void inc_actual_sync_bytes(const std::string &dir_root, const uint64_t& b) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.actual_sync_bytes += b;
+  }
+  void inc_files_started(const std::string &dir_root) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.files_started++;
+  }
   void inc_sync_files(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
@@ -557,6 +610,7 @@ private:
 
   // add syncm to syncm_q
   void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
+  double compute_eta(SnapSyncStat& sync_stat);
 };
 
 } // namespace mirror