s_instance = nullptr;
}
+ MOCK_METHOD1(handle_entry_processed, void(uint64_t));
MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish));
};
image_replayer/PrepareLocalImageRequest.cc
image_replayer/PrepareRemoteImageRequest.cc
image_replayer/StateBuilder.cc
+ image_replayer/TimeRollingMean.cc
image_replayer/Utils.cc
image_replayer/journal/CreateLocalImageRequest.cc
image_replayer/journal/EventPreprocessor.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd_mirror/image_replayer/TimeRollingMean.h"
+#include "common/Clock.h"
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+void TimeRollingMean::operator()(uint32_t value) {
+ auto time = ceph_clock_now();
+ if (m_last_time.is_zero()) {
+ m_last_time = time;
+ } else if (m_last_time.sec() < time.sec()) {
+ auto sec = m_last_time.sec();
+ while (sec++ < time.sec()) {
+ m_rolling_mean(m_sum);
+ m_sum = 0;
+ }
+
+ m_last_time = time;
+ }
+
+ m_sum += value;
+}
+
+double TimeRollingMean::get_average() const {
+ return boost::accumulators::rolling_mean(m_rolling_mean);
+}
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_TIME_ROLLING_MEAN_H
+#define RBD_MIRROR_IMAGE_REPLAYER_TIME_ROLLING_MEAN_H
+
+#include "include/utime.h"
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/rolling_mean.hpp>
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+
+class TimeRollingMean {
+public:
+
+ void operator()(uint32_t value);
+
+ double get_average() const;
+
+private:
+ typedef boost::accumulators::accumulator_set<
+ uint64_t, boost::accumulators::stats<
+ boost::accumulators::tag::rolling_mean>> RollingMean;
+
+ utime_t m_last_time;
+ uint64_t m_sum = 0;
+
+ RollingMean m_rolling_mean{
+ boost::accumulators::tag::rolling_window::window_size = 30};
+
+};
+
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_TIME_ROLLING_MEAN_H
namespace {
+double round_to_two_places(double value) {
+ return abs(round(value * 100) / 100);
+}
+
json_spirit::mObject to_json_object(
const cls::journal::ObjectPosition& position) {
json_spirit::mObject object;
m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
}
+template <typename I>
+void ReplayStatusFormatter<I>::handle_entry_processed(uint32_t bytes) {
+ dout(20) << dendl;
+
+ m_bytes_per_second(bytes);
+ m_entries_per_second(1);
+}
+
template <typename I>
bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
Context *on_finish) {
template <typename I>
void ReplayStatusFormatter<I>::format(std::string *description) {
-
dout(20) << "m_master_position=" << m_master_position
<< ", m_mirror_position=" << m_mirror_position
<< ", m_entries_behind_master=" << m_entries_behind_master << dendl;
root_obj["non_primary_position"] = to_json_object(m_mirror_position);
root_obj["entries_behind_primary"] = (
m_entries_behind_master > 0 ? m_entries_behind_master : 0);
- *description = json_spirit::write(root_obj);
+
+ m_bytes_per_second(0);
+ root_obj["bytes_per_second"] = round_to_two_places(
+ m_bytes_per_second.get_average());
+
+ m_entries_per_second(0);
+ auto entries_per_second = m_entries_per_second.get_average();
+ root_obj["entries_per_second"] = round_to_two_places(entries_per_second);
+
+ if (m_entries_behind_master > 0 && entries_per_second > 0) {
+ auto seconds_until_synced = round_to_two_places(
+ m_entries_behind_master / entries_per_second);
+ if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
+ seconds_until_synced = std::numeric_limits<uint64_t>::max();
+ }
+
+ root_obj["seconds_until_synced"] = static_cast<uint64_t>(
+ seconds_until_synced);
+ }
+
+ *description = json_spirit::write(
+ root_obj, json_spirit::remove_trailing_zeros);
}
} // namespace journal
#include "cls/journal/cls_journal_types.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/image_replayer/TimeRollingMean.h"
namespace journal { class Journaler; }
namespace librbd { class ImageCtx; }
ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);
+ void handle_entry_processed(uint32_t bytes);
+
bool get_or_send_update(std::string *description, Context *on_finish);
private:
cls::journal::Tag m_tag;
std::map<uint64_t, librbd::journal::TagData> m_tag_cache;
+ TimeRollingMean m_bytes_per_second;
+ TimeRollingMean m_entries_per_second;
+
bool calculate_behind_master_or_send_update();
void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
}
}
+ m_replay_status_formatter->handle_entry_processed(m_replay_bytes);
+
if (update_status) {
unregister_perf_counters();
register_perf_counters();