From: Igor Golikov
Date: Thu, 10 Jul 2025 10:18:57 +0000 (+0000)
Subject: mds: aggregate and expose subvolume metrics
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a49ba9d27ab93fe3466822d8865c6112eea86c09;p=ceph.git

mds: aggregate and expose subvolume metrics

rank 0 periodically receives subvolume metrics from the other MDS
instances and aggregates them over a sliding window. The
MetricAggregator exposes PerfCounters and PerfQueries for these
metrics.

Fixes: https://tracker.ceph.com/issues/68931
Signed-off-by: Igor Golikov
---

diff --git a/src/common/options/mds.yaml.in b/src/common/options/mds.yaml.in
index e9cef8ba944..54f3458012f 100644
--- a/src/common/options/mds.yaml.in
+++ b/src/common/options/mds.yaml.in
@@ -1803,3 +1803,12 @@ options:
   services:
   - mds
   min: 1
+- name: subv_metrics_window_interval
+  type: secs
+  level: dev
+  desc: subvolume metrics sliding window interval, in seconds
+  long_desc: how long, in seconds, the metrics aggregator holds subvolume metric samples in its sliding window
+  default: 60
+  min: 30
+  services:
+  - mds
\ No newline at end of file
diff --git a/src/mds/MDSPerfMetricTypes.h b/src/mds/MDSPerfMetricTypes.h
index a5bc1733940..d540f039faa 100644
--- a/src/mds/MDSPerfMetricTypes.h
+++ b/src/mds/MDSPerfMetricTypes.h
@@ -5,6 +5,7 @@
 #define CEPH_MDS_PERF_METRIC_TYPES_H

 #include <ostream>
+#include <vector>

 #include "common/Formatter.h"
 #include "include/cephfs/types.h" // for mds_rank_t
@@ -298,6 +299,7 @@
 WRITE_CLASS_DENC(PinnedIcapsMetric)
 WRITE_CLASS_DENC(OpenedInodesMetric)
 WRITE_CLASS_DENC(ReadIoSizesMetric)
 WRITE_CLASS_DENC(WriteIoSizesMetric)
+WRITE_CLASS_DENC(SubvolumeMetric)

 // metrics that are forwarded to the MDS by client(s).
 struct Metrics {
@@ -312,12 +314,13 @@ struct Metrics {
   OpenedInodesMetric opened_inodes_metric;
   ReadIoSizesMetric read_io_sizes_metric;
   WriteIoSizesMetric write_io_sizes_metric;
+  SubvolumeMetric subvolume_metrics;

   // metric update type
   uint32_t update_type = UpdateType::UPDATE_TYPE_REFRESH;

   DENC(Metrics, v, p) {
-    DENC_START(4, 1, p);
+    DENC_START(5, 1, p);
     denc(v.update_type, p);
     denc(v.cap_hit_metric, p);
     denc(v.read_latency_metric, p);
@@ -335,6 +338,9 @@ struct Metrics {
       denc(v.read_io_sizes_metric, p);
       denc(v.write_io_sizes_metric, p);
     }
+    if (struct_v >= 5) {
+      denc(v.subvolume_metrics, p);
+    }
     DENC_FINISH(p);
   }

@@ -350,6 +356,7 @@ struct Metrics {
     f->dump_object("opened_inodes_metric", opened_inodes_metric);
     f->dump_object("read_io_sizes_metric", read_io_sizes_metric);
     f->dump_object("write_io_sizes_metric", write_io_sizes_metric);
+    f->dump_object("subvolume_metrics", subvolume_metrics);
   }

   friend std::ostream& operator<<(std::ostream& os, const Metrics& metrics) {
@@ -364,6 +371,7 @@ struct Metrics {
        << ", opened_inodes_metric=" << metrics.opened_inodes_metric
        << ", read_io_sizes_metric=" << metrics.read_io_sizes_metric
        << ", write_io_sizes_metric=" << metrics.write_io_sizes_metric
+       << ", subvolume_metrics=" << metrics.subvolume_metrics
        << "}]";
     return os;
   }
@@ -374,6 +382,7 @@
 struct metrics_message_t {
   version_t seq = 0;
   mds_rank_t rank = MDS_RANK_NONE;
   std::map<entity_inst_t, Metrics> client_metrics_map;
+  std::vector<SubvolumeMetric> subvolume_metrics;

   metrics_message_t() {
   }
@@ -383,19 +392,23 @@ struct metrics_message_t {
   void encode(bufferlist &bl, uint64_t features) const {
     using ceph::encode;
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(seq, bl);
     encode(rank, bl);
     encode(client_metrics_map, bl, features);
+    encode(subvolume_metrics, bl);
     ENCODE_FINISH(bl);
   }

   void decode(bufferlist::const_iterator &iter) {
     using ceph::decode;
-    DECODE_START(1, iter);
+    DECODE_START(2, iter);
     decode(seq, iter);
     decode(rank, iter);
     decode(client_metrics_map, iter);
+    if (struct_v >= 2) {
+      decode(subvolume_metrics, iter);
+    }
     DECODE_FINISH(iter);
   }

@@ -406,13 +419,21 @@ struct metrics_message_t {
       f->dump_object("client", client);
       f->dump_object("metrics", metrics);
     }
+    f->open_array_section("subvolume_metrics");
+    for (const auto &metric : subvolume_metrics) {
+      f->open_object_section("metric");
+      metric.dump(f);
+      f->close_section();
+    }
+    f->close_section();
   }

-  friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &metrics_message) {
-    os << "[sequence=" << metrics_message.seq << ", rank=" << metrics_message.rank
-       << ", metrics=" << metrics_message.client_metrics_map << "]";
-    return os;
-  }
+  friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &m) {
+    os << "[sequence=" << m.seq << ", rank=" << m.rank
+       << ", client_metrics=" << m.client_metrics_map
+       << ", subvolume_metrics=" << m.subvolume_metrics << "]";
+    return os;
+  }
 };

 WRITE_CLASS_ENCODER_FEATURES(metrics_message_t)
diff --git a/src/mds/MetricAggregator.cc b/src/mds/MetricAggregator.cc
index 0864490b574..fd70ab5adb2 100644
--- a/src/mds/MetricAggregator.cc
+++ b/src/mds/MetricAggregator.cc
@@ -47,6 +47,19 @@ enum {
   l_mds_per_client_metrics_last
 };

+enum {
+  l_subvolume_metrics_first = 30000,
+  l_subvolume_metrics_read_iops,
+  l_subvolume_metrics_read_tp_Bps,
+  l_subvolume_metrics_avg_read_latency,
+  l_subvolume_metrics_write_iops,
+  l_subvolume_metrics_write_tp_Bps,
+  l_subvolume_metrics_avg_write_latency,
+  l_subvolume_metrics_last_window_end,
+  l_subvolume_metrics_last_window,
+  l_subvolume_metrics_last
+};
+
 MetricAggregator::MetricAggregator(CephContext *cct, MDSRank *mds, MgrClient *mgrc)
   : Dispatcher(cct),
     m_cct(cct),
@@ -96,6 +109,10 @@ int MetricAggregator::init() {
       return get_perf_reports();
     });

+  subv_window_sec = g_conf().get_val<std::chrono::seconds>("subv_metrics_window_interval").count();
+  if (!subv_window_sec)
+    return -EINVAL;
+
   return 0;
 }

@@ -146,6 +163,200 @@ Dispatcher::dispatch_result_t MetricAggregator::ms_dispatch2(const ref_t<Message> &m)
+void MetricAggregator::refresh_subvolume_metrics_for_rank(
+    mds_rank_t rank, const std::vector<SubvolumeMetric> &metrics) {
+  for (const auto &m : metrics) {
+    // Register labeled PerfCounters if needed
+    if (!subvolume_perf_counters.contains(m.subvolume_path)) {
+      std::string labels = ceph::perf_counters::key_create(
+          "mds_subvolume_metrics",
+          {{"subvolume_path", m.subvolume_path},
+           {"fs_name", std::string(mds->mdsmap->get_fs_name())}});
+      PerfCountersBuilder plb(m_cct, labels,
+                              l_subvolume_metrics_first,
+                              l_subvolume_metrics_last);
+      plb.add_u64(l_subvolume_metrics_read_iops, "avg_read_iops",
+                  "Average read IOPS", "rops", PerfCountersBuilder::PRIO_CRITICAL);
+      plb.add_u64(l_subvolume_metrics_read_tp_Bps, "avg_read_tp_Bps",
+                  "Average read throughput (Bps)", "rbps", PerfCountersBuilder::PRIO_CRITICAL);
+      plb.add_u64(l_subvolume_metrics_avg_read_latency, "avg_read_lat_msec",
+                  "Average read latency (ms)", "rlav", PerfCountersBuilder::PRIO_CRITICAL);
+      plb.add_u64(l_subvolume_metrics_write_iops, "avg_write_iops",
+                  "Average write IOPS", "wops", PerfCountersBuilder::PRIO_CRITICAL);
+      plb.add_u64(l_subvolume_metrics_write_tp_Bps, "avg_write_tp_Bps",
+                  "Average write throughput (Bps)", "wbps", PerfCountersBuilder::PRIO_CRITICAL);
+      plb.add_u64(l_subvolume_metrics_avg_write_latency, "avg_write_lat_msec",
+                  "Average write latency (ms)", "wlav", PerfCountersBuilder::PRIO_CRITICAL);
+      // every index in [first, last] must be registered before
+      // create_perf_counters(), which asserts on unnamed slots; these two
+      // are set() below, so register them as well
+      plb.add_u64(l_subvolume_metrics_last_window_end, "last_window_end_sec",
+                  "Seconds since the last sample", "lwen", PerfCountersBuilder::PRIO_USEFUL);
+      plb.add_u64(l_subvolume_metrics_last_window, "last_window_dur_sec",
+                  "Duration of the current window (sec)", "lwdu", PerfCountersBuilder::PRIO_USEFUL);
+
+      auto perf_counter = plb.create_perf_counters();
+      subvolume_perf_counters[m.subvolume_path] = perf_counter;
+      m_cct->get_perfcounters_collection()->add(perf_counter);
+
+      subvolume_aggregated_metrics.try_emplace(m.subvolume_path, subv_window_sec);
+    }
+
+    // Update sliding window
+    auto &tracker = subvolume_aggregated_metrics.at(m.subvolume_path);
+    tracker.add_value(m);
+  }
+
+  // Aggregate, update metrics, and clean up stale subvolumes
+  for (auto it = subvolume_aggregated_metrics.begin(); it != subvolume_aggregated_metrics.end(); ) {
+    const std::string &path = it->first;
+    auto &tracker = it->second;
+    tracker.update();
+
+    if (tracker.is_empty()) {
+      dout(10) << "Removing stale subv_metric for path=" << path << ", window size=" << subv_window_sec << dendl;
+
+      // Remove PerfCounters
+      auto counter_it = subvolume_perf_counters.find(path);
+      if (counter_it != subvolume_perf_counters.end()) {
+        m_cct->get_perfcounters_collection()->remove(counter_it->second);
+        delete counter_it->second;
+        subvolume_perf_counters.erase(counter_it);
+      }
+
+      // Remove PerfQuery entries
+      for (auto &[query, perf_key_map] : query_metrics_map) {
+        MDSPerfMetricKey key;
+        auto sub_key_func_cleanup = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
+                                                  MDSPerfMetricSubKey *sub_key) {
+          if (desc.type == MDSPerfMetricSubKeyType::SUBVOLUME_PATH) {
+            std::smatch match;
+            if (std::regex_search(path, match, desc.regex) && match.size() > 1) {
+              for (size_t i = 1; i < match.size(); ++i) {
+                sub_key->push_back(match[i].str());
+              }
+              return true;
+            }
+          } else if (desc.type == MDSPerfMetricSubKeyType::MDS_RANK) {
+            sub_key->push_back(std::to_string(mds->get_nodeid()));
+            return true;
+          }
+          return false;
+        };
+
+        if (query.get_key(sub_key_func_cleanup, &key)) {
+          if (perf_key_map.erase(key)) {
+            dout(15) << __func__ << ": Removed PerfQuery entry for subv_metric=" << path << dendl;
+          }
+        }
+      }
+
+      it = subvolume_aggregated_metrics.erase(it);
+      // removed the stale entry; erase() already returned the next iterator,
+      // so skip the increment and continue
+      continue;
+    } else {
+      AggregatedSubvolumeMetric aggr_metric;
+      aggr_metric.subvolume_path = path;
+      aggr_metric.time_window_last_dur_sec = tracker.get_current_window_duration_sec();
+      aggr_metric.time_window_last_end_sec = tracker.get_time_from_last_sample();
+      if (aggr_metric.time_window_last_dur_sec == 0)
+        aggr_metric.time_window_last_dur_sec = 1; // avoid div-by-zero
+
+      uint64_t total_read_ops = 0, total_write_ops = 0;
+      uint64_t total_read_bytes = 0, total_write_bytes = 0;
+      uint64_t weighted_read_latency_sum = 0, weighted_write_latency_sum = 0;
+
+      tracker.for_each_value([&](const SubvolumeMetric &m) {
+        total_read_ops += m.read_ops;
+        total_write_ops += m.write_ops;
+        total_read_bytes += m.read_size;
+        total_write_bytes += m.write_size;
+        weighted_read_latency_sum += m.avg_read_latency * m.read_ops;
+        weighted_write_latency_sum += m.avg_write_latency * m.write_ops;
+      });
+
+      aggr_metric.read_iops = total_read_ops / aggr_metric.time_window_last_dur_sec;
+      aggr_metric.write_iops = total_write_ops / aggr_metric.time_window_last_dur_sec;
+      aggr_metric.read_tpBs = total_read_bytes / aggr_metric.time_window_last_dur_sec;
+      aggr_metric.write_tBps = total_write_bytes / aggr_metric.time_window_last_dur_sec;
+
+      aggr_metric.avg_read_latency = (total_read_ops > 0)
+          ? (weighted_read_latency_sum / total_read_ops) / 1000
+          : 0;
+      aggr_metric.avg_write_latency = (total_write_ops > 0)
+          ? (weighted_write_latency_sum / total_write_ops) / 1000
+          : 0;
+
+      // update PerfCounters
+      auto counter = subvolume_perf_counters[path];
+      ceph_assert(counter);
+      counter->set(l_subvolume_metrics_read_iops, aggr_metric.read_iops);
+      counter->set(l_subvolume_metrics_read_tp_Bps, aggr_metric.read_tpBs);
+      counter->set(l_subvolume_metrics_avg_read_latency, aggr_metric.avg_read_latency);
+      counter->set(l_subvolume_metrics_write_iops, aggr_metric.write_iops);
+      counter->set(l_subvolume_metrics_write_tp_Bps, aggr_metric.write_tBps);
+      counter->set(l_subvolume_metrics_avg_write_latency, aggr_metric.avg_write_latency);
+      counter->set(l_subvolume_metrics_last_window_end, aggr_metric.time_window_last_end_sec);
+      counter->set(l_subvolume_metrics_last_window, aggr_metric.time_window_last_dur_sec);
+
+      // Update query_metrics_map
+      auto sub_key_func_subvolume = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
+                                                  MDSPerfMetricSubKey *sub_key) {
+        if (desc.type == MDSPerfMetricSubKeyType::SUBVOLUME_PATH) {
+          std::smatch match;
+          if (std::regex_search(path, match, desc.regex) && match.size() > 1) {
+            for (size_t i = 1; i < match.size(); ++i) {
+              sub_key->push_back(match[i].str());
+            }
+            return true;
+          }
+        } else if (desc.type == MDSPerfMetricSubKeyType::MDS_RANK) {
+          sub_key->push_back(std::to_string(mds->get_nodeid()));
+          return true;
+        }
+        return false;
+      };
+
+      for (auto &[query, perf_key_map] : query_metrics_map) {
+        MDSPerfMetricKey key;
+        bool matched = query.get_key(sub_key_func_subvolume, &key);
+        if (!matched)
+          continue;
+
+        auto &perf_counters = perf_key_map[key];
+        if (perf_counters.empty()) {
+          perf_counters.resize(query.performance_counter_descriptors.size());
+        }
+
+        query.update_counters(
+            [&](const MDSPerformanceCounterDescriptor &desc, PerformanceCounter *counter) {
+              switch (desc.type) {
+              case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+                counter->first = aggr_metric.read_iops;
+                break;
+              case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+                counter->first = aggr_metric.write_iops;
+                break;
+              case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+                counter->first = aggr_metric.read_tpBs;
+                break;
+              case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+                counter->first = aggr_metric.write_tBps;
+                break;
+              case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+                counter->first = aggr_metric.avg_read_latency;
+                break;
+              case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+                counter->first = aggr_metric.avg_write_latency;
+                break;
+              default:
+                break;
+              }
+            },
+            &perf_counters);
+      }
+
+      // metric is not stale; advance to the next one
+      ++it;
+    }
+  }
+}
+
 void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
                                                 mds_rank_t rank,
                                                 const Metrics &metrics) {
   dout(20) << ": client=" << client << ", rank=" << rank << ", metrics="
@@ -359,6 +570,14 @@ void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
           c->second = metrics.metadata_latency_metric.count;
         }
         break;
+      // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+      case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+      case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+      case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+      case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+      case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+      case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+        break;
       default:
         ceph_abort_msg("unknown counter type");
       }
@@ -378,6 +597,10 @@ void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client,
       case MDSPerfMetricSubKeyType::CLIENT_ID:
         match_string = stringify(client);
         break;
+      // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+      case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
+        return false;
+        break;
       default:
         ceph_abort_msg("unknown counter type");
       }
@@ -437,6 +660,9 @@ void MetricAggregator::remove_metrics_for_rank(const entity_inst_t &client,
       case MDSPerfMetricSubKeyType::CLIENT_ID:
         match_string = stringify(client);
         break;
+      // subvolume metrics are handled in refresh_subvolume_metrics_for_rank()
+      case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
+        break;
       default:
         ceph_abort_msg("unknown counter type");
       }
@@ -497,6 +723,8 @@ void MetricAggregator::handle_mds_metrics(const cref_t<MMDSMetrics> &m) {
       ceph_abort();
     }
   }
+
+  refresh_subvolume_metrics_for_rank(rank, metrics_message.subvolume_metrics);
 }

 void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
@@ -554,11 +782,12 @@ void MetricAggregator::set_perf_queries(const ConfigPayload &config_payload) {
   const MDSConfigPayload &mds_config_payload = std::get<MDSConfigPayload>(config_payload);
   const std::map<MDSPerfMetricQuery, MDSPerfMetricLimits> &queries = mds_config_payload.config;

-  dout(10) << ": setting " << queries.size() << " queries" << dendl;
+  dout(10) << ": setting " << queries.size() << " perf_queries" << dendl;

   std::scoped_lock locker(lock);
   std::map<MDSPerfMetricQuery, std::map<MDSPerfMetricKey, PerformanceCounters>> new_data;
   for (auto &p : queries) {
+    dout(10) << ": perf_query " << p << dendl;
     std::swap(new_data[p.first], query_metrics_map[p.first]);
   }
   std::swap(query_metrics_map, new_data);
diff --git a/src/mds/MetricAggregator.h b/src/mds/MetricAggregator.h
index f619b3365fb..579a52c7f76 100644
--- a/src/mds/MetricAggregator.h
+++ b/src/mds/MetricAggregator.h
@@ -74,11 +74,15 @@ private:
   PerfCounters *m_perf_counters;
   std::map<std::pair<entity_inst_t, mds_rank_t>, PerfCounters*> client_perf_counters;

+  uint64_t subv_window_sec;
+  std::unordered_map<std::string, SlidingWindowTracker<SubvolumeMetric>> subvolume_aggregated_metrics;
+  std::map<std::string, PerfCounters*> subvolume_perf_counters;

   void handle_mds_metrics(const cref_t<MMDSMetrics> &m);

   void refresh_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank,
                                 const Metrics &metrics);
+  void refresh_subvolume_metrics_for_rank(mds_rank_t rank, const std::vector<SubvolumeMetric> &metrics);

   void remove_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank, bool remove);

   void cull_metrics_for_rank(mds_rank_t rank);
diff --git a/src/mgr/MDSPerfMetricTypes.cc b/src/mgr/MDSPerfMetricTypes.cc
index a16003774a4..01c6328dcdc 100644
--- a/src/mgr/MDSPerfMetricTypes.cc
+++ b/src/mgr/MDSPerfMetricTypes.cc
@@ -41,6 +41,12 @@ void MDSPerformanceCounterDescriptor::pack_counter(
   case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
   case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
   case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
     break;
   default:
     ceph_abort_msg("unknown counter type");
@@ -69,6 +75,12 @@ void MDSPerformanceCounterDescriptor::unpack_counter(
   case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
   case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
   case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
     break;
   default:
     ceph_abort_msg("unknown counter type");
@@ -125,6 +137,24 @@ std::ostream& operator<<(std::ostream &os, const MDSPerformanceCounterDescriptor
   case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
     os << "stdev_metadata_latency";
     break;
+  case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+    os << "subv_read_iops";
+    break;
+  case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+    os << "subv_write_iops";
+    break;
+  case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+    os << "subv_last_read_tp_Bps";
+    break;
+  case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+    os << "subv_last_write_tp_Bps";
+    break;
+  case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+    os << "subv_avg_read_latency";
+    break;
+  case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
+    os << "subv_avg_write_latency";
+    break;
   }

   return os;
diff --git a/src/mgr/MDSPerfMetricTypes.h b/src/mgr/MDSPerfMetricTypes.h
index 28d901bb970..4ae25e7d268 100644
--- a/src/mgr/MDSPerfMetricTypes.h
+++ b/src/mgr/MDSPerfMetricTypes.h
@@ -23,6 +23,7 @@ typedef std::vector<MDSPerfMetricSubKey> MDSPerfMetricKey;
 enum class MDSPerfMetricSubKeyType : uint8_t {
   MDS_RANK = 0,
   CLIENT_ID = 1,
+  SUBVOLUME_PATH = 2,
 };

 struct MDSPerfMetricSubKeyDescriptor {
@@ -34,6 +35,7 @@ struct MDSPerfMetricSubKeyDescriptor {
     switch (type) {
     case MDSPerfMetricSubKeyType::MDS_RANK:
     case MDSPerfMetricSubKeyType::CLIENT_ID:
+    case MDSPerfMetricSubKeyType::SUBVOLUME_PATH:
       return true;
     default:
       return false;
@@ -139,6 +141,12 @@ enum class MDSPerformanceCounterType : uint8_t {
   STDEV_WRITE_LATENCY_METRIC = 13,
   AVG_METADATA_LATENCY_METRIC = 14,
   STDEV_METADATA_LATENCY_METRIC = 15,
+  SUBV_READ_IOPS_METRIC = 16,
+  SUBV_WRITE_IOPS_METRIC = 17,
+  SUBV_READ_THROUGHPUT_METRIC = 18,
+  SUBV_WRITE_THROUGHPUT_METRIC = 19,
+  SUBV_AVG_READ_LATENCY_METRIC = 20,
+  SUBV_AVG_WRITE_LATENCY_METRIC = 21
 };

 struct MDSPerformanceCounterDescriptor {
@@ -162,6 +170,12 @@ struct MDSPerformanceCounterDescriptor {
   case MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC:
   case MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC:
   case MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
+  case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC:
+  case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
     return true;
   default:
     return false;
@@ -409,6 +423,123 @@ struct MDSPerfMetricReport {
   }
 };

+// all latencies are converted to milliseconds during aggregation
+struct AggregatedSubvolumeMetric {
+  std::string subvolume_path;
+
+  uint64_t read_iops = 0;
+  uint64_t write_iops = 0;
+  uint64_t read_tpBs = 0;
+  uint64_t write_tBps = 0;
+
+  uint64_t min_read_latency = std::numeric_limits<uint64_t>::max();
+  uint64_t max_read_latency = 0;
+  uint64_t avg_read_latency = 0;
+
+  uint64_t min_write_latency = std::numeric_limits<uint64_t>::max();
+  uint64_t max_write_latency = 0;
+  uint64_t avg_write_latency = 0;
+
+  uint64_t time_window_last_end_sec = 0;
+  uint64_t time_window_last_dur_sec = 0;
+
+  void dump(ceph::Formatter* f) const {
+    f->dump_string("subvolume_path", subvolume_path);
+    f->dump_unsigned("read_iops", read_iops);
+    f->dump_unsigned("write_iops", write_iops);
+    f->dump_unsigned("read_tpBs", read_tpBs);
+    f->dump_unsigned("write_tBps", write_tBps);
+
+    f->dump_unsigned("min_read_latency_msec", min_read_latency);
+    f->dump_unsigned("max_read_latency_msec", max_read_latency);
+    f->dump_unsigned("avg_read_latency_msec", avg_read_latency);
+
+    f->dump_unsigned("min_write_latency_msec", min_write_latency);
+    f->dump_unsigned("max_write_latency_msec", max_write_latency);
+    f->dump_unsigned("avg_write_latency_msec", avg_write_latency);
+
+    f->dump_unsigned("time_window_sec_end", time_window_last_end_sec);
+    f->dump_unsigned("time_window_sec_dur", time_window_last_dur_sec);
+  }
+};
+
+using TimePoint = std::chrono::steady_clock::time_point;
+using Duration = std::chrono::steady_clock::duration;
+
+template <typename T>
+struct DataPoint {
+  TimePoint timestamp;
+  T value;
+};
+
+/**
+* @brief Simple sliding window that holds values for a fixed period of time
+* and allows iterating over them to compute whatever aggregates are needed.
+* See the for_each_value() usage in MetricAggregator.cc.
+*/
+template <typename T>
+class SlidingWindowTracker {
+public:
+  explicit SlidingWindowTracker(uint64_t window_duration_seconds)
+    : window_duration(std::chrono::seconds(window_duration_seconds))
+  {}
+
+  void add_value(const T& value, TimePoint timestamp = std::chrono::steady_clock::now()) {
+    std::unique_lock lock(data_lock);
+    data_points.push_back({timestamp, value});
+  }
+
+  // prune old data
+  void update() {
+    std::unique_lock lock(data_lock);
+    prune_old_data(std::chrono::steady_clock::now());
+  }
+
+  // Call function on each value in the window
+  template <typename Fn>
+  void for_each_value(Fn&& fn) const {
+    std::shared_lock lock(data_lock);
+    for (const auto& dp : data_points) {
+      fn(dp.value);
+    }
+  }
+
+  uint64_t get_current_window_duration_sec() const {
+    std::shared_lock lock(data_lock);
+    if (data_points.size() < 2) {
+      return 0;
+    }
+    auto duration = data_points.back().timestamp - data_points.front().timestamp;
+    return std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+  }
+
+  bool is_empty() const {
+    std::shared_lock lock(data_lock);
+    return data_points.empty();
+  }
+
+  uint64_t get_time_from_last_sample() const {
+    std::shared_lock lock(data_lock);
+    if (data_points.empty()) {
+      return std::numeric_limits<uint64_t>::max();
+    }
+    auto duration = std::chrono::steady_clock::now() - data_points.back().timestamp;
+    return std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+  }
+
+private:
+  void prune_old_data(TimePoint now) {
+    TimePoint window_start = now - window_duration;
+    while (!data_points.empty() && data_points.front().timestamp < window_start) {
+      data_points.pop_front();
+    }
+  }
+
+  mutable std::shared_mutex data_lock;
+  std::deque<DataPoint<T>> data_points;
+  Duration window_duration;
+};
+
 WRITE_CLASS_DENC(MDSPerfMetrics)
 WRITE_CLASS_DENC(MDSPerfMetricReport)
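
Note for reviewers: the per-window aggregation in refresh_subvolume_metrics_for_rank() reduces to three steps: sum the samples currently inside the window, divide the totals by the window duration to get rates, and compute an op-weighted average of the per-sample latencies. Below is a minimal standalone sketch of that math (not part of the patch). The Sample struct is a hypothetical stand-in for SubvolumeMetric, with fields inferred from the for_each_value() lambda, and the /1000 conversion assumes the per-sample latencies arrive in microseconds, matching the "converted to milliseconds" convention in the header.

  #include <cstdint>
  #include <deque>
  #include <iostream>

  // Simplified stand-in for SubvolumeMetric (hypothetical layout).
  struct Sample {
    uint64_t read_ops, write_ops;                  // ops in this sample
    uint64_t read_size, write_size;                // bytes in this sample
    uint64_t avg_read_latency, avg_write_latency;  // assumed usec
  };

  int main() {
    // Two samples currently inside the sliding window.
    std::deque<Sample> window = {
      {100, 50, 409600, 204800, 2000, 5000},
      {200, 80, 819200, 327680, 1500, 4000},
    };
    uint64_t window_sec = 10;  // window duration; the patch clamps 0 to 1

    uint64_t r_ops = 0, w_ops = 0, r_bytes = 0, w_bytes = 0;
    uint64_t r_lat = 0, w_lat = 0;  // op-weighted latency sums
    for (const auto& s : window) {  // mirrors tracker.for_each_value(...)
      r_ops += s.read_ops;     w_ops += s.write_ops;
      r_bytes += s.read_size;  w_bytes += s.write_size;
      r_lat += s.avg_read_latency * s.read_ops;
      w_lat += s.avg_write_latency * s.write_ops;
    }

    // Rates are totals over the window; latencies are weighted averages,
    // divided by 1000 to convert to the exposed unit.
    std::cout << "read_iops=" << r_ops / window_sec
              << " write_iops=" << w_ops / window_sec
              << " read_Bps=" << r_bytes / window_sec
              << " write_Bps=" << w_bytes / window_sec
              << " avg_read_lat_ms=" << (r_ops ? (r_lat / r_ops) / 1000 : 0)
              << " avg_write_lat_ms=" << (w_ops ? (w_lat / w_ops) / 1000 : 0)
              << std::endl;
    return 0;
  }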
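The stale-subvolume cleanup relies on the same tracker: update() prunes samples older than the window, and once is_empty() returns true the aggregator tears down the labeled counters and query entries. A quick sketch of that lifecycle, assuming the SlidingWindowTracker template from this patch is in scope (the one-second window is for the demo only; the subv_metrics_window_interval option enforces a 30-second minimum in the MDS):

  #include <chrono>
  #include <iostream>
  #include <thread>
  // Assumes SlidingWindowTracker<T> from mgr/MDSPerfMetricTypes.h (this patch).

  int main() {
    SlidingWindowTracker<int> tracker(1);  // 1s window, demo only
    tracker.add_value(42);
    tracker.update();
    std::cout << "empty after add: " << tracker.is_empty() << '\n';     // 0

    std::this_thread::sleep_for(std::chrono::seconds(2));
    tracker.update();  // prunes the now-expired sample
    std::cout << "empty after expiry: " << tracker.is_empty() << '\n';  // 1
    return 0;
  }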