services:
- mds
min: 1
+- name: subv_metrics_window_interval
+ type: secs
+ level: dev
+ desc: sliding window interval for subvolume metrics, in seconds
+ long_desc: interval, in seconds, for which values are retained in the sliding window used by the metrics aggregator to aggregate subvolume metrics
+ default: 60
+ min: 30
+ services:
+ - mds
\ No newline at end of file
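The new option can be tuned at runtime like any other MDS option. A brief, hedged example using the standard "ceph config" CLI (mds.a below is a placeholder daemon name, and 120 simply respects the min of 30):

    # widen the aggregation window to two minutes for all MDS daemons
    ceph config set mds subv_metrics_window_interval 120
    # confirm the effective value on one daemon
    ceph config get mds.a subv_metrics_window_interval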
#define CEPH_MDS_PERF_METRIC_TYPES_H
#include <ostream>
+#include <chrono>
+#include <deque>
+#include <limits>
+#include <shared_mutex>
#include "common/Formatter.h"
#include "include/cephfs/types.h" // for mds_rank_t
WRITE_CLASS_DENC(OpenedInodesMetric)
WRITE_CLASS_DENC(ReadIoSizesMetric)
WRITE_CLASS_DENC(WriteIoSizesMetric)
+WRITE_CLASS_DENC(SubvolumeMetric)
// metrics that are forwarded to the MDS by client(s).
struct Metrics {
OpenedInodesMetric opened_inodes_metric;
ReadIoSizesMetric read_io_sizes_metric;
WriteIoSizesMetric write_io_sizes_metric;
+ SubvolumeMetric subvolume_metrics;
// metric update type
uint32_t update_type = UpdateType::UPDATE_TYPE_REFRESH;
DENC(Metrics, v, p) {
- DENC_START(4, 1, p);
+ DENC_START(5, 1, p);
denc(v.update_type, p);
denc(v.cap_hit_metric, p);
denc(v.read_latency_metric, p);
denc(v.read_io_sizes_metric, p);
denc(v.write_io_sizes_metric, p);
}
+ if (struct_v >= 5) {
+ denc(v.subvolume_metrics, p);
+ }
DENC_FINISH(p);
}
f->dump_object("opened_inodes_metric", opened_inodes_metric);
f->dump_object("read_io_sizes_metric", read_io_sizes_metric);
f->dump_object("write_io_sizes_metric", write_io_sizes_metric);
+ f->dump_object("subvolume_metrics", subvolume_metrics);
}
friend std::ostream& operator<<(std::ostream& os, const Metrics& metrics) {
<< ", opened_inodes_metric=" << metrics.opened_inodes_metric
<< ", read_io_sizes_metric=" << metrics.read_io_sizes_metric
<< ", write_io_sizes_metric=" << metrics.write_io_sizes_metric
+ << ", subvolume_metrics=" << metrics.subvolume_metrics
<< "}]";
return os;
}
version_t seq = 0;
mds_rank_t rank = MDS_RANK_NONE;
std::map<entity_inst_t, Metrics> client_metrics_map;
+ std::vector<SubvolumeMetric> subvolume_metrics;
metrics_message_t() {
}
void encode(bufferlist &bl, uint64_t features) const {
using ceph::encode;
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(seq, bl);
encode(rank, bl);
encode(client_metrics_map, bl, features);
+ encode(subvolume_metrics, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator &iter) {
using ceph::decode;
- DECODE_START(1, iter);
+ DECODE_START(2, iter);
decode(seq, iter);
decode(rank, iter);
decode(client_metrics_map, iter);
+ if (struct_v >= 2) {
+ decode(subvolume_metrics, iter);
+ }
DECODE_FINISH(iter);
}
f->dump_object("client", client);
f->dump_object("metrics", metrics);
}
+ f->open_array_section("subvolume_metrics");
+ for (const auto &metric : subvolume_metrics) {
+ f->open_object_section("metric");
+ metric.dump(f);
+ f->close_section();
+ }
+ f->close_section();
}
- friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &metrics_message) {
- os << "[sequence=" << metrics_message.seq << ", rank=" << metrics_message.rank
- << ", metrics=" << metrics_message.client_metrics_map << "]";
- return os;
- }
+ friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &m) {
+ os << "[sequence=" << m.seq << ", rank=" << m.rank
+ << ", client_metrics=" << m.client_metrics_map
+ << ", subvolume_metrics=" << m.subvolume_metrics << "]";
+ return os;
+ }
};
WRITE_CLASS_ENCODER_FEATURES(metrics_message_t)
l_mds_per_client_metrics_last
};
+enum {
+ l_subvolume_metrics_first = 30000,
+ l_subvolume_metrics_read_iops,
+ l_subvolume_metrics_read_tp_Bps,
+ l_subvolume_metrics_avg_read_latency,
+ l_subvolume_metrics_write_iops,
+ l_subvolume_metrics_write_tp_Bps,
+ l_subvolume_metrics_avg_write_latency,
+ l_subvolume_metrics_last_window_end,
+ l_subvolume_metrics_last_window,
+ l_subvolume_metrics_last
+};
+
MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
: Dispatcher(cct),
m_cct(cct),
return get_perf_reports();
});
+ subv_window_sec = g_conf().get_val<std::chrono::seconds>("subv_metrics_window_interval").count();
+ if (!subv_window_sec)
+ return -EINVAL;
+
return 0;
}
return false;
}
+void MetricAggregator::refresh_subvolume_metrics_for_rank(
+ mds_rank_t rank, const std::vector<SubvolumeMetric> &metrics) {
+ for (const auto &m : metrics) {
+ // Register labeled PerfCounters if needed
+ if (!subvolume_perf_counters.contains(m.subvolume_path)) {
+ std::string labels = ceph::perf_counters::key_create(
+ "mds_subvolume_metrics",
+ {{"subvolume_path", m.subvolume_path},
+ {"fs_name", std::string(mds->mdsmap->get_fs_name())}});
+ PerfCountersBuilder plb(m_cct, labels,
+ l_subvolume_metrics_first,
+ l_subvolume_metrics_last);
+ plb.add_u64(l_subvolume_metrics_read_iops, "avg_read_iops",
+ "Average read IOPS", "rops", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_read_tp_Bps, "avg_read_tp_Bps",
+ "Average read throughput (Bps)", "rbps", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_avg_read_latency, "avg_read_lat_msec",
+ "Average read latency (ms)", "rlav", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_write_iops, "avg_write_iops",
+ "Average write IOPS", "wops", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_write_tp_Bps, "avg_write_tp_Bps",
+ "Average write throughput (Bps)", "wbps", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_avg_write_latency, "avg_write_lat_msec",
+             "Average write latency (ms)", "wlav", PerfCountersBuilder::PRIO_CRITICAL);
+ // also register the window bookkeeping counters so the set() calls below are not silently dropped
+ plb.add_u64(l_subvolume_metrics_last_window_end, "last_window_end_sec",
+             "Seconds since the last sample in the window", "lwe", PerfCountersBuilder::PRIO_USEFUL);
+ plb.add_u64(l_subvolume_metrics_last_window, "last_window_dur_sec",
+             "Duration of the last aggregation window (sec)", "lwd", PerfCountersBuilder::PRIO_USEFUL);
+
+ auto perf_counter = plb.create_perf_counters();
+ subvolume_perf_counters[m.subvolume_path] = perf_counter;
+ m_cct->get_perfcounters_collection()->add(perf_counter);
+
+ subvolume_aggregated_metrics.try_emplace(m.subvolume_path, subv_window_sec);
+ }
+
+ // Update sliding window
+ auto &tracker = subvolume_aggregated_metrics.at(m.subvolume_path);
+ tracker.add_value(m);
+ }
+
+ // Aggregate, update metrics, and clean stale subvolumes
+ for (auto it = subvolume_aggregated_metrics.begin(); it != subvolume_aggregated_metrics.end(); ) {
+ const std::string &path = it->first;
+ auto &tracker = it->second;
+ tracker.update();
+
+ if (tracker.is_empty()) {
+ dout(10) << "Removing stale subv_metric for path=" << path << ", window size:=" << subv_window_sec << dendl;
+
+ // Remove PerfCounters
+ auto counter_it = subvolume_perf_counters.find(path);
+ if (counter_it != subvolume_perf_counters.end()) {
+ m_cct->get_perfcounters_collection()->remove(counter_it->second);
+ delete counter_it->second;
+ subvolume_perf_counters.erase(counter_it);
+ }
+
+ // Remove PerfQuery entries
+ for (auto &[query, perf_key_map] : query_metrics_map) {
+ MDSPerfMetricKey key;
+ auto sub_key_func_cleanup = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
+ MDSPerfMetricSubKey *sub_key) {
+ if (desc.type == MDSPerfMetricSubKeyType::SUBVOLUME_PATH) {
+ std::smatch match;
+ if (std::regex_search(path, match, desc.regex) && match.size() > 1) {
+ for (size_t i = 1; i < match.size(); ++i) {
+ sub_key->push_back(match[i].str());
+ }
+ return true;
+ }
+ } else if (desc.type == MDSPerfMetricSubKeyType::MDS_RANK) {
+ sub_key->push_back(std::to_string(mds->get_nodeid()));
+ return true;
+ }
+ return false;
+ };
+
+ if (query.get_key(sub_key_func_cleanup, &key)) {
+ if (perf_key_map.erase(key)) {
+ dout(15) << __func__ << ": Removed PerfQuery entry for subv_metric=" << path << dendl;
+ }
+ }
+ }
+
+ it = subvolume_aggregated_metrics.erase(it);
+ // erase() already returned the next iterator, so do not increment it here
+ continue;
+ } else {
+
+ AggregatedSubvolumeMetric aggr_metric;
+ aggr_metric.subvolume_path = path;
+ aggr_metric.time_window_last_dur_sec = tracker.get_current_window_duration_sec();
+ aggr_metric.time_window_last_end_sec = tracker.get_time_from_last_sample();
+ if (aggr_metric.time_window_last_dur_sec == 0)
+ aggr_metric.time_window_last_dur_sec = 1; // avoid div-by-zero
+
+ uint64_t total_read_ops = 0, total_write_ops = 0;
+ uint64_t total_read_bytes = 0, total_write_bytes = 0;
+ uint64_t weighted_read_latency_sum = 0, weighted_write_latency_sum = 0;
+
+ tracker.for_each_value([&](const SubvolumeMetric &m) {
+ total_read_ops += m.read_ops;
+ total_write_ops += m.write_ops;
+ total_read_bytes += m.read_size;
+ total_write_bytes += m.write_size;
+ weighted_read_latency_sum += m.avg_read_latency * m.read_ops;
+ weighted_write_latency_sum += m.avg_write_latency * m.write_ops;
+ });
+
+ aggr_metric.read_iops = total_read_ops / aggr_metric.time_window_last_dur_sec;
+ aggr_metric.write_iops = total_write_ops / aggr_metric.time_window_last_dur_sec;
+ aggr_metric.read_tp_Bps = total_read_bytes / aggr_metric.time_window_last_dur_sec;
+ aggr_metric.write_tp_Bps = total_write_bytes / aggr_metric.time_window_last_dur_sec;
+
+ aggr_metric.avg_read_latency = (total_read_ops > 0)
+ ? (weighted_read_latency_sum / total_read_ops) / 1000
+ : 0;
+ aggr_metric.avg_write_latency = (total_write_ops > 0)
+ ? (weighted_write_latency_sum / total_write_ops) / 1000
+ : 0;
+
+ // update PerfCounters
+ auto counter = subvolume_perf_counters[path];
+ ceph_assert(counter);
+ counter->set(l_subvolume_metrics_read_iops, aggr_metric.read_iops);
+ counter->set(l_subvolume_metrics_read_tp_Bps, aggr_metric.read_tp_Bps);
+ counter->set(l_subvolume_metrics_avg_read_latency, aggr_metric.avg_read_latency);
+ counter->set(l_subvolume_metrics_write_iops, aggr_metric.write_iops);
+ counter->set(l_subvolume_metrics_write_tp_Bps, aggr_metric.write_tp_Bps);
+ counter->set(l_subvolume_metrics_avg_write_latency, aggr_metric.avg_write_latency);
+ counter->set(l_subvolume_metrics_last_window_end, aggr_metric.time_window_last_end_sec);
+ counter->set(l_subvolume_metrics_last_window, aggr_metric.time_window_last_dur_sec);
+
+ // Update query_metrics_map
+ auto sub_key_func_subvolume = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
+ MDSPerfMetricSubKey *sub_key) {
+ if (desc.type == MDSPerfMetricSubKeyType::SUBVOLUME_PATH) {
+ std::smatch match;
+ if (std::regex_search(path, match, desc.regex) && match.size() > 1) {
+ for (size_t i = 1; i < match.size(); ++i) {
+ sub_key->push_back(match[i].str());
+ }
+ return true;
+ }
+ } else if (desc.type == MDSPerfMetricSubKeyType::MDS_RANK) {
+ sub_key->push_back(std::to_string(mds->get_nodeid()));
+ return true;
+ }
+ return false;
+ };
+
+ for (auto &[query, perf_key_map] : query_metrics_map) {
+ MDSPerfMetricKey key;
+ bool matched = query.get_key(sub_key_func_subvolume, &key);
+ if (!matched)
+ continue;
+
+ auto &perf_counters = perf_key_map[key];
+ if (perf_counters.empty()) {
+ perf_counters.resize(query.performance_counter_descriptors.size());
+ }
+
+ query.update_counters(
+ [&](const MDSPerformanceCounterDescriptor &desc, PerformanceCounter *counter) {
+ switch (desc.type) {
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ counter->first = aggr_metric.read_iops;
+ break;
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ counter->first = aggr_metric.write_iops;
+ break;
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ counter->first = aggr_metric.read_tp_Bps;
+ break;
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ counter->first = aggr_metric.write_tp_Bps;
+ break;
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ counter->first = aggr_metric.avg_read_latency;
+ break;
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+ counter->first = aggr_metric.avg_write_latency;
+ break;
+ default:
+ break;
+ }
+ },
+ &perf_counters);
+ }
+
+ // not stale; advance to the next entry
+ ++it;
+ }
+ }
+}
+
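For clarity, a worked example of the aggregation arithmetic above, with invented numbers and assuming the incoming per-sample average latencies are in microseconds (so the /1000 step yields milliseconds): two samples span a 60-second window, totalling 1200 read ops and 6 MiB (6291456 bytes) read, with per-sample average read latencies of 2000 and 4000 weighted by 400 and 800 ops respectively.

    time_window_last_dur_sec = 60
    read_iops        = 1200 / 60                           = 20
    read_tp_Bps      = 6291456 / 60                        = 104857
    avg_read_latency = ((2000*400 + 4000*800) / 1200) / 1000
                     = (4000000 / 1200) / 1000             = 3   (integer division, msec)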
void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
mds_rank_t rank, const Metrics &metrics) {
dout(20) << ": client=" << client << ", rank=" << rank << ", metrics="
c->second = metrics.metadata_latency_metric.count;
}
break;
+ // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ break;
default:
ceph_abort_msg("unknown counter type");
}
case MDSPerfMetricSubKeyType::CLIENT_ID:
match_string = stringify(client);
break;
+ // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+ case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
+ return false;
default:
ceph_abort_msg("unknown counter type");
}
case MDSPerfMetricSubKeyType::CLIENT_ID:
match_string = stringify(client);
break;
+ // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+ case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
+ break;
default:
ceph_abort_msg("unknown counter type");
}
ceph_abort();
}
}
+
+ refresh_subvolume_metrics_for_rank(rank, metrics_message.subvolume_metrics);
}
void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
const MDSConfigPayload &mds_config_payload = std::get<MDSConfigPayload>(config_payload);
const std::map<MDSPerfMetricQuery, MDSPerfMetricLimits> &queries = mds_config_payload.config;
- dout(10) << ": setting " << queries.size() << " queries" << dendl;
+ dout(10) << ": setting " << queries.size() << " perf_queries" << dendl;
std::scoped_lock locker(lock);
std::map<MDSPerfMetricQuery, std::map<MDSPerfMetricKey, PerformanceCounters>> new_data;
for (auto &p : queries) {
+ dout(10) << ": perf_query " << p << dendl;
std::swap(new_data[p.first], query_metrics_map[p.first]);
}
std::swap(query_metrics_map, new_data);
PerfCounters *m_perf_counters;
std::map<std::pair<entity_inst_t, mds_rank_t>, PerfCounters*> client_perf_counters;
+ uint64_t subv_window_sec = 0;
+ std::unordered_map<std::string, SlidingWindowTracker<SubvolumeMetric>> subvolume_aggregated_metrics;
+ std::map<std::string, PerfCounters*> subvolume_perf_counters;
void handle_mds_metrics(const cref_t<MMDSMetrics> &m);
void refresh_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank,
const Metrics &metrics);
+ void refresh_subvolume_metrics_for_rank(mds_rank_t rank, const std::vector<SubvolumeMetric> &metrics);
void remove_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank, bool remove);
void cull_metrics_for_rank(mds_rank_t rank);
case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
break;
default:
ceph_abort_msg("unknown counter type");
case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
break;
default:
ceph_abort_msg("unknown counter type");
case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
os << "stdev_metadata_latency";
break;
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ os << "subv_read_iops";
+ break;
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ os << "subv_write_iops";
+ break;
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ os << "subv_last_read_tp_Bps";
+ break;
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ os << "subv_last_write_tp_Bps";
+ break;
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ os << "subv_avg_read_latency";
+ break;
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+ os << "subv_avg_write_latency";
+ break;
}
return os;
enum class MDSPerfMetricSubKeyType : uint8_t {
MDS_RANK = 0,
CLIENT_ID = 1,
+ SUBVOLUME_PATH = 2,
};
struct MDSPerfMetricSubKeyDescriptor {
switch (type) {
case MDSPerfMetricSubKeyType::MDS_RANK:
case MDSPerfMetricSubKeyType::CLIENT_ID:
+ case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
return true;
default:
return false;
STDEV_WRITE_LATENCY_METRIC = 13,
AVG_METADATA_LATENCY_METRIC = 14,
STDEV_METADATA_LATENCY_METRIC = 15,
+ SUBV_READ_IOPS_METRIC = 16,
+ SUBV_WRITE_IOPS_METRIC = 17,
+ SUBV_READ_THROUGHPUT_METRIC = 18,
+ SUBV_WRITE_THROUGHPUT_METRIC = 19,
+ SUBV_AVG_READ_LATENCY_METRIC = 20,
+ SUBV_AVG_WRITE_LATENCY_METRIC = 21
};
struct MDSPerformanceCounterDescriptor {
case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+ case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+ case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
return true;
default:
return false;
}
};
+// all latencies are converted to milliseconds during aggregation
+struct AggregatedSubvolumeMetric {
+ std::string subvolume_path;
+
+ uint64_t read_iops = 0;
+ uint64_t write_iops = 0;
+ uint64_t read_tp_Bps = 0;
+ uint64_t write_tp_Bps = 0;
+
+ uint64_t min_read_latency = std::numeric_limits<uint64_t>::max();
+ uint64_t max_read_latency = 0;
+ uint64_t avg_read_latency = 0;
+
+ uint64_t min_write_latency = std::numeric_limits<uint64_t>::max();
+ uint64_t max_write_latency = 0;
+ uint64_t avg_write_latency = 0;
+
+ uint64_t time_window_last_end_sec = 0;
+ uint64_t time_window_last_dur_sec = 0;
+
+ void dump(ceph::Formatter* f) const {
+ f->dump_string("subvolume_path", subvolume_path);
+ f->dump_unsigned("read_iops", read_iops);
+ f->dump_unsigned("write_iops", write_iops);
+ f->dump_unsigned("read_tpBs", read_tpBs);
+ f->dump_unsigned("write_tBps", write_tBps);
+
+ f->dump_unsigned("min_read_latency_ns", min_read_latency);
+ f->dump_unsigned("max_read_latency_ns", max_read_latency);
+ f->dump_unsigned("avg_read_latency_ns", avg_read_latency);
+
+ f->dump_unsigned("min_write_latency_ns", min_write_latency);
+ f->dump_unsigned("max_write_latency_ns", max_write_latency);
+ f->dump_unsigned("avg_write_latency_ns", avg_write_latency);
+
+ f->dump_unsigned("time_window_sec_end", time_window_last_end_sec);
+ f->dump_unsigned("time_window_sec_dur", time_window_last_dur_sec);
+ }
+};
+
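As a side note, the dump() above emits Formatter output that can be rendered with Ceph's JSONFormatter. A hedged, illustrative sketch (not part of the patch; the path and values are invented):

    #include <iostream>
    #include "common/Formatter.h"

    AggregatedSubvolumeMetric aggr;
    aggr.subvolume_path = "/volumes/_nogroup/subvol0"; // hypothetical path
    aggr.read_iops = 20;

    ceph::JSONFormatter f(true);                 // pretty-printed JSON
    f.open_object_section("subvolume_metric");
    aggr.dump(&f);
    f.close_section();
    f.flush(std::cout);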
+using TimePoint = std::chrono::steady_clock::time_point;
+using Duration = std::chrono::steady_clock::duration;
+
+template <typename T>
+struct DataPoint {
+ TimePoint timestamp;
+ T value;
+};
+
+/**
+* @brief Generic sliding-window tracker for timestamped values.
+*
+* Keeps values for a configurable period and lets callers iterate over the retained
+* values to compute whatever aggregate they need (see the for_each_value() usage in
+* MetricAggregator.cc).
+*/
+template <typename T>
+class SlidingWindowTracker {
+public:
+ explicit SlidingWindowTracker(uint64_t window_duration_seconds)
+ : window_duration(std::chrono::seconds(window_duration_seconds))
+ {}
+
+ void add_value(const T& value, TimePoint timestamp = std::chrono::steady_clock::now()) {
+ std::unique_lock lock(data_lock);
+ data_points.push_back({timestamp, value});
+ }
+
+ // prune old data
+ void update() {
+ std::unique_lock lock(data_lock);
+ prune_old_data(std::chrono::steady_clock::now());
+ }
+
+ // Call function on each value in window
+ template <typename Fn>
+ void for_each_value(Fn&& fn) const {
+ std::shared_lock lock(data_lock);
+ for (const auto& dp : data_points) {
+ fn(dp.value);
+ }
+ }
+
+ uint64_t get_current_window_duration_sec() const {
+ std::shared_lock lock(data_lock);
+ if (data_points.size() < 2) {
+ return 0;
+ }
+ auto duration = data_points.back().timestamp - data_points.front().timestamp;
+ return std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+ }
+
+ bool is_empty() const {
+ std::shared_lock lock(data_lock);
+ return data_points.empty();
+ }
+
+ uint64_t get_time_from_last_sample() const {
+ std::shared_lock lock(data_lock);
+ if (data_points.empty()) {
+ return std::numeric_limits<uint64_t>::max();
+ }
+ auto duration = std::chrono::steady_clock::now() - data_points.back().timestamp;
+ return std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+ }
+ private:
+ void prune_old_data(TimePoint now) {
+ TimePoint window_start = now - window_duration;
+ while (!data_points.empty() && data_points.front().timestamp < window_start) {
+ data_points.pop_front();
+ }
+ }
+
+ mutable std::shared_mutex data_lock;
+ std::deque<DataPoint<T>> data_points;
+ Duration window_duration;
+};
+
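A minimal usage sketch of SlidingWindowTracker, kept outside the patch for illustration. It assumes only the class above (plus <cstdint> and <iostream>) and uses a plain uint64_t payload instead of SubvolumeMetric:

    #include <cstdint>
    #include <iostream>
    // ... SlidingWindowTracker defined as above ...

    int main() {
      SlidingWindowTracker<uint64_t> tracker(60); // keep samples for 60 seconds

      // feed a few samples; the timestamp defaults to steady_clock::now()
      tracker.add_value(100);
      tracker.add_value(250);
      tracker.add_value(50);

      tracker.update(); // prune anything older than the window

      uint64_t sum = 0;
      tracker.for_each_value([&](const uint64_t &v) { sum += v; });

      std::cout << "sum=" << sum
                << " window_dur_sec=" << tracker.get_current_window_duration_sec()
                << " secs_since_last=" << tracker.get_time_from_last_sample()
                << " empty=" << tracker.is_empty() << std::endl;
      return 0;
    }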
WRITE_CLASS_DENC(MDSPerfMetrics)
WRITE_CLASS_DENC(MDSPerfMetricReport)