void *ptr;
struct iovec iov[NR_IOVECS];
+ using clock = std::chrono::steady_clock;
+ using seconds = std::chrono::duration<double>;
+ uint64_t bytes_read = 0;
+ uint64_t bytes_written = 0;
+ uint64_t discovered_delta = 0;
+ seconds read_time{0};
+ seconds write_time{0};
+
+ inc_files_started(dir_root);
int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
if (r < 0) {
derr << ": failed to open local file path=" << epath << ": "
while (num_blocks > 0) {
auto offset = b->offset;
auto len = b->len;
+ discovered_delta += b->len;
+ inc_delta_bytes(dir_root, discovered_delta);
dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", block: ["
<< offset << "~" << len << "]" << dendl;
}
}
+ auto rs = clock::now();
r = ceph_preadv(m_local_mount, l_fd, iov, num_buffers, offset);
+ auto re = clock::now();
+ read_time += seconds(re - rs);
if (r < 0) {
derr << ": failed to read local file path=" << epath << ": "
<< cpp_strerror(r) << dendl;
break;
}
+ bytes_read += r;
dout(10) << ": read: " << r << " bytes" << dendl;
if (r == 0) {
break;
}
dout(10) << ": writing to offset: " << offset << dendl;
+ auto ws = clock::now();
r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, offset);
+ auto we = clock::now();
+ write_time += seconds(we - ws);
if (r < 0) {
derr << ": failed to write remote file path=" << epath << ": "
<< cpp_strerror(r) << dendl;
break;
}
+ bytes_written += r;
+ inc_actual_sync_bytes(dir_root, r);
offset += r;
}
++b;
}
+ //io accounting for eta
+ add_io(dir_root, bytes_read, bytes_written, read_time.count(), write_time.count());
+
if (num_blocks == 0 && r >= 0) { // handle blocklist case
dout(20) << ": truncating epath=" << epath << " to " << stx.stx_size << " bytes"
<< dendl;
dout(20) << ": dir_root=" << m_dir_root << ", epath=" << epath
<< ", sync_check=" << sync_check << dendl;
+ using clock = std::chrono::steady_clock;
+ using seconds = std::chrono::duration<double>;
+ seconds blockdiff_time{0};
+ uint64_t bd_synced_bytes = 0;
+
if (!sync_check) {
- return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_s = clock::now();
+ int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_s - bd_e);
+ bd_synced_bytes = stx.stx_size;
+ m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+ return r;
}
ceph_file_blockdiff_info info;
if (r < 0) {
dout(20) << ": new file epath=" << epath << dendl;
- return SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_s = clock::now();
+ int r = SyncMechanism::get_changed_blocks(epath, stx, sync_check, callback);
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_s - bd_e);
+ bd_synced_bytes = stx.stx_size;
+ m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
+ return r;
}
r = 1;
+ auto bd_s = clock::now();
while (true) {
ceph_file_blockdiff_changedblocks blocks;
r = ceph_file_blockdiff(&info, &blocks);
int rr = r;
if (blocks.num_blocks) {
+ auto bd_num_blocks = blocks.num_blocks;
+ auto bd_cblock = blocks.b;
+ while(bd_num_blocks > 0) {
+ bd_synced_bytes += bd_cblock->len;
+ --bd_num_blocks;
+ bd_cblock++;
+ }
r = callback(blocks.num_blocks, blocks.b);
ceph_free_file_blockdiff_buffer(&blocks);
if (r < 0) {
}
// else fetch next changed blocks
}
+ // blockdiff bandwidth
+ auto bd_e = clock::now();
+ blockdiff_time = seconds(bd_e - bd_s);
+ m_peer_replayer.set_blockdiff_bw(m_dir_root, bd_synced_bytes, blockdiff_time.count());
ceph_file_blockdiff_finish(&info);
return r;
if (fh.p_mnt == m_local_mount) {
syncm = std::make_shared<SnapDiffSync>(*this, dir_root, m_local_mount, m_remote_mount,
&fh, m_peer, current, prev);
-
+ set_snapdiff(dir_root, true); //for stats
} else {
syncm = std::make_shared<RemoteSync>(*this, dir_root, m_local_mount, m_remote_mount,
&fh, m_peer, current, boost::none);
+ set_snapdiff(dir_root, false); //for stats
}
r = syncm->init_sync();
}
}
+std::string format_eta(double eta_sec) {
+ if (eta_sec < 0) {
+ return "calculating...";
+ }
+ uint64_t sec = static_cast<uint64_t>(eta_sec + 0.5); // round
+ uint64_t days = sec / 86400;
+ sec %= 86400;
+ uint64_t hours = sec / 3600;
+ sec %= 3600;
+ uint64_t minutes = sec / 60;
+ sec %= 60;
+
+ std::ostringstream out;
+ if (days > 0)
+ out << days << "d ";
+ if (hours > 0 || days > 0)
+ out << hours << "h ";
+ if (minutes > 0 || hours > 0 || days > 0)
+ out << minutes << "m ";
+ out << sec << "s";
+ return out.str();
+}
+
+double PeerReplayer::compute_eta(PeerReplayer::SnapSyncStat& sync_stat) {
+ // mlock is held by the caller
+
+ static constexpr uint64_t MIN_FILES_SAMPLE = 25;
+ static constexpr uint64_t MIN_BYTES_SAMPLE = 1ULL * 16 * 1024 * 1024; // 16MiB
+
+ double read_time = sync_stat.read_time_sec;
+ double write_time = sync_stat.write_time_sec;
+ double bytes_read = sync_stat.bytes_read;
+ double bytes_written = sync_stat.bytes_written;
+ uint64_t files_started = sync_stat.files_started;
+ double read_bw = bytes_read/read_time;
+ double write_bw = bytes_written/write_time;
+ uint64_t discovered_delta_bytes = sync_stat.discovered_delta_bytes;
+ uint64_t synced_bytes = sync_stat.actual_sync_bytes;
+ uint64_t file_synced_bytes = sync_stat.sync_bytes;
+ uint64_t total_files = sync_stat.total_files;
+
+ if (read_time == 0 || write_time == 0)
+ return -1.0; //Calculating
+ if (files_started < MIN_FILES_SAMPLE)
+ return -1.0; //Calculating
+ if (file_synced_bytes < MIN_BYTES_SAMPLE)
+ return -1.0; //Calculating
+
+
+ if (sync_stat.snapdiff) {
+ /* blockdiff :
+ * - total number of files with delta to be synced is known
+ * - delta of each file is unknown, the avg delta per file is estimated
+ * - effective bandwidth should accommodate blockdiff time
+ * - effective bandwidth is based cumulative synced bytes and time taken, so
+ * it considers total average past performance and hence it is mostly pessimistic
+ */
+ double avg_delta_per_file = (double)discovered_delta_bytes / (double)files_started;
+ double estimated_total_delta = avg_delta_per_file * total_files;
+ double effective_bw = (double) sync_stat.bd_sync_bytes / sync_stat.blockdiff_time_sec;
+
+ uint64_t remaining =
+ estimated_total_delta > synced_bytes
+ ? static_cast<uint64_t>(estimated_total_delta - synced_bytes)
+ : 0;
+
+ if (effective_bw <= 0)
+ return -1.0;
+ return remaining / effective_bw;
+ } else {
+ /* full sync :
+ * - effective bandwidth is mostly dependant on read and write, again
+ * it's cumulative, so prediction is based on average past performace.
+ */
+ uint64_t remaining = sync_stat.total_bytes - synced_bytes;
+ double effective_bw = std::min(read_bw, write_bw);
+ if (effective_bw <= 0)
+ return -1.0; //Calculating
+ return remaining / effective_bw;
+ }
+}
+
void PeerReplayer::peer_status(Formatter *f) {
std::scoped_lock locker(m_lock);
f->open_object_section("stats");
f->dump_string("state", "idle");
} else {
f->dump_string("state", "syncing");
+ if (sync_stat.snapdiff)
+ f->dump_string("sync-mode", "delta");
+ else
+ f->dump_string("sync-mode", "full");
f->open_object_section("current_syncing_snap");
f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first);
f->dump_string("name", (*sync_stat.current_syncing_snap).second);
f->dump_string("sync_percent", os.str());
}
f->close_section(); //files
+ f->dump_string("eta", format_eta(compute_eta(sync_stat)));
f->close_section(); //current_syncing_snap
}
if (sync_stat.last_synced_snap) {
uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync.
uint64_t total_bytes = 0; //total bytes counter, independently for each directory sync.
uint64_t sync_files = 0; //sync files counter, independently for each directory sync.
+ // files_started will be ahead of sync_files as it's incremented just after delta discovery
+ uint64_t files_started = 0; //files picked up for sync counter, independently for each directory sync.
uint64_t total_files = 0; //total files counter, independently for each directory sync.
+ bool snapdiff = false; // RemoteSync or Snapdiff
+ // actual io accounting
+ uint64_t bytes_read = 0; //actual bytes read counter, independently for each directory sync.
+ uint64_t bytes_written = 0; //actual bytes written counter, independently for each directory sync.
+ double read_time_sec = 0.0; //actual read time in seconds counter, independently for each directroy sync.
+ double write_time_sec = 0.0; //actual write time in seconds counter, independently for each directroy sync.
+ uint64_t actual_sync_bytes = 0; //actual bytes synced using delta, counter, independently for each directory sync.
+ uint64_t discovered_delta_bytes = 0; //discovered delta bytes counter, independently for each directory sync.
+ uint64_t bd_sync_bytes = 0; //actual bytes synced using SnapDiff/blockdiff counter
+ double blockdiff_time_sec = 0.0; //actual sync time using SnapDiff/blockdiff counter
};
void _inc_failed_count(const std::string &dir_root) {
_set_last_synced_snap(dir_root, snap_id, snap_name);
auto &sync_stat = m_snap_sync_stats.at(dir_root);
sync_stat.sync_bytes = 0;
+ sync_stat.actual_sync_bytes = 0;
+ sync_stat.bd_sync_bytes = 0;
sync_stat.total_bytes = 0;
+ sync_stat.discovered_delta_bytes = 0;
sync_stat.sync_files = 0;
sync_stat.total_files = 0;
+ sync_stat.files_started = 0;
+ sync_stat.bd_sync_bytes = 0;
+ sync_stat.blockdiff_time_sec = 0;
}
void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
const std::string &snap_name) {
sync_stat.last_sync_files = sync_stat.sync_files;
++sync_stat.synced_snap_count;
}
+ void set_snapdiff(const std::string &dir_root, bool snapdiff) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.snapdiff = snapdiff;
+ }
+ void set_blockdiff_bw(const std::string &dir_root, const uint64_t bd_syncbytes, const double bd_time) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.bd_sync_bytes += bd_syncbytes;
+ sync_stat.blockdiff_time_sec += bd_time;
+ }
+ void add_io(const std::string &dir_root, const uint64_t& br, const uint64_t bw,
+ const double rt, const double wt) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.bytes_read += br;
+ sync_stat.bytes_written += bw;
+ sync_stat.read_time_sec += rt;
+ sync_stat.write_time_sec += wt;
+ }
+ void inc_delta_bytes(const std::string &dir_root, const uint64_t& b) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.discovered_delta_bytes += b;
+ }
void inc_sync_bytes(const std::string &dir_root, const uint64_t& b) {
std::scoped_lock locker(m_lock);
auto &sync_stat = m_snap_sync_stats.at(dir_root);
sync_stat.sync_bytes += b;
}
+ void inc_actual_sync_bytes(const std::string &dir_root, const uint64_t& b) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.actual_sync_bytes += b;
+ }
+ void inc_files_started(const std::string &dir_root) {
+ std::scoped_lock locker(m_lock);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ sync_stat.files_started++;
+ }
void inc_sync_files(const std::string &dir_root) {
std::scoped_lock locker(m_lock);
auto &sync_stat = m_snap_sync_stats.at(dir_root);
// add syncm to syncm_q
void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
+ double compute_eta(SnapSyncStat& sync_stat);
};
} // namespace mirror