uint64_t bytes_written = 0;
sec_duration read_time{0};
sec_duration write_time{0};
+ uint64_t discovered_delta = 0;
+
+ inc_files_started(dir_root);
int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
if (r < 0) {
derr << ": failed to open local file path=" << epath << ": "
while (num_blocks > 0) {
auto offset = b->offset;
auto len = b->len;
+ discovered_delta += b->len;
+ inc_delta_bytes(dir_root, discovered_delta);
dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: ["
<< offset << "~" << len << "]" << dendl;
break;
}
bytes_written += r;
+ inc_actual_sync_bytes(dir_root, r);
offset += r;
}
dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath
<< ", sync_check=" << sync_check << dendl;
+ using clock = std::chrono::steady_clock;
+ using seconds = std::chrono::duration<double>;
+ seconds blockdiff_time{0};
+ uint64_t bd_synced_bytes = 0;
+
if (!sync_check || stx.stx_size <= m_peer_replayer.get_blockdiff_min_file_size()) {
- return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_s = clock::now();
+ int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_e - bd_s);
+ bd_synced_bytes = stx.stx_size;
+ if ( r == 0 )
+ m_peer_replayer.set_blockdiff_metrics(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+ return r;
}
ceph_file_blockdiff_info info;
if (r < 0) {
dout(20) << ": new file epath=" << epath << dendl;
- return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_s = clock::now();
+ int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_e - bd_s);
+ bd_synced_bytes = stx.stx_size;
+ if ( r == 0 )
+ m_peer_replayer.set_blockdiff_metrics(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+ return r;
}
r = 1;
+ auto bd_s = clock::now();
while (true) {
ceph_file_blockdiff_changedblocks blocks;
r = ceph_file_blockdiff(&info, &blocks);
int rr = r;
if (blocks.num_blocks) {
+ auto bd_num_blocks = blocks.num_blocks;
+ auto bd_cblock = blocks.b;
r = callback(blocks.num_blocks, blocks.b);
ceph_free_file_blockdiff_buffer(&blocks);
if (r < 0) {
derr << ": blockdiff callback returned error: r=" << r << dendl;
break;
}
+ while(bd_num_blocks > 0) {
+ bd_synced_bytes += bd_cblock->len;
+ --bd_num_blocks;
+ bd_cblock++;
+ }
}
if (rr == 0) {
}
// else fetch next changed blocks
}
+ // blockdiff throughput
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_e - bd_s);
+ if ( r == 0 )
+ m_peer_replayer.set_blockdiff_metrics(m_dir_root, bd_synced_bytes, blockdiff_time.count());
ceph_file_blockdiff_finish(&info);
return r;
return out.str();
};
+double PeerReplayer::compute_eta(PeerReplayer::SnapSyncStat& sync_stat) {
+ // mlock is held by the caller
+
+ static constexpr uint64_t MIN_FILES_SAMPLE = 25;
+ static constexpr uint64_t MIN_BYTES_SAMPLE = 1ULL * 16 * 1024 * 1024; // 16MiB
+
+ double read_time = sync_stat.read_time_sec;
+ double write_time = sync_stat.write_time_sec;
+ double bytes_read = sync_stat.bytes_read;
+ double bytes_written = sync_stat.bytes_written;
+ uint64_t files_started = sync_stat.files_started;
+ uint64_t files_synced = sync_stat.sync_files;
+ double read_bps = read_time > 0 ? bytes_read / read_time : 0;
+ double write_bps = write_time > 0 ? bytes_written / write_time : 0;
+ uint64_t discovered_delta_bytes = sync_stat.discovered_delta_bytes;
+ uint64_t synced_bytes = sync_stat.actual_sync_bytes;
+ uint64_t file_synced_bytes = sync_stat.sync_bytes;
+ uint64_t total_files = sync_stat.total_files;
+
+ if (read_time == 0 || write_time == 0)
+ return -1.0; //Calculating
+ if (files_synced < MIN_FILES_SAMPLE && file_synced_bytes < MIN_BYTES_SAMPLE)
+ return -1.0; //Calculating
+
+
+ if (sync_stat.snapdiff) {
+ /* blockdiff :
+ * - total number of files with delta to be synced is known
+ * - delta of each file is unknown, the avg delta per file is estimated
+ * - effective bandwidth should accommodate blockdiff time
+ * - effective bandwidth is based on cumulative synced bytes and time taken, so
+ * it considers total average past performance and hence it is mostly pessimistic
+ */
+ double avg_delta_per_file = (double)discovered_delta_bytes / (double)files_started;
+ double estimated_total_delta = avg_delta_per_file * total_files;
+ double effective_bw = (double) sync_stat.blockdiff_sync_bytes / sync_stat.blockdiff_time_sec;
+
+ uint64_t remaining =
+ estimated_total_delta > synced_bytes
+ ? static_cast<uint64_t>(estimated_total_delta - synced_bytes)
+ : 0;
+
+ if (effective_bw <= 0)
+ return -1.0;
+ return remaining / effective_bw;
+ } else {
+ /* full sync :
+ * - effective bandwidth is mostly dependant on read and write, again
+ * it's cumulative, so prediction is based on average past performace.
+ */
+ uint64_t remaining = sync_stat.total_bytes - synced_bytes;
+ double effective_bw = std::min(read_bps, write_bps);
+ if (effective_bw <= 0)
+ return -1.0; //Calculating
+ return remaining / effective_bw;
+ }
+}
+
void PeerReplayer::peer_status(Formatter *f) {
std::scoped_lock locker(m_lock);
f->open_object_section("stats");
f->dump_string("sync_percent", os.str());
}
f->close_section(); //files
+ double eta = compute_eta(sync_stat);
+ if (eta == -1.0)
+ f->dump_string("eta", "calculating...");
+ else
+ f->dump_string("eta", format_time(compute_eta(sync_stat)));
f->close_section(); //current_syncing_snap
}
if (sync_stat.last_synced_snap) {
uint64_t bytes_written = 0; //actual bytes written counter, independently for each directory sync.
double read_time_sec = 0.0; //actual read time in seconds counter, independently for each directroy sync.
double write_time_sec = 0.0; //actual write time in seconds counter, independently for each directroy sync.
+ // eta related
+ // files_started will be ahead of sync_files as it's incremented just after delta discovery
+ uint64_t files_started = 0; //files picked up for sync counter, independently for each directory sync.
+ uint64_t actual_sync_bytes = 0; //actual bytes synced using delta, counter, independently for each directory sync.
+ uint64_t discovered_delta_bytes = 0; //discovered delta bytes counter, independently for each directory sync.
+ uint64_t blockdiff_sync_bytes = 0; //actual bytes synced using SnapDiff/blockdiff counter
+ double blockdiff_time_sec = 0.0; //actual sync time using SnapDiff/blockdiff counter
};
void _inc_failed_count(const std::string &dir_root) {
sync_stat.bytes_written = 0;
sync_stat.read_time_sec = 0.0;
sync_stat.write_time_sec = 0.0;
+ sync_stat.files_started = 0;
+ sync_stat.actual_sync_bytes = 0;
+ sync_stat.discovered_delta_bytes = 0;
+ sync_stat.blockdiff_sync_bytes = 0;
+ sync_stat.blockdiff_time_sec = 0.0;
}
void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
const std::string &snap_name) {
sync_stat.crawl_finished = state;
sync_stat.crawl_duration = seconds;
}
+ void set_blockdiff_metrics(const std::string &dir_root, const uint64_t bd_syncbytes, const double bd_time) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.blockdiff_sync_bytes += bd_syncbytes;
+ sync_stat.blockdiff_time_sec += bd_time;
+ }
void add_io(const std::string &dir_root, const uint64_t& br, const uint64_t bw,
const double rt, const double wt) {
std::scoped_lock locker(m_lock);
sync_stat.read_time_sec += rt;
sync_stat.write_time_sec += wt;
}
+ void inc_delta_bytes(const std::string &dir_root, const uint64_t& b) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.discovered_delta_bytes += b;
+ }
void inc_sync_bytes(const std::string &dir_root, const uint64_t& b) {
std::scoped_lock locker(m_lock);
auto &sync_stat = m_snap_sync_stats.at(dir_root);
sync_stat.sync_bytes += b;
}
+ void inc_actual_sync_bytes(const std::string &dir_root, const uint64_t& b) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.actual_sync_bytes += b;
+ }
+ void inc_files_started(const std::string &dir_root) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.files_started++;
+ }
void inc_sync_files(const std::string &dir_root) {
std::scoped_lock locker(m_lock);
auto &sync_stat = m_snap_sync_stats.at(dir_root);
// format routines for peer_status
static std::string format_bytes(double bytes);
static std::string format_time(double total_seconds);
+ static double compute_eta(SnapSyncStat& sync_stat);
};
} // namespace mirror