From a206a71056ee57692757065fc6cabdd1d9400287 Mon Sep 17 00:00:00 2001 From: Igor Golikov Date: Mon, 8 Dec 2025 10:43:38 +0000 Subject: [PATCH] mds: add new perf and subvolume utilization metrics Perf metrics: CPU% and number of open requests Subvolume utilization metrics: quota info and current size Signed-off-by: Igor Golikov Fixes: https://tracker.ceph.com/issues/74135 Fixes: https://tracker.ceph.com/issues/73700 --- src/common/TrackedOp.cc | 20 ++- src/common/TrackedOp.h | 1 + src/common/util.cc | 41 ++++++ src/include/util.h | 10 +- src/mds/MDBalancer.cc | 20 +-- src/mds/MDCache.cc | 26 +++- src/mds/MDSPerfMetricTypes.h | 45 ++++++- src/mds/MDSRank.cc | 12 ++ src/mds/MDSRank.h | 5 + src/mds/MetricAggregator.cc | 82 +++++++++++- src/mds/MetricAggregator.h | 7 +- src/mds/MetricsHandler.cc | 249 ++++++++++++++++++++++++++++++++--- src/mds/MetricsHandler.h | 50 ++++++- src/mds/mdstypes.cc | 6 +- src/mds/mdstypes.h | 12 +- 15 files changed, 535 insertions(+), 51 deletions(-) diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc index ef12a868141..875f63bb4f5 100644 --- a/src/common/TrackedOp.cc +++ b/src/common/TrackedOp.cc @@ -190,7 +190,8 @@ void OpHistory::dump_ops(utime_t now, Formatter *f, set filters, bool by struct ShardedTrackingData { ceph::mutex ops_in_flight_lock_sharded; TrackedOp::tracked_op_list_t ops_in_flight_sharded; - explicit ShardedTrackingData(string lock_name) + std::atomic ops_in_flight_count{0}; + explicit ShardedTrackingData(const char* lock_name) : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {} }; @@ -316,6 +317,21 @@ bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, setops_in_flight_count.load(std::memory_order_relaxed); + } + return total_ops_in_flight; +} + bool OpTracker::register_inflight_op(TrackedOp *i) { if (!tracking_enabled) @@ -330,6 +346,7 @@ bool OpTracker::register_inflight_op(TrackedOp *i) std::lock_guard locker(sdata->ops_in_flight_lock_sharded); 
sdata->ops_in_flight_sharded.push_back(*i); i->seq = current_seq; + sdata->ops_in_flight_count.fetch_add(1, std::memory_order_relaxed); } return true; } @@ -346,6 +363,7 @@ void OpTracker::unregister_inflight_op(TrackedOp* const i) std::lock_guard locker(sdata->ops_in_flight_lock_sharded); auto p = sdata->ops_in_flight_sharded.iterator_to(*i); sdata->ops_in_flight_sharded.erase(p); + sdata->ops_in_flight_count.fetch_sub(1, std::memory_order_relaxed); } } diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 25eedf46dfb..1ada0727477 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -149,6 +149,7 @@ public: bool dump_historic_slow_ops(ceph::Formatter *f, std::set filters = {""}); bool register_inflight_op(TrackedOp *i); void unregister_inflight_op(TrackedOp *i); + uint64_t get_num_ops_in_flight(); void record_history_op(TrackedOpRef&& i); void get_age_ms_histogram(pow2_hist_t *h); diff --git a/src/common/util.cc b/src/common/util.cc index 07cf882baee..e3fd0ea20b8 100644 --- a/src/common/util.cc +++ b/src/common/util.cc @@ -17,9 +17,13 @@ #include #endif +#include #include +#include +#include #include +#include "acconfig.h" #include "include/compat.h" #include "include/util.h" #include "common/debug.h" @@ -459,3 +463,40 @@ std::string bytes2str(uint64_t count) { snprintf(str, sizeof str, "%" PRIu64 "%sB", count, s[i]); return std::string(str); } + +#ifndef _WIN32 +bool ceph::read_process_cpu_ticks(uint64_t* total, std::string* error) +{ + ceph_assert(total != nullptr); + const char* stat_path = PROCPREFIX "/proc/self/stat"; + std::ifstream stat_file(stat_path); + if (!stat_file.is_open()) { + if (error) { + *error = std::string("failed to open '") + stat_path + "'"; + } + return false; + } + + std::vector stat_vec((std::istream_iterator{stat_file}), + std::istream_iterator()); + if (stat_vec.size() < 15) { + if (error) { + *error = std::string("failed to parse '") + stat_path + "'"; + } + return false; + } + + uint64_t utime = 
std::strtoull(stat_vec[13].c_str(), nullptr, 10); + uint64_t stime = std::strtoull(stat_vec[14].c_str(), nullptr, 10); + *total = utime + stime; + return true; +} +#else +bool ceph::read_process_cpu_ticks(uint64_t* total, std::string* error) +{ + if (error) { + *error = "/proc/self/stat not available on this platform"; + } + return false; +} +#endif diff --git a/src/include/util.h b/src/include/util.h index b29a031047c..fc114c2bb23 100644 --- a/src/include/util.h +++ b/src/include/util.h @@ -110,7 +110,12 @@ void dump_services(ceph::Formatter* f, const std::map @@ -119,5 +124,6 @@ bool match_str(const std::string& s, const XS& ...xs) return ((s == xs) || ...); } -} // namespace ceph::util +} // namespace util +} // namespace ceph #endif /* CEPH_UTIL_H */ diff --git a/src/mds/MDBalancer.cc b/src/mds/MDBalancer.cc index 2b933c38135..f109384f486 100644 --- a/src/mds/MDBalancer.cc +++ b/src/mds/MDBalancer.cc @@ -32,7 +32,6 @@ #include "msg/Messenger.h" #include "messages/MHeartbeat.h" -#include #include #include @@ -41,6 +40,7 @@ using namespace std; #include "common/config.h" #include "common/debug.h" #include "common/errno.h" +#include "include/util.h" /* Note, by default debug_mds_balancer is 1/5. 
For debug messages 1 stat_vec(std::istream_iterator{stat_file}, - std::istream_iterator()); - if (stat_vec.size() >= 15) { - // utime + stime - cpu_time = strtoll(stat_vec[13].c_str(), nullptr, 10) + - strtoll(stat_vec[14].c_str(), nullptr, 10); - } else { - derr << "input file '" << stat_path << "' not resolvable" << dendl_impl; - } + uint64_t ticks = 0; + std::string err; + if (ceph::read_process_cpu_ticks(&ticks, &err)) { + cpu_time = ticks; } else { - derr << "input file '" << stat_path << "' not found" << dendl_impl; + derr << err << dendl_impl; } } diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 623a2e2348a..38ac9ce979a 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -2128,10 +2128,34 @@ void MDCache::broadcast_quota_to_client(CInode *in, client_t exclude_ct, bool qu return; const auto& pi = in->get_projected_inode(); + inodeno_t subvolume_id = in->get_subvolume_id(); + dout(10) << __func__ << " ino " << in->ino() + << " subvol " << subvolume_id + << " quota_enabled=" << pi->quota.is_enabled() + << " quota_change=" << quota_change + << " max_bytes=" << pi->quota.max_bytes + << " rbytes=" << pi->rstat.rbytes + << dendl; + + // Update subvolume quota cache in MetricsHandler. + // Update when quota is enabled OR when there's a quota change (e.g., removing quota). + // This ensures cache is updated to 0 when quota is set to unlimited. + // Pass both quota and current used_bytes from this inode. + if (subvolume_id != inodeno_t{0} && (pi->quota.is_enabled() || quota_change)) { + // force_zero=true when quota was removed (quota_change but not enabled) + bool force_zero = quota_change && !pi->quota.is_enabled(); + uint64_t used_bytes = pi->rstat.rbytes > 0 ? static_cast(pi->rstat.rbytes) : 0; + mds->metrics_handler.maybe_update_subvolume_quota( + subvolume_id, + pi->quota.max_bytes > 0 ? 
static_cast(pi->quota.max_bytes) : 0, + used_bytes, + force_zero); + } + if (!pi->quota.is_enabled() && !quota_change) return; - // creaete snaprealm for quota inode (quota was set before mimic) + // create snaprealm for quota inode (quota was set before mimic) if (!in->get_projected_srnode()) mds->server->create_quota_realm(in); diff --git a/src/mds/MDSPerfMetricTypes.h b/src/mds/MDSPerfMetricTypes.h index 9601a2236b6..e6d7a8b6c44 100644 --- a/src/mds/MDSPerfMetricTypes.h +++ b/src/mds/MDSPerfMetricTypes.h @@ -18,6 +18,11 @@ enum UpdateType : uint32_t { UPDATE_TYPE_REMOVE, }; +inline constexpr uint32_t l_mds_rank_perf_start = 40000; +inline constexpr uint32_t l_mds_rank_perf_cpu_usage = l_mds_rank_perf_start + 1; +inline constexpr uint32_t l_mds_rank_perf_open_requests = l_mds_rank_perf_start + 2; +inline constexpr uint32_t l_mds_rank_perf_last = l_mds_rank_perf_start + 3; + struct CapHitMetric { uint64_t hits = 0; uint64_t misses = 0; @@ -301,6 +306,30 @@ WRITE_CLASS_DENC(ReadIoSizesMetric) WRITE_CLASS_DENC(WriteIoSizesMetric) WRITE_CLASS_DENC(SubvolumeMetric) +struct RankPerfMetrics { + uint64_t cpu_usage_percent = 0; + uint64_t open_requests = 0; + + DENC(RankPerfMetrics, v, p) { + DENC_START(1, 1, p); + denc(v.cpu_usage_percent, p); + denc(v.open_requests, p); + DENC_FINISH(p); + } + + void dump(Formatter *f) const { + f->dump_unsigned("cpu_usage_percent", cpu_usage_percent); + f->dump_unsigned("open_requests", open_requests); + } + + friend std::ostream& operator<<(std::ostream& os, const RankPerfMetrics &metric) { + os << "{cpu_usage_percent=" << metric.cpu_usage_percent + << ", open_requests=" << metric.open_requests << "}"; + return os; + } +}; +WRITE_CLASS_DENC(RankPerfMetrics) + // metrics that are forwarded to the MDS by client(s). 
struct Metrics { // metrics @@ -377,6 +406,7 @@ struct metrics_message_t { mds_rank_t rank = MDS_RANK_NONE; std::map client_metrics_map; std::vector subvolume_metrics; + RankPerfMetrics rank_metrics; metrics_message_t() { } @@ -386,23 +416,29 @@ struct metrics_message_t { void encode(bufferlist &bl, uint64_t features) const { using ceph::encode; - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(seq, bl); encode(rank, bl); encode(client_metrics_map, bl, features); encode(subvolume_metrics, bl); + encode(rank_metrics, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator &iter) { using ceph::decode; - DECODE_START(2, iter); + DECODE_START(3, iter); decode(seq, iter); decode(rank, iter); decode(client_metrics_map, iter); if (struct_v >= 2) { decode(subvolume_metrics, iter); } + if (struct_v >= 3) { + decode(rank_metrics, iter); + } else { + rank_metrics = {}; + } DECODE_FINISH(iter); } @@ -413,6 +449,7 @@ struct metrics_message_t { f->dump_object("client", client); f->dump_object("metrics", metrics); } + f->dump_object("rank_metrics", rank_metrics); f->open_array_section("subvolume_metrics"); for (const auto &metric : subvolume_metrics) { f->open_object_section("metric"); @@ -425,7 +462,9 @@ struct metrics_message_t { friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &m) { os << "[sequence=" << m.seq << ", rank=" << m.rank << ", client_metrics=" << m.client_metrics_map - << ", subvolume_metrics=" << m.subvolume_metrics << "]"; + << ", subvolume_metrics=" << m.subvolume_metrics; + os << ", rank_metrics=" << m.rank_metrics; + os << "]"; return os; } }; diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 02816d700c5..5a5a4c33fe5 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -17,6 +17,10 @@ #include "osdc/Journaler.h" #include +#include +#include +#include +#include #include "common/DecayCounter.h" #include "common/debug.h" #include "common/errno.h" @@ -4085,6 +4089,14 @@ std::string 
MDSRank::get_path(inodeno_t ino) { return res; } +uint64_t MDSRank::get_inode_rbytes(inodeno_t ino) { + std::lock_guard locker(mds_lock); + CInode* inode = mdcache->get_inode(ino); + if (!inode) return 0; + const auto& pi = inode->get_projected_inode(); + return pi->rstat.rbytes > 0 ? static_cast(pi->rstat.rbytes) : 0; +} + std::vector MDSRankDispatcher::get_tracked_keys() const noexcept { diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 80657432c43..82657dccb90 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -17,6 +17,7 @@ #define MDS_RANK_H_ #include +#include #include #include "common/admin_socket.h" // for asok_finisher @@ -167,6 +168,9 @@ class MDSRank { friend class C_CacheDropExecAndReply; friend class C_ScrubExecAndReply; friend class C_ScrubControlExecAndReply; + friend class MDCache; + friend class Locker; + friend class CInode; CephContext *cct; @@ -391,6 +395,7 @@ class MDSRank { } std::string get_path(inodeno_t ino); + uint64_t get_inode_rbytes(inodeno_t ino); // Reference to global MDS::mds_lock, so that users of MDSRank don't // carry around references to the outer MDS, and we can substitute diff --git a/src/mds/MetricAggregator.cc b/src/mds/MetricAggregator.cc index fb42c97a743..183b9e9c689 100644 --- a/src/mds/MetricAggregator.cc +++ b/src/mds/MetricAggregator.cc @@ -4,6 +4,7 @@ #include "MetricAggregator.h" #include "MDSMap.h" #include "MDSRank.h" +#include "MetricsHandler.h" #include "mgr/MgrClient.h" #include "common/ceph_context.h" @@ -21,11 +22,11 @@ #define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__ // Performance Counters - enum { +enum { l_mds_client_metrics_start = 10000, l_mds_client_metrics_num_clients, l_mds_client_metrics_last - }; +}; enum { l_mds_per_client_metrics_start = 20000, @@ -45,7 +46,7 @@ enum { l_mds_per_client_metrics_total_write_ops, l_mds_per_client_metrics_total_write_size, l_mds_per_client_metrics_last - }; +}; enum { l_subvolume_metrics_first = 30000, @@ -55,6 +56,8 @@ enum { 
l_subvolume_metrics_write_iops, l_subvolume_metrics_write_tp_Bps, l_subvolume_metrics_avg_write_latency, + l_subvolume_metrics_quota_bytes, + l_subvolume_metrics_used_bytes, l_subvolume_metrics_last }; @@ -108,9 +111,6 @@ int MetricAggregator::init() { }); subv_window_sec = g_conf().get_val("subv_metrics_window_interval").count(); - if (!subv_window_sec) - return -EINVAL; - return 0; } @@ -133,6 +133,14 @@ void MetricAggregator::shutdown() { } client_perf_counters.clear(); + for (auto &[rank, perf_counters] : rank_perf_counters) { + if (perf_counters != nullptr) { + m_cct->get_perfcounters_collection()->remove(perf_counters); + delete perf_counters; + } + } + rank_perf_counters.clear(); + PerfCounters *perf_counters = nullptr; std::swap(perf_counters, m_perf_counters); if (perf_counters != nullptr) { @@ -186,6 +194,12 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( "Average write throughput (Bps)", "wbps", PerfCountersBuilder::PRIO_CRITICAL); plb.add_u64(l_subvolume_metrics_avg_write_latency, "avg_write_lat_msec", "Average write latency (ms)", "wlav", PerfCountersBuilder::PRIO_CRITICAL); + plb.add_u64(l_subvolume_metrics_quota_bytes, "quota_bytes", + "Configured quota (bytes) for the subvolume", nullptr, + PerfCountersBuilder::PRIO_USEFUL, UNIT_BYTES); + plb.add_u64(l_subvolume_metrics_used_bytes, "used_bytes", + "Current logical bytes used by the subvolume", nullptr, + PerfCountersBuilder::PRIO_USEFUL, UNIT_BYTES); auto perf_counter = plb.create_perf_counters(); subvolume_perf_counters[m.subvolume_path] = perf_counter; @@ -259,6 +273,9 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( uint64_t total_read_ops = 0, total_write_ops = 0; uint64_t total_read_bytes = 0, total_write_bytes = 0; uint64_t weighted_read_latency_sum = 0, weighted_write_latency_sum = 0; + uint64_t latest_sample_ts = 0; + uint64_t latest_quota_bytes = 0; + uint64_t latest_used_bytes = 0; tracker.for_each_value([&](const SubvolumeMetric &m) { total_read_ops += 
m.read_ops; @@ -267,6 +284,11 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( total_write_bytes += m.write_size; weighted_read_latency_sum += m.avg_read_latency * m.read_ops; weighted_write_latency_sum += m.avg_write_latency * m.write_ops; + if (m.time_stamp >= latest_sample_ts) { + latest_sample_ts = m.time_stamp; + latest_quota_bytes = m.quota_bytes; + latest_used_bytes = m.used_bytes; + } }); aggr_metric.read_iops = total_read_ops / aggr_metric.time_window_last_dur_sec; @@ -280,6 +302,8 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( aggr_metric.avg_write_latency = (total_write_ops > 0) ? (weighted_write_latency_sum / total_write_ops) / 1000 : 0; + aggr_metric.quota_bytes = latest_quota_bytes; + aggr_metric.used_bytes = latest_used_bytes; // update PerfCounters auto counter = subvolume_perf_counters[path]; @@ -290,6 +314,8 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( counter->set(l_subvolume_metrics_write_iops, aggr_metric.write_iops); counter->set(l_subvolume_metrics_write_tp_Bps, aggr_metric.write_tBps); counter->set(l_subvolume_metrics_avg_write_latency, aggr_metric.avg_write_latency); + counter->set(l_subvolume_metrics_quota_bytes, aggr_metric.quota_bytes); + counter->set(l_subvolume_metrics_used_bytes, aggr_metric.used_bytes); // Update query_metrics_map auto sub_key_func_subvolume = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc, @@ -341,6 +367,12 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank( case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC: counter->first = aggr_metric.avg_write_latency; break; + case MDSPerformanceCounterType::SUBV_QUOTA_BYTES_METRIC: + counter->first = aggr_metric.quota_bytes; + break; + case MDSPerformanceCounterType::SUBV_USED_BYTES_METRIC: + counter->first = aggr_metric.used_bytes; + break; default: break; } @@ -574,6 +606,8 @@ void MetricAggregator::refresh_metrics_for_rank(const entity_inst_t &client, case 
MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC: case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC: case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC: + case MDSPerformanceCounterType::SUBV_QUOTA_BYTES_METRIC: + case MDSPerformanceCounterType::SUBV_USED_BYTES_METRIC: break; default: ceph_abort_msg("unknown counter type"); @@ -724,6 +758,32 @@ void MetricAggregator::handle_mds_metrics(const cref_t &m) { } refresh_subvolume_metrics_for_rank(rank, subvolume_metrics); + update_rank_perf_metrics(rank, metrics_message.rank_metrics); +} + +void MetricAggregator::update_rank_perf_metrics(mds_rank_t rank, const RankPerfMetrics& metrics) { + auto &perf_counter = rank_perf_counters[rank]; + if (!perf_counter) { + std::string labels = ceph::perf_counters::key_create( + "mds_rank_perf", + {{"rank", stringify(rank)}}); + PerfCountersBuilder plb(m_cct, labels, + l_mds_rank_perf_start, l_mds_rank_perf_last); + plb.add_u64(l_mds_rank_perf_cpu_usage, + "cpu_usage", + "Sum of per-core CPU utilisation reported for this MDS (100 == one full core)", + "cpu%", + PerfCountersBuilder::PRIO_USEFUL); + plb.add_u64(l_mds_rank_perf_open_requests, + "open_requests", + "Number of metadata requests currently in flight on this MDS", + "req", + PerfCountersBuilder::PRIO_USEFUL); + perf_counter = plb.create_perf_counters(); + m_cct->get_perfcounters_collection()->add(perf_counter); + } + perf_counter->set(l_mds_rank_perf_cpu_usage, metrics.cpu_usage_percent); + perf_counter->set(l_mds_rank_perf_open_requests, metrics.open_requests); } void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) { @@ -736,6 +796,16 @@ void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) { dout(10) << ": culled " << p.size() << " clients" << dendl; clients_by_rank.erase(rank); + remove_rank_perf_metrics_for_rank(rank); +} + +void MetricAggregator::remove_rank_perf_metrics_for_rank(mds_rank_t rank) { + auto it = rank_perf_counters.find(rank); + if (it != rank_perf_counters.end()) { + 
m_cct->get_perfcounters_collection()->remove(it->second); + delete it->second; + rank_perf_counters.erase(it); + } } void MetricAggregator::notify_mdsmap(const MDSMap &mdsmap) { diff --git a/src/mds/MetricAggregator.h b/src/mds/MetricAggregator.h index 1554f8f4dbe..1ce884abe84 100644 --- a/src/mds/MetricAggregator.h +++ b/src/mds/MetricAggregator.h @@ -17,7 +17,7 @@ #include "mgr/Types.h" // for PerformanceCounters #include "mgr/MetricTypes.h" // for MetricPayload -#include "mgr/MDSPerfMetricTypes.h" +#include "mds/MDSPerfMetricTypes.h" #include "mdstypes.h" #include "MDSPinger.h" @@ -74,9 +74,10 @@ private: PerfCounters *m_perf_counters; std::map, PerfCounters*> client_perf_counters; - uint64_t subv_window_sec; + uint64_t subv_window_sec = 0; std::unordered_map> subvolume_aggregated_metrics; std::map subvolume_perf_counters; + std::map rank_perf_counters; void handle_mds_metrics(const cref_t &m); @@ -84,6 +85,8 @@ private: const Metrics &metrics); void refresh_subvolume_metrics_for_rank(mds_rank_t rank, const std::vector &metrics); void remove_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank, bool remove); + void update_rank_perf_metrics(mds_rank_t rank, const RankPerfMetrics& metrics); + void remove_rank_perf_metrics_for_rank(mds_rank_t rank); void cull_metrics_for_rank(mds_rank_t rank); diff --git a/src/mds/MetricsHandler.cc b/src/mds/MetricsHandler.cc index 23811a42a75..c309eb1fd1d 100644 --- a/src/mds/MetricsHandler.cc +++ b/src/mds/MetricsHandler.cc @@ -3,16 +3,29 @@ #include "MetricsHandler.h" +#include +#include +#include +#include #include +#include +#include #include "common/debug.h" #include "common/errno.h" +#include "common/perf_counters.h" +#include "common/perf_counters_key.h" +#include "include/util.h" +#include "include/fs_types.h" +#include "include/stringify.h" #include "messages/MClientMetrics.h" #include "messages/MMDSMetrics.h" #include "messages/MMDSPing.h" #include "MDSRank.h" +#include "MDCache.h" +#include "CInode.h" #include 
"SessionMap.h" #define dout_context g_ceph_context @@ -23,6 +36,11 @@ MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds) : Dispatcher(cct), mds(mds) { + clk_tck = sysconf(_SC_CLK_TCK); + if (clk_tck <= 0) { + dout(1) << "failed to determine clock ticks per second, cpu usage metric disabled" << dendl; + clk_tck = 0; + } } Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t &m) { @@ -46,6 +64,32 @@ Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t void MetricsHandler::init() { dout(10) << dendl; + // Create local perf counters for non-rank0 MDS only. Rank0's perf counters + // are created by MetricAggregator::update_rank_perf_metrics() to avoid + // duplicate counters (MetricAggregator creates counters for all ranks). + if (mds->get_nodeid() != 0 && !rank_perf_counters) { + std::string labels = ceph::perf_counters::key_create( + "mds_rank_perf", + {{"rank", stringify(mds->get_nodeid())}}); + + PerfCountersBuilder plb(cct, labels, + l_mds_rank_perf_start, l_mds_rank_perf_last); + plb.add_u64(l_mds_rank_perf_cpu_usage, + "cpu_usage", + "Sum of per-core CPU utilisation for this MDS (100 == one full core)", + "cpu%", + PerfCountersBuilder::PRIO_USEFUL); + plb.add_u64(l_mds_rank_perf_open_requests, + "open_requests", + "Number of metadata requests currently in flight", + "req", + PerfCountersBuilder::PRIO_USEFUL); + rank_perf_counters = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(rank_perf_counters); + } + + subv_window_sec = g_conf().get_val("subv_metrics_window_interval").count(); + updater = std::thread([this]() { ceph_pthread_setname("mds-metrics"); std::unique_lock locker(lock); @@ -54,7 +98,7 @@ void MetricsHandler::init() { locker.unlock(); sleep(after); locker.lock(); - update_rank0(); + update_rank0(locker); } }); } @@ -71,6 +115,12 @@ void MetricsHandler::shutdown() { if (updater.joinable()) { updater.join(); } + + if (rank_perf_counters) { + 
cct->get_perfcounters_collection()->remove(rank_perf_counters); + delete rank_perf_counters; + rank_perf_counters = nullptr; + } } @@ -333,18 +383,7 @@ void MetricsHandler::handle_payload(Session* session, const SubvolumeMetricsPay resolved_paths.reserve(payload.subvolume_metrics.size()); // RAII guard: unlock on construction, re-lock on destruction (even on exceptions) - struct UnlockGuard { - std::unique_lock &lk; - explicit UnlockGuard(std::unique_lock& l) : lk(l) { lk.unlock(); } - ~UnlockGuard() noexcept { - if (!lk.owns_lock()) { - try { lk.lock(); } - catch (...) { - dout(0) << "failed to re-lock in UnlockGuard dtor" << dendl; - } - } - } - } unlock_guard{lk}; + UnlockGuard unlock_guard{lk}; // unlocked: resolve paths, no contention with mds lock for (const auto& metric : payload.subvolume_metrics) { @@ -444,9 +483,12 @@ void MetricsHandler::notify_mdsmap(const MDSMap &mdsmap) { } } -void MetricsHandler::update_rank0() { +void MetricsHandler::update_rank0(std::unique_lock& locker) { dout(20) << dendl; + sample_cpu_usage(); + sample_open_requests(); + if (!addr_rank0) { dout(20) << ": not yet notified with rank0 address, ignoring" << dendl; return; @@ -457,6 +499,7 @@ void MetricsHandler::update_rank0() { metrics_message.seq = next_seq; metrics_message.rank = mds->get_nodeid(); + metrics_message.rank_metrics = rank_telemetry.metrics; for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) { // copy metrics and update local metrics map as required @@ -470,15 +513,57 @@ void MetricsHandler::update_rank0() { } } + // Resolve used_bytes for all subvolumes without holding the metrics lock + // (same unlock pattern used for path resolution in handle_payload). 
+ // Step 1 (locked): collect unique subvolume ids + std::vector subvol_ids; + subvol_ids.reserve(subvolume_metrics_map.size()); + for (const auto &[path, aggregated_metrics] : subvolume_metrics_map) { + if (!aggregated_metrics.empty()) { + subvol_ids.push_back(aggregated_metrics.front().subvolume_id); + } + } + + // Step 2 (unlocked): fetch rbytes under mds_lock via helper, release metrics lock + this allows another update to proceed in the metrics handler + UnlockGuard unlock_guard{locker}; + + // here the metrics handler lock will be retaken via the UnlockGuard dtor + std::unordered_map subvol_used_bytes; + for (inodeno_t subvol_id : subvol_ids) { + if (subvol_used_bytes.count(subvol_id) == 0) { + uint64_t rbytes = mds->get_inode_rbytes(subvol_id); + subvol_used_bytes[subvol_id] = rbytes; + dout(20) << "resolved used_bytes for subvol " << subvol_id << " = " << rbytes << dendl; + } + } + // subvolume metrics, reserve 100 entries per subvolume ? good enough? metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size()* 100); for (auto &[path, aggregated_metrics] : subvolume_metrics_map) { metrics_message.subvolume_metrics.emplace_back(); - aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back()); + aggregate_subvolume_metrics(path, aggregated_metrics, subvol_used_bytes, + metrics_message.subvolume_metrics.back()); } // if we need to show local MDS metrics, we need to save a last copy... 
subvolume_metrics_map.clear(); + // Evict stale subvolume quota entries + if (subv_window_sec > 0) { + auto now = std::chrono::steady_clock::now(); + auto threshold = std::chrono::seconds(subv_window_sec) * 2; + for (auto it = subvolume_quota.begin(); it != subvolume_quota.end(); ) { + auto elapsed = std::chrono::duration_cast(now - it->second.last_activity); + if (elapsed > threshold) { + dout(15) << "evicting stale subvolume quota entry " << it->first + << " (inactive for " << elapsed.count() << "s)" << dendl; + it = subvolume_quota.erase(it); + } else { + ++it; + } + } + } + // only start incrementing when its kicked via set_next_seq() if (next_seq != 0) { ++last_updated_seq; @@ -491,7 +576,9 @@ void MetricsHandler::update_rank0() { } void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path, - const std::vector& metrics_list, SubvolumeMetric &res) { + const std::vector& metrics_list, + const std::unordered_map& subvol_used_bytes, + SubvolumeMetric &res) { dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl; res.subvolume_path = subvolume_path; @@ -505,6 +592,8 @@ void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_pa res.avg_read_latency = 0; res.avg_write_latency = 0; res.time_stamp = 0; + res.quota_bytes = 0; + res.used_bytes = 0; for (const auto& m : metrics_list) { res.read_ops += m.read_count; @@ -524,6 +613,27 @@ void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_pa } } + // Lookup quota and used_bytes after aggregating I/O metrics + if (!metrics_list.empty()) { + inodeno_t subvolume_id = metrics_list.front().subvolume_id; + + // Get quota/used bytes from cache and update last activity time + auto it = subvolume_quota.find(subvolume_id); + if (it != subvolume_quota.end()) { + res.quota_bytes = it->second.quota_bytes; + res.used_bytes = it->second.used_bytes; + it->second.last_activity = std::chrono::steady_clock::now(); + } + + // Fallback: if cache 
didn't have used_bytes, use the pre-fetched map + if (res.used_bytes == 0) { + auto used_it = subvol_used_bytes.find(subvolume_id); + if (used_it != subvol_used_bytes.end()) { + res.used_bytes = used_it->second; + } + } + } + // normalize latencies res.avg_read_latency = (res.read_ops > 0) ? (weighted_read_latency_sum / res.read_ops) @@ -531,4 +641,111 @@ void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_pa res.avg_write_latency = (res.write_ops > 0) ? (weighted_write_latency_sum / res.write_ops) : 0; +} + +void MetricsHandler::maybe_update_subvolume_quota(inodeno_t subvol_id, uint64_t quota_bytes, uint64_t used_bytes, bool force_zero) { + std::lock_guard l(lock); + + auto it = subvolume_quota.find(subvol_id); + if (it == subvolume_quota.end()) { + // If the subvolume was not registered yet, insert it now so we don't lose + // the first quota update (e.g., when broadcast happens before caps/metadata). + it = subvolume_quota.emplace(subvol_id, SubvolumeQuotaInfo{}).first; + dout(20) << __func__ << " inserted subvolume_quota for " << subvol_id << dendl; + } + + // Only update quota_bytes if this inode has quota enabled (avoid overwriting + // a good value from a quota-enabled child with 0 from the subvolume root). + // Exception: force_zero=true means quota was explicitly removed (set to unlimited). 
+ if (quota_bytes > 0 || force_zero) { + it->second.quota_bytes = quota_bytes; + } + it->second.used_bytes = used_bytes; + it->second.last_activity = std::chrono::steady_clock::now(); + + dout(20) << __func__ << " subvol " << subvol_id + << " quota=" << it->second.quota_bytes + << " used=" << it->second.used_bytes + << " (input: quota=" << quota_bytes << ", used=" << used_bytes << ")" << dendl; +} + +void MetricsHandler::sample_cpu_usage() { + uint64_t current_ticks = 0; + std::string err; + + if (clk_tck <= 0) { + rank_telemetry.metrics.cpu_usage_percent = 0; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0); + } + return; + } + + if (!ceph::read_process_cpu_ticks(¤t_ticks, &err)) { + rank_telemetry.metrics.cpu_usage_percent = 0; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0); + } + dout(5) << err << dendl; + return; + } + + auto now = std::chrono::steady_clock::now(); + if (!rank_telemetry.cpu_sample_initialized) { + rank_telemetry.last_cpu_total_ticks = current_ticks; + rank_telemetry.last_cpu_sample_time = now; + rank_telemetry.cpu_sample_initialized = true; + rank_telemetry.metrics.cpu_usage_percent = 0; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0); + } + return; + } + + if (current_ticks < rank_telemetry.last_cpu_total_ticks) { + rank_telemetry.last_cpu_total_ticks = current_ticks; + rank_telemetry.last_cpu_sample_time = now; + rank_telemetry.metrics.cpu_usage_percent = 0; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0); + } + return; + } + + double elapsed = std::chrono::duration(now - rank_telemetry.last_cpu_sample_time).count(); + if (elapsed <= 0.0) { + rank_telemetry.metrics.cpu_usage_percent = 0; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0); + } + return; + } + + uint64_t delta_ticks = current_ticks - rank_telemetry.last_cpu_total_ticks; + 
rank_telemetry.last_cpu_total_ticks = current_ticks; + rank_telemetry.last_cpu_sample_time = now; + + double cpu_seconds = static_cast(delta_ticks) / static_cast(clk_tck); + double cores_used = cpu_seconds / elapsed; + double usage_percent = cores_used * 100.0; + if (usage_percent < 0.0) { + usage_percent = 0.0; + } + + uint64_t stored = static_cast(std::llround(usage_percent)); + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_cpu_usage, stored); + } + rank_telemetry.metrics.cpu_usage_percent = stored; +} + +void MetricsHandler::sample_open_requests() { + uint64_t open = 0; + if (mds->op_tracker.is_tracking()) { + open = mds->op_tracker.get_num_ops_in_flight(); + } + rank_telemetry.metrics.open_requests = open; + if (rank_perf_counters) { + rank_perf_counters->set(l_mds_rank_perf_open_requests, open); + } } \ No newline at end of file diff --git a/src/mds/MetricsHandler.h b/src/mds/MetricsHandler.h index bab551f951a..960def21600 100644 --- a/src/mds/MetricsHandler.h +++ b/src/mds/MetricsHandler.h @@ -4,6 +4,7 @@ #ifndef CEPH_MDS_METRICS_HANDLER_H #define CEPH_MDS_METRICS_HANDLER_H +#include #include #include #include @@ -13,7 +14,7 @@ #include "msg/Dispatcher.h" #include "common/ceph_mutex.h" -#include "MDSPerfMetricTypes.h" +#include "mds/MDSPerfMetricTypes.h" #include "include/cephfs/metrics/Types.h" #include @@ -63,6 +64,11 @@ public: void notify_mdsmap(const MDSMap &mdsmap); + // Called from MDCache::broadcast_quota_to_client to update quota for subvolumes + // quota_bytes: only updated if > 0, unless force_zero is true (quota removed) + // used_bytes is fetched dynamically from inode rstat in aggregate_subvolume_metrics + void maybe_update_subvolume_quota(inodeno_t subvol_id, uint64_t quota_bytes, uint64_t used_bytes, bool force_zero = false); + private: struct HandlePayloadVisitor : public boost::static_visitor { MetricsHandler *metrics_handler; @@ -105,7 +111,16 @@ private: std::map> client_metrics_map; // maps subvolume path -> 
aggregated metrics from all clients reporting to this MDS instance std::unordered_map> subvolume_metrics_map; - uint64_t subv_metrics_tracker_window_time_sec = 300; + uint64_t subv_window_sec = 0; + + // maps subvolume_id (inode number) -> quota info, updated when quota is broadcast to clients + // used_bytes is fetched dynamically from inode rstat, not cached here + struct SubvolumeQuotaInfo { + uint64_t quota_bytes = 0; + uint64_t used_bytes = 0; + std::chrono::steady_clock::time_point last_activity; + }; + std::unordered_map subvolume_quota; // address of rank 0 mds, so that the message can be sent without // acquiring mds_lock. misdirected messages to rank 0 are taken // care of by rank 0. @@ -132,10 +147,37 @@ private: void handle_client_metrics(const cref_t &m); void handle_mds_ping(const cref_t &m); - void update_rank0(); + void update_rank0(std::unique_lock& locker); + + // RAII helper to temporarily unlock/relock a unique_lock + struct UnlockGuard { + std::unique_lock& lk; + explicit UnlockGuard(std::unique_lock& l) : lk(l) { lk.unlock(); } + ~UnlockGuard() noexcept { + if (!lk.owns_lock()) { + try { lk.lock(); } catch (...) 
{ + // avoid throwing from destructor + } + } + } + }; void aggregate_subvolume_metrics(const std::string& subvolume_path, - const std::vector& metrics_list, SubvolumeMetric &res); + const std::vector& metrics_list, + const std::unordered_map& subvol_used_bytes, + SubvolumeMetric &res); + + void sample_cpu_usage(); + void sample_open_requests(); + + PerfCounters *rank_perf_counters = nullptr; + long clk_tck = 0; + struct RankTelemetry { + RankPerfMetrics metrics; + bool cpu_sample_initialized = false; + uint64_t last_cpu_total_ticks = 0; + std::chrono::steady_clock::time_point last_cpu_sample_time{}; + } rank_telemetry; }; #endif // CEPH_MDS_METRICS_HANDLER_H diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc index 51105f4d034..642dc2b547d 100644 --- a/src/mds/mdstypes.cc +++ b/src/mds/mdstypes.cc @@ -1244,6 +1244,8 @@ void SubvolumeMetric::dump(Formatter *f) const { f->dump_unsigned("avg_read_latency", avg_read_latency); f->dump_unsigned("avg_write_latency", avg_write_latency); f->dump_unsigned("time_window_sec", time_stamp); + f->dump_unsigned("quota_bytes", quota_bytes); + f->dump_unsigned("used_bytes", used_bytes); } std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) { @@ -1254,6 +1256,8 @@ std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) { << ", write_size=" << m.write_size << ", avg_read_lat=" << m.avg_read_latency << ", avg_write_lat=" << m.avg_write_latency - << ", time_window_sec=" << m.time_stamp << "}"; + << ", time_window_sec=" << m.time_stamp + << ", quota_bytes=" << m.quota_bytes + << ", used_bytes=" << m.used_bytes << "}"; return os; } diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 06240554e95..cfcd3c1511f 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -1006,9 +1006,11 @@ struct SubvolumeMetric { uint64_t avg_read_latency = 0; uint64_t avg_write_latency = 0; uint64_t time_stamp = 0; + uint64_t quota_bytes = 0; + uint64_t used_bytes = 0; DENC(SubvolumeMetric, v, p) { - DENC_START(1, 1, 
p); + DENC_START(2, 1, p); denc(v.subvolume_path, p); denc(v.read_ops, p); denc(v.write_ops, p); @@ -1017,6 +1019,14 @@ struct SubvolumeMetric { denc(v.avg_read_latency, p); denc(v.avg_write_latency, p); denc(v.time_stamp, p); + if (struct_v >= 2) { + denc(v.quota_bytes, p); + denc(v.used_bytes, p); + } else { + auto &mutable_v = const_cast(v); + mutable_v.quota_bytes = 0; + mutable_v.used_bytes = 0; + } DENC_FINISH(p); } -- 2.47.3