From: Kotresh HR Date: Fri, 23 Jan 2026 07:26:57 +0000 (+0530) Subject: tools/cephfs_mirror: Add ETA metrics X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b370f2ebc622e2e7c072cbcfe80e4da265154a33;p=ceph-ci.git tools/cephfs_mirror: Add ETA metrics Signed-off-by: Kotresh HR --- diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 8fe82379bb6..eef0237a4e9 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -685,6 +685,15 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string void *ptr; struct iovec iov[NR_IOVECS]; + using clock = std::chrono::steady_clock; + using seconds = std::chrono::duration; + uint64_t bytes_read = 0; + uint64_t bytes_written = 0; + uint64_t discovered_delta = 0; + seconds read_time{0}; + seconds write_time{0}; + + inc_files_started(dir_root); int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0); if (r < 0) { derr << ": failed to open local file path=" << epath << ": " @@ -712,6 +721,8 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string while (num_blocks > 0) { auto offset = b->offset; auto len = b->len; + discovered_delta += b->len; + inc_delta_bytes(dir_root, discovered_delta); dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: [" << offset << "~" << len << "]" << dendl; @@ -747,12 +758,16 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string } } + auto rs = clock::now(); r = ceph_preadv(m_local_mount, l_fd, iov, num_buffers, offset); + auto re = clock::now(); + read_time += seconds(re - rs); if (r < 0) { derr << ": failed to read local file path=" << epath << ": " << cpp_strerror(r) << dendl; break; } + bytes_read += r; dout(10) << ": read: " << r << " bytes" << dendl; if (r == 0) { break; @@ -766,12 +781,17 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string } dout(10) << ": writing to offset: " << offset << dendl; + auto ws = clock::now(); r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, offset); + auto we = clock::now(); + write_time += seconds(we - ws); if (r < 0) { derr << ": failed to write remote file path=" << epath << ": " << cpp_strerror(r) << dendl; break; } + bytes_written += r; + inc_actual_sync_bytes(dir_root, r); offset += r; } @@ -780,6 +800,9 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string ++b; } + //io accounting for eta + add_io(dir_root, bytes_read, bytes_written, read_time.count(), write_time.count()); + if (num_blocks == 0 && r >= 0) { // handle blocklist case dout(20) << ": truncating epath=" << epath << " to " << stx.stx_size << " bytes" << dendl; @@ -1664,8 +1687,19 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath << ", sync_check=" << sync_check << dendl; + using clock = std::chrono::steady_clock; + using seconds = std::chrono::duration; + seconds blockdiff_time{0}; + uint64_t bd_synced_bytes = 0; + if (!sync_check) { - return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + auto bd_s = clock::now(); + int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + auto bd_e = clock::now(); + blockdiff_time = seconds(bd_s - bd_e); + bd_synced_bytes = stx.stx_size; + m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count()); + return r; } ceph_file_blockdiff_info info; @@ -1678,10 +1712,17 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, if (r < 0) { dout(20) << ": new file epath=" << epath << dendl; - return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + auto bd_s = clock::now(); + int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback); + auto bd_e = clock::now(); + blockdiff_time = seconds(bd_s - bd_e); + bd_synced_bytes = stx.stx_size; + m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count()); + return r; } r = 1; + auto bd_s = clock::now(); while (true) { ceph_file_blockdiff_changedblocks blocks; r = ceph_file_blockdiff(&info, &blocks); @@ -1692,6 +1733,13 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, int rr = r; if (blocks.num_blocks) { + auto bd_num_blocks = blocks.num_blocks; + auto bd_cblock = blocks.b; + while(bd_num_blocks > 0) { + bd_synced_bytes += bd_cblock->len; + --bd_num_blocks; + bd_cblock++; + } r = callback(blocks.num_blocks, blocks.b); ceph_free_file_blockdiff_buffer(&blocks); if (r < 0) { @@ -1705,6 +1753,10 @@ int PeerReplayer::SnapDiffSync::get_changed_blocks(const std::string &epath, } // else fetch next changed blocks } + // blockdiff bandwidth + auto bd_e = clock::now(); + blockdiff_time = seconds(bd_e - bd_s); + m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count()); ceph_file_blockdiff_finish(&info); return r; @@ -1909,10 +1961,11 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu if (fh.p_mnt == m_local_mount) { syncm = std::make_shared(*this, dir_root, m_local_mount, m_remote_mount, &fh, m_peer, current, prev); - + set_snapdiff(dir_root, true); //for stats } else { syncm = std::make_shared(*this, dir_root, m_local_mount, m_remote_mount, &fh, m_peer, current, boost::none); + set_snapdiff(dir_root, false); //for stats } r = syncm->init_sync(); @@ -2379,6 +2432,88 @@ void PeerReplayer::run_datasync(SnapshotDataSyncThread *data_replayer) { } } +std::string format_eta(double eta_sec) { + if (eta_sec < 0) { + return "calculating..."; + } + uint64_t sec = static_cast(eta_sec + 0.5); // round + uint64_t days = sec / 86400; + sec %= 86400; + uint64_t hours = sec / 3600; + sec %= 3600; + uint64_t minutes = sec / 60; + sec %= 60; + + std::ostringstream out; + if (days > 0) + out << days << "d "; + if (hours > 0 || days > 0) + out << hours << "h "; + if (minutes > 0 || hours > 0 || days > 0) + out << minutes << "m "; + out << sec << "s"; + return out.str(); +} + +double PeerReplayer::compute_eta(PeerReplayer::SnapSyncStat& sync_stat) { + // mlock is held by the caller + + static constexpr uint64_t MIN_FILES_SAMPLE = 25; + static constexpr uint64_t MIN_BYTES_SAMPLE = 1ULL * 16 * 1024 * 1024; // 16MiB + + double read_time = sync_stat.read_time_sec; + double write_time = sync_stat.write_time_sec; + double bytes_read = sync_stat.bytes_read; + double bytes_written = sync_stat.bytes_written; + uint64_t files_started = sync_stat.files_started; + double read_bw = bytes_read/read_time; + double write_bw = bytes_written/write_time; + uint64_t discovered_delta_bytes = sync_stat.discovered_delta_bytes; + uint64_t synced_bytes = sync_stat.actual_sync_bytes; + uint64_t file_synced_bytes = sync_stat.sync_bytes; + uint64_t total_files = sync_stat.total_files; + + if (read_time == 0 || write_time == 0) + return -1.0; //Calculating + if (files_started < MIN_FILES_SAMPLE) + return -1.0; //Calculating + if (file_synced_bytes < MIN_BYTES_SAMPLE) + return -1.0; //Calculating + + + if (sync_stat.snapdiff) { + /* blockdiff : + * - total number of files with delta to be synced is known + * - delta of each file is unknown, the avg delta per file is estimated + * - effective bandwidth should accommodate blockdiff time + * - effective bandwidth is based cumulative synced bytes and time taken, so + * it considers total average past performance and hence it is mostly pessimistic + */ + double avg_delta_per_file = (double)discovered_delta_bytes / (double)files_started; + double estimated_total_delta = avg_delta_per_file * total_files; + double effective_bw = (double) sync_stat.bd_sync_bytes / sync_stat.blockdiff_time_sec; + + uint64_t remaining = + estimated_total_delta > synced_bytes + ? static_cast(estimated_total_delta - synced_bytes) + : 0; + + if (effective_bw <= 0) + return -1.0; + return remaining / effective_bw; + } else { + /* full sync : + * - effective bandwidth is mostly dependant on read and write, again + * it's cumulative, so prediction is based on average past performace. + */ + uint64_t remaining = sync_stat.total_bytes - synced_bytes; + double effective_bw = std::min(read_bw, write_bw); + if (effective_bw <= 0) + return -1.0; //Calculating + return remaining / effective_bw; + } +} + void PeerReplayer::peer_status(Formatter *f) { std::scoped_lock locker(m_lock); f->open_object_section("stats"); @@ -2393,6 +2528,10 @@ void PeerReplayer::peer_status(Formatter *f) { f->dump_string("state", "idle"); } else { f->dump_string("state", "syncing"); + if (sync_stat.snapdiff) + f->dump_string("sync-mode", "delta"); + else + f->dump_string("sync-mode", "full"); f->open_object_section("current_syncing_snap"); f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first); f->dump_string("name", (*sync_stat.current_syncing_snap).second); @@ -2428,6 +2567,7 @@ void PeerReplayer::peer_status(Formatter *f) { f->dump_string("sync_percent", os.str()); } f->close_section(); //files + f->dump_string("eta", format_eta(compute_eta(sync_stat))); f->close_section(); //current_syncing_snap } if (sync_stat.last_synced_snap) { diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index ad56a2a724f..94622f41fd6 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -355,7 +355,19 @@ private: uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync. uint64_t total_bytes = 0; //total bytes counter, independently for each directory sync. uint64_t sync_files = 0; //sync files counter, independently for each directory sync. + // files_started will be ahead of sync_files as it's incremented just after delta discovery + uint64_t files_started = 0; //files picked up for sync counter, independently for each directory sync. uint64_t total_files = 0; //total files counter, independently for each directory sync. + bool snapdiff = false; // RemoteSync or Snapdiff + // actual io accounting + uint64_t bytes_read = 0; //actual bytes read counter, independently for each directory sync. + uint64_t bytes_written = 0; //actual bytes written counter, independently for each directory sync. + double read_time_sec = 0.0; //actual read time in seconds counter, independently for each directroy sync. + double write_time_sec = 0.0; //actual write time in seconds counter, independently for each directroy sync. + uint64_t actual_sync_bytes = 0; //actual bytes synced using delta, counter, independently for each directory sync. + uint64_t discovered_delta_bytes = 0; //discovered delta bytes counter, independently for each directory sync. + uint64_t bd_sync_bytes = 0; //actual bytes synced using SnapDiff/blockdiff counter + double blockdiff_time_sec = 0.0; //actual sync time using SnapDiff/blockdiff counter }; void _inc_failed_count(const std::string &dir_root) { @@ -397,9 +409,15 @@ private: _set_last_synced_snap(dir_root, snap_id, snap_name); auto &sync_stat = m_snap_sync_stats.at(dir_root); sync_stat.sync_bytes = 0; + sync_stat.actual_sync_bytes = 0; + sync_stat.bd_sync_bytes = 0; sync_stat.total_bytes = 0; + sync_stat.discovered_delta_bytes = 0; sync_stat.sync_files = 0; sync_stat.total_files = 0; + sync_stat.files_started = 0; + sync_stat.bd_sync_bytes = 0; + sync_stat.blockdiff_time_sec = 0; } void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id, const std::string &snap_name) { @@ -433,11 +451,46 @@ private: sync_stat.last_sync_files = sync_stat.sync_files; ++sync_stat.synced_snap_count; } + void set_snapdiff(const std::string &dir_root, bool snapdiff) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.snapdiff = snapdiff; + } + void set_blockdiff_bw(const std::string &dir_root, const uint64_t bd_syncbytes, const double bd_time) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.bd_sync_bytes += bd_syncbytes; + sync_stat.blockdiff_time_sec += bd_time; + } + void add_io(const std::string &dir_root, const uint64_t& br, const uint64_t bw, + const double rt, const double wt) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.bytes_read += br; + sync_stat.bytes_written += bw; + sync_stat.read_time_sec += rt; + sync_stat.write_time_sec += wt; + } + void inc_delta_bytes(const std::string &dir_root, const uint64_t& b) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.discovered_delta_bytes += b; + } void inc_sync_bytes(const std::string &dir_root, const uint64_t& b) { std::scoped_lock locker(m_lock); auto &sync_stat = m_snap_sync_stats.at(dir_root); sync_stat.sync_bytes += b; } + void inc_actual_sync_bytes(const std::string &dir_root, const uint64_t& b) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.actual_sync_bytes += b; + } + void inc_files_started(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.files_started++; + } void inc_sync_files(const std::string &dir_root) { std::scoped_lock locker(m_lock); auto &sync_stat = m_snap_sync_stats.at(dir_root); @@ -557,6 +610,7 @@ private: // add syncm to syncm_q void enqueue_syncm(const std::shared_ptr& item); + double compute_eta(SnapSyncStat& sync_stat); }; } // namespace mirror