namespace {
+double round_to_two_places(double value) {
+ return abs(round(value * 100) / 100);
+}
+
template<typename I>
std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
I* image_ctx) {
}
void handle_read(uint64_t bytes_read) override {
+ replayer->handle_copy_image_read(bytes_read);
}
int update_progress(uint64_t object_number, uint64_t object_count) override {
static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
}
+ m_bytes_per_second(0);
+ auto bytes_per_second = m_bytes_per_second.get_average();
+ root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
+
+ auto bytes_per_snapshot = boost::accumulators::rolling_mean(
+ m_bytes_per_snapshot);
+ root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
+
+ auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
+ if (bytes_per_second > 0 && m_pending_snapshots > 0) {
+ auto seconds_until_synced = round_to_two_places(
+ pending_bytes / bytes_per_second);
+ if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
+ seconds_until_synced = std::numeric_limits<uint64_t>::max();
+ }
+
+ root_obj["seconds_until_synced"] = static_cast<uint64_t>(
+ seconds_until_synced);
+ }
+
*description = json_spirit::write(
root_obj, json_spirit::remove_trailing_zeros);
std::unique_lock<ceph::mutex>* locker) {
dout(10) << dendl;
+ m_pending_snapshots = 0;
+
bool split_brain = false;
bool remote_demoted = false;
auto remote_image_ctx = m_state_builder->remote_image_ctx;
continue;
}
+ // found candidate snapshot to sync
+ ++m_pending_snapshots;
+ if (m_remote_snap_id_end != CEPH_NOSNAP) {
+ continue;
+ }
+
m_remote_snap_id_end = remote_snap_id;
m_remote_mirror_snap_ns = *mirror_ns;
- break;
}
image_locker.unlock();
<< m_local_mirror_snap_ns.last_copied_object_number << ", "
<< "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
+ m_snapshot_bytes = 0;
m_deep_copy_handler = new DeepCopyHandler(this);
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_copy_image>(this);
return;
}
+ {
+ std::unique_lock locker{m_lock};
+ m_bytes_per_snapshot(m_snapshot_bytes);
+ m_snapshot_bytes = 0;
+ }
+
apply_image_state();
}
update_non_primary_snapshot(false);
}
+template <typename I>
+void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
+ dout(20) << "bytes_read=" << bytes_read << dendl;
+
+ std::unique_lock locker{m_lock};
+ m_bytes_per_second(bytes_read);
+ m_snapshot_bytes += bytes_read;
+}
+
template <typename I>
void Replayer<I>::apply_image_state() {
dout(10) << dendl;
#include "common/AsyncOpTracker.h"
#include "cls/rbd/cls_rbd_types.h"
#include "librbd/mirror/snapshot/Types.h"
+#include "tools/rbd_mirror/image_replayer/TimeRollingMean.h"
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/rolling_mean.hpp>
#include <string>
#include <type_traits>
librbd::mirror::snapshot::ImageState m_image_state;
DeepCopyHandler* m_deep_copy_handler = nullptr;
+ TimeRollingMean m_bytes_per_second;
+
+ uint64_t m_snapshot_bytes = 0;
+ boost::accumulators::accumulator_set<
+ uint64_t, boost::accumulators::stats<
+ boost::accumulators::tag::rolling_mean>> m_bytes_per_snapshot{
+ boost::accumulators::tag::rolling_window::window_size = 2};
+
+ uint32_t m_pending_snapshots = 0;
+
bool m_remote_image_updated = false;
bool m_updating_sync_point = false;
bool m_sync_in_progress = false;
void handle_copy_image(int r);
void handle_copy_image_progress(uint64_t object_number,
uint64_t object_count);
+ void handle_copy_image_read(uint64_t bytes_read);
void apply_image_state();
void handle_apply_image_state(int r);