struct ShardedTrackingData {
  ceph::mutex ops_in_flight_lock_sharded;
  TrackedOp::tracked_op_list_t ops_in_flight_sharded;
- explicit ShardedTrackingData(string lock_name)
+ // Lock-free mirror of ops_in_flight_sharded.size(); lets readers sum the
+ // in-flight count across shards without taking each shard's mutex
+ // (see OpTracker::get_num_ops_in_flight()).
+ std::atomic<uint64_t> ops_in_flight_count{0};
+ explicit ShardedTrackingData(const char* lock_name)
    : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
};
return true;
}
+// Return the (approximate) number of ops currently in flight across all
+// optracker shards.  Returns 0 when tracking is disabled, since the
+// per-shard counters are not maintained in that case.
+uint64_t OpTracker::get_num_ops_in_flight()
+{
+  if (!tracking_enabled)
+    return 0;
+
+  // The shared lock keeps the shard array stable; the per-shard counts are
+  // relaxed atomics, so no shard mutex is needed and the resulting sum is
+  // only approximately consistent across shards.
+  std::shared_lock l{lock};
+  uint64_t total = 0;
+  for (uint32_t shard = 0; shard < num_optracker_shards; ++shard) {
+    ShardedTrackingData* sdata = sharded_in_flight_list[shard];
+    ceph_assert(sdata != nullptr);
+    total += sdata->ops_in_flight_count.load(std::memory_order_relaxed);
+  }
+  return total;
+}
+
bool OpTracker::register_inflight_op(TrackedOp *i)
{
if (!tracking_enabled)
std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
sdata->ops_in_flight_sharded.push_back(*i);
i->seq = current_seq;
+ sdata->ops_in_flight_count.fetch_add(1, std::memory_order_relaxed);
}
return true;
}
std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
sdata->ops_in_flight_sharded.erase(p);
+ sdata->ops_in_flight_count.fetch_sub(1, std::memory_order_relaxed);
}
}
bool dump_historic_slow_ops(ceph::Formatter *f, std::set<std::string> filters = {""});
bool register_inflight_op(TrackedOp *i);
void unregister_inflight_op(TrackedOp *i);
+ uint64_t get_num_ops_in_flight();
void record_history_op(TrackedOpRef&& i);
void get_age_ms_histogram(pow2_hist_t *h);
#include <sys/utsname.h>
#endif
+#include <cstdlib>
#include <fstream>
+#include <iterator>
+#include <vector>
#include <boost/algorithm/string.hpp>
+#include "acconfig.h"
#include "include/compat.h"
#include "include/util.h"
#include "common/debug.h"
snprintf(str, sizeof str, "%" PRIu64 "%sB", count, s[i]);
return std::string(str);
}
+
+#ifndef _WIN32
+// Read the cumulative user+system CPU time of the current process in clock
+// ticks.  /proc/self/stat is one line of whitespace-separated fields; field
+// 14 (index 13) is utime and field 15 (index 14) is stime.  On failure,
+// returns false and fills *error (when non-null) with a description.
+bool ceph::read_process_cpu_ticks(uint64_t* total, std::string* error)
+{
+  ceph_assert(total != nullptr);
+
+  const char* stat_path = PROCPREFIX "/proc/self/stat";
+  std::ifstream stat_file(stat_path);
+  if (!stat_file.is_open()) {
+    if (error != nullptr) {
+      *error = std::string("failed to open '") + stat_path + "'";
+    }
+    return false;
+  }
+
+  // Split the whole line on whitespace; extra parens avoid the
+  // most-vexing-parse.
+  std::vector<std::string> fields((std::istream_iterator<std::string>{stat_file}),
+                                  std::istream_iterator<std::string>());
+  if (fields.size() < 15) {
+    if (error != nullptr) {
+      *error = std::string("failed to parse '") + stat_path + "'";
+    }
+    return false;
+  }
+
+  uint64_t utime = std::strtoull(fields[13].c_str(), nullptr, 10);
+  uint64_t stime = std::strtoull(fields[14].c_str(), nullptr, 10);
+  *total = utime + stime;
+  return true;
+}
+#else
+// No /proc filesystem on Windows; callers treat a false return as
+// "metric unavailable".
+bool ceph::read_process_cpu_ticks(uint64_t* total, std::string* error)
+{
+  if (error != nullptr) {
+    *error = "/proc/self/stat not available on this platform";
+  }
+  return false;
+}
+#endif
std::string cleanbin(ceph::buffer::list &bl, bool &b64, bool show = false);
std::string cleanbin(std::string &str);
-namespace ceph::util {
+namespace ceph {
+
+/// Read user+system CPU ticks for the current process from /proc/self/stat
+bool read_process_cpu_ticks(uint64_t* total, std::string* error = nullptr);
+
+namespace util {
// Returns true if s matches any parameters:
template <typename ...XS>
return ((s == xs) || ...);
}
-} // namespace ceph::util
+} // namespace util
+} // namespace ceph
#endif /* CEPH_UTIL_H */
#include "msg/Messenger.h"
#include "messages/MHeartbeat.h"
-#include <fstream>
#include <vector>
#include <map>
#include "common/config.h"
#include "common/debug.h"
#include "common/errno.h"
+#include "include/util.h"
/* Note, by default debug_mds_balancer is 1/5. For debug messages 1<lvl<=5,
* should_gather (below) will be true; so, debug_mds will be ignored even if
uint64_t cpu_time = 1;
{
- string stat_path = PROCPREFIX "/proc/self/stat";
- ifstream stat_file(stat_path);
- if (stat_file.is_open()) {
- vector<string> stat_vec(std::istream_iterator<string>{stat_file},
- std::istream_iterator<string>());
- if (stat_vec.size() >= 15) {
- // utime + stime
- cpu_time = strtoll(stat_vec[13].c_str(), nullptr, 10) +
- strtoll(stat_vec[14].c_str(), nullptr, 10);
- } else {
- derr << "input file '" << stat_path << "' not resolvable" << dendl_impl;
- }
+ uint64_t ticks = 0;
+ std::string err;
+ if (ceph::read_process_cpu_ticks(&ticks, &err)) {
+ cpu_time = ticks;
} else {
- derr << "input file '" << stat_path << "' not found" << dendl_impl;
+ derr << err << dendl_impl;
}
}
return;
const auto& pi = in->get_projected_inode();
+ inodeno_t subvolume_id = in->get_subvolume_id();
+ dout(10) << __func__ << " ino " << in->ino()
+ << " subvol " << subvolume_id
+ << " quota_enabled=" << pi->quota.is_enabled()
+ << " quota_change=" << quota_change
+ << " max_bytes=" << pi->quota.max_bytes
+ << " rbytes=" << pi->rstat.rbytes
+ << dendl;
+
+ // Update subvolume quota cache in MetricsHandler.
+ // Update when quota is enabled OR when there's a quota change (e.g., removing quota).
+ // This ensures cache is updated to 0 when quota is set to unlimited.
+ // Pass both quota and current used_bytes from this inode.
+ if (subvolume_id != inodeno_t{0} && (pi->quota.is_enabled() || quota_change)) {
+ // force_zero=true when quota was removed (quota_change but not enabled)
+ bool force_zero = quota_change && !pi->quota.is_enabled();
+ uint64_t used_bytes = pi->rstat.rbytes > 0 ? static_cast<uint64_t>(pi->rstat.rbytes) : 0;
+ mds->metrics_handler.maybe_update_subvolume_quota(
+ subvolume_id,
+ pi->quota.max_bytes > 0 ? static_cast<uint64_t>(pi->quota.max_bytes) : 0,
+ used_bytes,
+ force_zero);
+ }
+
if (!pi->quota.is_enabled() && !quota_change)
return;
- // creaete snaprealm for quota inode (quota was set before mimic)
+ // create snaprealm for quota inode (quota was set before mimic)
if (!in->get_projected_srnode())
mds->server->create_quota_realm(in);
UPDATE_TYPE_REMOVE,
};
+inline constexpr uint32_t l_mds_rank_perf_start = 40000;
+inline constexpr uint32_t l_mds_rank_perf_cpu_usage = l_mds_rank_perf_start + 1;
+inline constexpr uint32_t l_mds_rank_perf_open_requests = l_mds_rank_perf_start + 2;
+inline constexpr uint32_t l_mds_rank_perf_last = l_mds_rank_perf_start + 3;
+
struct CapHitMetric {
uint64_t hits = 0;
uint64_t misses = 0;
WRITE_CLASS_DENC(WriteIoSizesMetric)
WRITE_CLASS_DENC(SubvolumeMetric)
+// Per-rank telemetry sampled by MetricsHandler and forwarded to rank 0
+// inside metrics_message_t (struct_v >= 3).
+struct RankPerfMetrics {
+  // Whole-process CPU utilisation at sample time; 100 == one full core
+  // (can exceed 100 for a multi-threaded MDS).
+  uint64_t cpu_usage_percent = 0;
+  // Number of tracked metadata requests in flight at sample time.
+  uint64_t open_requests = 0;
+
+  DENC(RankPerfMetrics, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.cpu_usage_percent, p);
+    denc(v.open_requests, p);
+    DENC_FINISH(p);
+  }
+
+  void dump(Formatter *f) const {
+    f->dump_unsigned("cpu_usage_percent", cpu_usage_percent);
+    f->dump_unsigned("open_requests", open_requests);
+  }
+
+  friend std::ostream& operator<<(std::ostream& os, const RankPerfMetrics &metric) {
+    os << "{cpu_usage_percent=" << metric.cpu_usage_percent
+       << ", open_requests=" << metric.open_requests << "}";
+    return os;
+  }
+};
+
// metrics that are forwarded to the MDS by client(s).
struct Metrics {
// metrics
mds_rank_t rank = MDS_RANK_NONE;
std::map<entity_inst_t, Metrics> client_metrics_map;
std::vector<SubvolumeMetric> subvolume_metrics;
+ RankPerfMetrics rank_metrics;
metrics_message_t() {
}
void encode(bufferlist &bl, uint64_t features) const {
using ceph::encode;
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(seq, bl);
encode(rank, bl);
encode(client_metrics_map, bl, features);
encode(subvolume_metrics, bl);
+ encode(rank_metrics, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator &iter) {
using ceph::decode;
- DECODE_START(2, iter);
+ DECODE_START(3, iter);
decode(seq, iter);
decode(rank, iter);
decode(client_metrics_map, iter);
if (struct_v >= 2) {
decode(subvolume_metrics, iter);
}
+ if (struct_v >= 3) {
+ decode(rank_metrics, iter);
+ } else {
+ rank_metrics = {};
+ }
DECODE_FINISH(iter);
}
f->dump_object("client", client);
f->dump_object("metrics", metrics);
}
+ f->dump_object("rank_metrics", rank_metrics);
f->open_array_section("subvolume_metrics");
for (const auto &metric : subvolume_metrics) {
f->open_object_section("metric");
friend std::ostream& operator<<(std::ostream& os, const metrics_message_t &m) {
os << "[sequence=" << m.seq << ", rank=" << m.rank
<< ", client_metrics=" << m.client_metrics_map
- << ", subvolume_metrics=" << m.subvolume_metrics << "]";
+ << ", subvolume_metrics=" << m.subvolume_metrics;
+ os << ", rank_metrics=" << m.rank_metrics;
+ os << "]";
return os;
}
};
#include "osdc/Journaler.h"
#include <typeinfo>
+#include <fstream>
+#include <iterator>
+#include <vector>
+#include <cstdlib>
#include "common/DecayCounter.h"
#include "common/debug.h"
#include "common/errno.h"
return res;
}
+// Look up an inode in the cache and return its recursive byte count
+// (rstat.rbytes), clamped to zero.  Returns 0 when the inode is not cached.
+uint64_t MDSRank::get_inode_rbytes(inodeno_t ino) {
+  // mds_lock keeps the cache lookup and projected-inode read consistent.
+  std::lock_guard locker(mds_lock);
+  CInode* inode = mdcache->get_inode(ino);
+  if (inode == nullptr) {
+    return 0;
+  }
+  const auto& pi = inode->get_projected_inode();
+  // rstat.rbytes is signed; treat negative values as zero.
+  if (pi->rstat.rbytes <= 0) {
+    return 0;
+  }
+  return static_cast<uint64_t>(pi->rstat.rbytes);
+}
+
std::vector<std::string> MDSRankDispatcher::get_tracked_keys()
const noexcept
{
#define MDS_RANK_H_
#include <atomic>
+#include <cstdint>
#include <string_view>
#include "common/admin_socket.h" // for asok_finisher
friend class C_CacheDropExecAndReply;
friend class C_ScrubExecAndReply;
friend class C_ScrubControlExecAndReply;
+ friend class MDCache;
+ friend class Locker;
+ friend class CInode;
CephContext *cct;
}
std::string get_path(inodeno_t ino);
+ uint64_t get_inode_rbytes(inodeno_t ino);
// Reference to global MDS::mds_lock, so that users of MDSRank don't
// carry around references to the outer MDS, and we can substitute
#include "MetricAggregator.h"
#include "MDSMap.h"
#include "MDSRank.h"
+#include "MetricsHandler.h"
#include "mgr/MgrClient.h"
#include "common/ceph_context.h"
#define dout_prefix *_dout << "mds.metric.aggregator" << " " << __func__
// Performance Counters
- enum {
+enum {
l_mds_client_metrics_start = 10000,
l_mds_client_metrics_num_clients,
l_mds_client_metrics_last
- };
+};
enum {
l_mds_per_client_metrics_start = 20000,
l_mds_per_client_metrics_total_write_ops,
l_mds_per_client_metrics_total_write_size,
l_mds_per_client_metrics_last
- };
+};
enum {
l_subvolume_metrics_first = 30000,
l_subvolume_metrics_write_iops,
l_subvolume_metrics_write_tp_Bps,
l_subvolume_metrics_avg_write_latency,
+ l_subvolume_metrics_quota_bytes,
+ l_subvolume_metrics_used_bytes,
l_subvolume_metrics_last
};
});
subv_window_sec = g_conf().get_val<std::chrono::seconds>("subv_metrics_window_interval").count();
- if (!subv_window_sec)
- return -EINVAL;
-
return 0;
}
}
client_perf_counters.clear();
+ for (auto &[rank, perf_counters] : rank_perf_counters) {
+ if (perf_counters != nullptr) {
+ m_cct->get_perfcounters_collection()->remove(perf_counters);
+ delete perf_counters;
+ }
+ }
+ rank_perf_counters.clear();
+
PerfCounters *perf_counters = nullptr;
std::swap(perf_counters, m_perf_counters);
if (perf_counters != nullptr) {
"Average write throughput (Bps)", "wbps", PerfCountersBuilder::PRIO_CRITICAL);
plb.add_u64(l_subvolume_metrics_avg_write_latency, "avg_write_lat_msec",
"Average write latency (ms)", "wlav", PerfCountersBuilder::PRIO_CRITICAL);
+ plb.add_u64(l_subvolume_metrics_quota_bytes, "quota_bytes",
+ "Configured quota (bytes) for the subvolume", nullptr,
+ PerfCountersBuilder::PRIO_USEFUL, UNIT_BYTES);
+ plb.add_u64(l_subvolume_metrics_used_bytes, "used_bytes",
+ "Current logical bytes used by the subvolume", nullptr,
+ PerfCountersBuilder::PRIO_USEFUL, UNIT_BYTES);
auto perf_counter = plb.create_perf_counters();
subvolume_perf_counters[m.subvolume_path] = perf_counter;
uint64_t total_read_ops = 0, total_write_ops = 0;
uint64_t total_read_bytes = 0, total_write_bytes = 0;
uint64_t weighted_read_latency_sum = 0, weighted_write_latency_sum = 0;
+ uint64_t latest_sample_ts = 0;
+ uint64_t latest_quota_bytes = 0;
+ uint64_t latest_used_bytes = 0;
tracker.for_each_value([&](const SubvolumeMetric &m) {
total_read_ops += m.read_ops;
total_write_bytes += m.write_size;
weighted_read_latency_sum += m.avg_read_latency * m.read_ops;
weighted_write_latency_sum += m.avg_write_latency * m.write_ops;
+ if (m.time_stamp >= latest_sample_ts) {
+ latest_sample_ts = m.time_stamp;
+ latest_quota_bytes = m.quota_bytes;
+ latest_used_bytes = m.used_bytes;
+ }
});
aggr_metric.read_iops = total_read_ops / aggr_metric.time_window_last_dur_sec;
aggr_metric.avg_write_latency = (total_write_ops > 0)
? (weighted_write_latency_sum / total_write_ops) / 1000
: 0;
+ aggr_metric.quota_bytes = latest_quota_bytes;
+ aggr_metric.used_bytes = latest_used_bytes;
// update PerfCounters
auto counter = subvolume_perf_counters[path];
counter->set(l_subvolume_metrics_write_iops, aggr_metric.write_iops);
counter->set(l_subvolume_metrics_write_tp_Bps, aggr_metric.write_tBps);
counter->set(l_subvolume_metrics_avg_write_latency, aggr_metric.avg_write_latency);
+ counter->set(l_subvolume_metrics_quota_bytes, aggr_metric.quota_bytes);
+ counter->set(l_subvolume_metrics_used_bytes, aggr_metric.used_bytes);
// Update query_metrics_map
auto sub_key_func_subvolume = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
case MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC:
counter->first = aggr_metric.avg_write_latency;
break;
+ case MDSPerformanceCounterType::SUBV_QUOTA_BYTES_METRIC:
+ counter->first = aggr_metric.quota_bytes;
+ break;
+ case MDSPerformanceCounterType::SUBV_USED_BYTES_METRIC:
+ counter->first = aggr_metric.used_bytes;
+ break;
default:
break;
}
case MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC:
case MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC:
case MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC:
+ case MDSPerformanceCounterType::SUBV_QUOTA_BYTES_METRIC:
+ case MDSPerformanceCounterType::SUBV_USED_BYTES_METRIC:
break;
default:
ceph_abort_msg("unknown counter type");
}
refresh_subvolume_metrics_for_rank(rank, subvolume_metrics);
+ update_rank_perf_metrics(rank, metrics_message.rank_metrics);
+}
+
+// Publish one rank's telemetry (received via MMDSMetrics) to labelled perf
+// counters on the aggregator's context.
+void MetricAggregator::update_rank_perf_metrics(mds_rank_t rank, const RankPerfMetrics& metrics) {
+  // operator[] default-inserts nullptr for an unseen rank; lazily build a
+  // labelled PerfCounters instance on the first report from that rank.
+  auto &perf_counter = rank_perf_counters[rank];
+  if (!perf_counter) {
+    std::string labels = ceph::perf_counters::key_create(
+      "mds_rank_perf",
+      {{"rank", stringify(rank)}});
+    PerfCountersBuilder plb(m_cct, labels,
+                            l_mds_rank_perf_start, l_mds_rank_perf_last);
+    plb.add_u64(l_mds_rank_perf_cpu_usage,
+                "cpu_usage",
+                "Sum of per-core CPU utilisation reported for this MDS (100 == one full core)",
+                "cpu%",
+                PerfCountersBuilder::PRIO_USEFUL);
+    plb.add_u64(l_mds_rank_perf_open_requests,
+                "open_requests",
+                "Number of metadata requests currently in flight on this MDS",
+                "req",
+                PerfCountersBuilder::PRIO_USEFUL);
+    perf_counter = plb.create_perf_counters();
+    m_cct->get_perfcounters_collection()->add(perf_counter);
+  }
+  // Counters are unregistered/freed in remove_rank_perf_metrics_for_rank().
+  perf_counter->set(l_mds_rank_perf_cpu_usage, metrics.cpu_usage_percent);
+  perf_counter->set(l_mds_rank_perf_open_requests, metrics.open_requests);
}
void MetricAggregator::cull_metrics_for_rank(mds_rank_t rank) {
dout(10) << ": culled " << p.size() << " clients" << dendl;
clients_by_rank.erase(rank);
+ remove_rank_perf_metrics_for_rank(rank);
+}
+
+// Unregister and free the per-rank perf counters created by
+// update_rank_perf_metrics(), if any exist for this rank.
+void MetricAggregator::remove_rank_perf_metrics_for_rank(mds_rank_t rank) {
+  auto found = rank_perf_counters.find(rank);
+  if (found == rank_perf_counters.end()) {
+    return;
+  }
+  // Remove from the collection before deleting so no sampler can observe a
+  // freed counter.
+  m_cct->get_perfcounters_collection()->remove(found->second);
+  delete found->second;
+  rank_perf_counters.erase(found);
}
void MetricAggregator::notify_mdsmap(const MDSMap &mdsmap) {
#include "mgr/Types.h" // for PerformanceCounters
#include "mgr/MetricTypes.h" // for MetricPayload
-#include "mgr/MDSPerfMetricTypes.h"
+#include "mds/MDSPerfMetricTypes.h"
#include "mdstypes.h"
#include "MDSPinger.h"
PerfCounters *m_perf_counters;
std::map<std::pair<entity_inst_t, mds_rank_t>, PerfCounters*> client_perf_counters;
- uint64_t subv_window_sec;
+ uint64_t subv_window_sec = 0;
std::unordered_map<std::string, SlidingWindowTracker<SubvolumeMetric>> subvolume_aggregated_metrics;
std::map<std::string, PerfCounters*> subvolume_perf_counters;
+ std::map<mds_rank_t, PerfCounters*> rank_perf_counters;
void handle_mds_metrics(const cref_t<MMDSMetrics> &m);
const Metrics &metrics);
void refresh_subvolume_metrics_for_rank(mds_rank_t rank, const std::vector<SubvolumeMetric> &metrics);
void remove_metrics_for_rank(const entity_inst_t &client, mds_rank_t rank, bool remove);
+ void update_rank_perf_metrics(mds_rank_t rank, const RankPerfMetrics& metrics);
+ void remove_rank_perf_metrics_for_rank(mds_rank_t rank);
void cull_metrics_for_rank(mds_rank_t rank);
#include "MetricsHandler.h"
+#include <cmath>
+#include <cstdlib>
+#include <limits>
+#include <string>
#include <variant>
+#include <vector>
+#include <unistd.h>
#include "common/debug.h"
#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "common/perf_counters_key.h"
+#include "include/util.h"
+#include "include/fs_types.h"
+#include "include/stringify.h"
#include "messages/MClientMetrics.h"
#include "messages/MMDSMetrics.h"
#include "messages/MMDSPing.h"
#include "MDSRank.h"
+#include "MDCache.h"
+#include "CInode.h"
#include "SessionMap.h"
#define dout_context g_ceph_context
MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
: Dispatcher(cct),
mds(mds) {
+ clk_tck = sysconf(_SC_CLK_TCK);
+ if (clk_tck <= 0) {
+ dout(1) << "failed to determine clock ticks per second, cpu usage metric disabled" << dendl;
+ clk_tck = 0;
+ }
}
Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
void MetricsHandler::init() {
dout(10) << dendl;
+ // Create local perf counters for non-rank0 MDS only. Rank0's perf counters
+ // are created by MetricAggregator::update_rank_perf_metrics() to avoid
+ // duplicate counters (MetricAggregator creates counters for all ranks).
+ if (mds->get_nodeid() != 0 && !rank_perf_counters) {
+ std::string labels = ceph::perf_counters::key_create(
+ "mds_rank_perf",
+ {{"rank", stringify(mds->get_nodeid())}});
+
+ PerfCountersBuilder plb(cct, labels,
+ l_mds_rank_perf_start, l_mds_rank_perf_last);
+ plb.add_u64(l_mds_rank_perf_cpu_usage,
+ "cpu_usage",
+ "Sum of per-core CPU utilisation for this MDS (100 == one full core)",
+ "cpu%",
+ PerfCountersBuilder::PRIO_USEFUL);
+ plb.add_u64(l_mds_rank_perf_open_requests,
+ "open_requests",
+ "Number of metadata requests currently in flight",
+ "req",
+ PerfCountersBuilder::PRIO_USEFUL);
+ rank_perf_counters = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(rank_perf_counters);
+ }
+
+ subv_window_sec = g_conf().get_val<std::chrono::seconds>("subv_metrics_window_interval").count();
+
updater = std::thread([this]() {
ceph_pthread_setname("mds-metrics");
std::unique_lock locker(lock);
locker.unlock();
sleep(after);
locker.lock();
- update_rank0();
+ update_rank0(locker);
}
});
}
if (updater.joinable()) {
updater.join();
}
+
+ if (rank_perf_counters) {
+ cct->get_perfcounters_collection()->remove(rank_perf_counters);
+ delete rank_perf_counters;
+ rank_perf_counters = nullptr;
+ }
}
resolved_paths.reserve(payload.subvolume_metrics.size());
// RAII guard: unlock on construction, re-lock on destruction (even on exceptions)
- struct UnlockGuard {
- std::unique_lock<ceph::mutex> &lk;
- explicit UnlockGuard(std::unique_lock<ceph::mutex>& l) : lk(l) { lk.unlock(); }
- ~UnlockGuard() noexcept {
- if (!lk.owns_lock()) {
- try { lk.lock(); }
- catch (...) {
- dout(0) << "failed to re-lock in UnlockGuard dtor" << dendl;
- }
- }
- }
- } unlock_guard{lk};
+ UnlockGuard unlock_guard{lk};
// unlocked: resolve paths, no contention with mds lock
for (const auto& metric : payload.subvolume_metrics) {
}
}
-void MetricsHandler::update_rank0() {
+void MetricsHandler::update_rank0(std::unique_lock<ceph::mutex>& locker) {
dout(20) << dendl;
+ sample_cpu_usage();
+ sample_open_requests();
+
if (!addr_rank0) {
dout(20) << ": not yet notified with rank0 address, ignoring" << dendl;
return;
metrics_message.seq = next_seq;
metrics_message.rank = mds->get_nodeid();
+ metrics_message.rank_metrics = rank_telemetry.metrics;
for (auto p = client_metrics_map.begin(); p != client_metrics_map.end();) {
// copy metrics and update local metrics map as required
}
}
+ // Resolve used_bytes for all subvolumes without holding the metrics lock
+ // (same unlock pattern used for path resolution in handle_payload).
+ // Step 1 (locked): collect unique subvolume ids
+ std::vector<inodeno_t> subvol_ids;
+ subvol_ids.reserve(subvolume_metrics_map.size());
+ for (const auto &[path, aggregated_metrics] : subvolume_metrics_map) {
+ if (!aggregated_metrics.empty()) {
+ subvol_ids.push_back(aggregated_metrics.front().subvolume_id);
+ }
+ }
+
+  // Step 2 (unlocked): fetch rbytes under mds_lock via the helper; releasing
+  // the metrics lock here lets another metrics update proceed concurrently
+ UnlockGuard unlock_guard{locker};
+
+  // here the metrics handler lock will be retaken via the UnlockGuard dtor
+ std::unordered_map<inodeno_t, uint64_t> subvol_used_bytes;
+ for (inodeno_t subvol_id : subvol_ids) {
+ if (subvol_used_bytes.count(subvol_id) == 0) {
+ uint64_t rbytes = mds->get_inode_rbytes(subvol_id);
+ subvol_used_bytes[subvol_id] = rbytes;
+ dout(20) << "resolved used_bytes for subvol " << subvol_id << " = " << rbytes << dendl;
+ }
+ }
+
// subvolume metrics, reserve 100 entries per subvolume ? good enough?
metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size()* 100);
for (auto &[path, aggregated_metrics] : subvolume_metrics_map) {
metrics_message.subvolume_metrics.emplace_back();
- aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back());
+ aggregate_subvolume_metrics(path, aggregated_metrics, subvol_used_bytes,
+ metrics_message.subvolume_metrics.back());
}
// if we need to show local MDS metrics, we need to save a last copy...
subvolume_metrics_map.clear();
+ // Evict stale subvolume quota entries
+ if (subv_window_sec > 0) {
+ auto now = std::chrono::steady_clock::now();
+ auto threshold = std::chrono::seconds(subv_window_sec) * 2;
+ for (auto it = subvolume_quota.begin(); it != subvolume_quota.end(); ) {
+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - it->second.last_activity);
+ if (elapsed > threshold) {
+ dout(15) << "evicting stale subvolume quota entry " << it->first
+ << " (inactive for " << elapsed.count() << "s)" << dendl;
+ it = subvolume_quota.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
// only start incrementing when its kicked via set_next_seq()
if (next_seq != 0) {
++last_updated_seq;
}
void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path,
- const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res) {
+ const std::vector<AggregatedIOMetrics>& metrics_list,
+ const std::unordered_map<inodeno_t, uint64_t>& subvol_used_bytes,
+ SubvolumeMetric &res) {
dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl;
res.subvolume_path = subvolume_path;
res.avg_read_latency = 0;
res.avg_write_latency = 0;
res.time_stamp = 0;
+ res.quota_bytes = 0;
+ res.used_bytes = 0;
for (const auto& m : metrics_list) {
res.read_ops += m.read_count;
}
}
+ // Lookup quota and used_bytes after aggregating I/O metrics
+ if (!metrics_list.empty()) {
+ inodeno_t subvolume_id = metrics_list.front().subvolume_id;
+
+ // Get quota/used bytes from cache and update last activity time
+ auto it = subvolume_quota.find(subvolume_id);
+ if (it != subvolume_quota.end()) {
+ res.quota_bytes = it->second.quota_bytes;
+ res.used_bytes = it->second.used_bytes;
+ it->second.last_activity = std::chrono::steady_clock::now();
+ }
+
+ // Fallback: if cache didn't have used_bytes, use the pre-fetched map
+ if (res.used_bytes == 0) {
+ auto used_it = subvol_used_bytes.find(subvolume_id);
+ if (used_it != subvol_used_bytes.end()) {
+ res.used_bytes = used_it->second;
+ }
+ }
+ }
+
// normalize latencies
res.avg_read_latency = (res.read_ops > 0)
? (weighted_read_latency_sum / res.read_ops)
res.avg_write_latency = (res.write_ops > 0)
? (weighted_write_latency_sum / res.write_ops)
: 0;
+}
+
+// Update the cached quota/usage for a subvolume.  Called from the quota
+// broadcast path; quota_bytes == 0 only overwrites the cache when
+// force_zero is set (i.e. the quota was explicitly removed).
+void MetricsHandler::maybe_update_subvolume_quota(inodeno_t subvol_id, uint64_t quota_bytes, uint64_t used_bytes, bool force_zero) {
+  std::lock_guard l(lock);
+
+  // Insert-or-find: register the subvolume on first sight so the very first
+  // quota broadcast is not lost (it may arrive before caps/metadata).
+  auto [it, inserted] = subvolume_quota.try_emplace(subvol_id);
+  if (inserted) {
+    dout(20) << __func__ << " inserted subvolume_quota for " << subvol_id << dendl;
+  }
+
+  // A zero from a quota-less inode must not clobber a good value cached from
+  // a quota-enabled child; only an explicit removal (force_zero) writes zero.
+  if (force_zero || quota_bytes > 0) {
+    it->second.quota_bytes = quota_bytes;
+  }
+  it->second.used_bytes = used_bytes;
+  it->second.last_activity = std::chrono::steady_clock::now();
+
+  dout(20) << __func__ << " subvol " << subvol_id
+           << " quota=" << it->second.quota_bytes
+           << " used=" << it->second.used_bytes
+           << " (input: quota=" << quota_bytes << ", used=" << used_bytes << ")" << dendl;
+}
+
+// Sample this process's CPU usage from /proc/self/stat and publish it to the
+// local telemetry snapshot (forwarded to rank 0) and, when present, to the
+// local perf counters.  Reports 0 until two samples establish a delta.
+void MetricsHandler::sample_cpu_usage() {
+  // Publish a zero sample to both sinks (telemetry snapshot + perf counter).
+  auto publish_zero = [this]() {
+    rank_telemetry.metrics.cpu_usage_percent = 0;
+    if (rank_perf_counters) {
+      rank_perf_counters->set(l_mds_rank_perf_cpu_usage, 0);
+    }
+  };
+
+  // clk_tck <= 0 means sysconf(_SC_CLK_TCK) failed at construction time;
+  // the metric is disabled.
+  if (clk_tck <= 0) {
+    publish_zero();
+    return;
+  }
+
+  uint64_t current_ticks = 0;
+  std::string err;
+  // BUGFIX: the argument here was a mis-encoded "&current_ticks"
+  // (rendered as "¤t_ticks"), which did not compile.
+  if (!ceph::read_process_cpu_ticks(&current_ticks, &err)) {
+    publish_zero();
+    dout(5) << err << dendl;
+    return;
+  }
+
+  auto now = std::chrono::steady_clock::now();
+  if (!rank_telemetry.cpu_sample_initialized) {
+    // First sample only establishes the baseline; report 0 until we have a
+    // delta to compute a rate from.
+    rank_telemetry.last_cpu_total_ticks = current_ticks;
+    rank_telemetry.last_cpu_sample_time = now;
+    rank_telemetry.cpu_sample_initialized = true;
+    publish_zero();
+    return;
+  }
+
+  if (current_ticks < rank_telemetry.last_cpu_total_ticks) {
+    // Tick counter went backwards (e.g. after a reset); re-baseline.
+    rank_telemetry.last_cpu_total_ticks = current_ticks;
+    rank_telemetry.last_cpu_sample_time = now;
+    publish_zero();
+    return;
+  }
+
+  double elapsed = std::chrono::duration<double>(now - rank_telemetry.last_cpu_sample_time).count();
+  if (elapsed <= 0.0) {
+    // No measurable wall-clock delta; keep the previous baseline untouched.
+    publish_zero();
+    return;
+  }
+
+  uint64_t delta_ticks = current_ticks - rank_telemetry.last_cpu_total_ticks;
+  rank_telemetry.last_cpu_total_ticks = current_ticks;
+  rank_telemetry.last_cpu_sample_time = now;
+
+  // cpu_seconds / wall_seconds is the fraction of one core used; scale so
+  // that 100 == one fully-busy core (may exceed 100 on a threaded MDS).
+  double cpu_seconds = static_cast<double>(delta_ticks) / static_cast<double>(clk_tck);
+  double cores_used = cpu_seconds / elapsed;
+  double usage_percent = cores_used * 100.0;
+  if (usage_percent < 0.0) {
+    usage_percent = 0.0;
+  }
+
+  uint64_t stored = static_cast<uint64_t>(std::llround(usage_percent));
+  if (rank_perf_counters) {
+    rank_perf_counters->set(l_mds_rank_perf_cpu_usage, stored);
+  }
+  rank_telemetry.metrics.cpu_usage_percent = stored;
+}
+
+// Sample the number of in-flight tracked ops and publish it to the telemetry
+// snapshot and, when present, to the local perf counters.  Reports 0 when the
+// op tracker is disabled (counts are not maintained in that case).
+void MetricsHandler::sample_open_requests() {
+  const uint64_t open = mds->op_tracker.is_tracking()
+      ? mds->op_tracker.get_num_ops_in_flight()
+      : 0;
+  rank_telemetry.metrics.open_requests = open;
+  if (rank_perf_counters) {
+    rank_perf_counters->set(l_mds_rank_perf_open_requests, open);
+  }
+}
\ No newline at end of file
#ifndef CEPH_MDS_METRICS_HANDLER_H
#define CEPH_MDS_METRICS_HANDLER_H
+#include <chrono>
#include <map>
#include <mutex>
#include <unordered_map>
#include "msg/Dispatcher.h"
#include "common/ceph_mutex.h"
-#include "MDSPerfMetricTypes.h"
+#include "mds/MDSPerfMetricTypes.h"
#include "include/cephfs/metrics/Types.h"
#include <boost/optional.hpp>
void notify_mdsmap(const MDSMap &mdsmap);
+ // Called from MDCache::broadcast_quota_to_client to update quota for subvolumes
+ // quota_bytes: only updated if > 0, unless force_zero is true (quota removed)
+ // used_bytes is fetched dynamically from inode rstat in aggregate_subvolume_metrics
+ void maybe_update_subvolume_quota(inodeno_t subvol_id, uint64_t quota_bytes, uint64_t used_bytes, bool force_zero = false);
+
private:
struct HandlePayloadVisitor : public boost::static_visitor<void> {
MetricsHandler *metrics_handler;
std::map<entity_inst_t, std::pair<version_t, Metrics>> client_metrics_map;
// maps subvolume path -> aggregated metrics from all clients reporting to this MDS instance
std::unordered_map<std::string, std::vector<AggregatedIOMetrics>> subvolume_metrics_map;
- uint64_t subv_metrics_tracker_window_time_sec = 300;
+ uint64_t subv_window_sec = 0;
+
+  // maps subvolume_id (inode number) -> quota info, updated when quota is broadcast to clients
+  // used_bytes is cached here as well (refreshed on each broadcast);
+  // aggregate_subvolume_metrics falls back to inode rstat when it is 0
+  struct SubvolumeQuotaInfo {
+    uint64_t quota_bytes = 0;  // configured quota; 0 == unset/unlimited
+    uint64_t used_bytes = 0;   // last reported logical usage (from rstat.rbytes)
+    // last time this entry was written or read for aggregation; stale
+    // entries are evicted based on this timestamp
+    std::chrono::steady_clock::time_point last_activity;
+  };
+ std::unordered_map<inodeno_t, SubvolumeQuotaInfo> subvolume_quota;
  // address of rank 0 mds, so that the message can be sent without
// acquiring mds_lock. misdirected messages to rank 0 are taken
// care of by rank 0.
void handle_client_metrics(const cref_t<MClientMetrics> &m);
void handle_mds_ping(const cref_t<MMDSPing> &m);
- void update_rank0();
+ void update_rank0(std::unique_lock<ceph::mutex>& locker);
+
+  // RAII helper to temporarily unlock/relock a unique_lock: the constructor
+  // releases the lock and the destructor reacquires it (even when leaving
+  // the scope via an exception).
+  struct UnlockGuard {
+    std::unique_lock<ceph::mutex>& lk;
+    explicit UnlockGuard(std::unique_lock<ceph::mutex>& l) : lk(l) { lk.unlock(); }
+    ~UnlockGuard() noexcept {
+      // only re-lock if nobody else re-locked it in the meantime
+      if (!lk.owns_lock()) {
+        try { lk.lock(); } catch (...) {
+          // avoid throwing from destructor
+        }
+      }
+    }
+  };
void aggregate_subvolume_metrics(const std::string& subvolume_path,
- const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res);
+ const std::vector<AggregatedIOMetrics>& metrics_list,
+ const std::unordered_map<inodeno_t, uint64_t>& subvol_used_bytes,
+ SubvolumeMetric &res);
+
+ void sample_cpu_usage();
+ void sample_open_requests();
+
+ PerfCounters *rank_perf_counters = nullptr;
+ long clk_tck = 0;
+ struct RankTelemetry {
+ RankPerfMetrics metrics;
+ bool cpu_sample_initialized = false;
+ uint64_t last_cpu_total_ticks = 0;
+ std::chrono::steady_clock::time_point last_cpu_sample_time{};
+ } rank_telemetry;
};
#endif // CEPH_MDS_METRICS_HANDLER_H
f->dump_unsigned("avg_read_latency", avg_read_latency);
f->dump_unsigned("avg_write_latency", avg_write_latency);
f->dump_unsigned("time_window_sec", time_stamp);
+ f->dump_unsigned("quota_bytes", quota_bytes);
+ f->dump_unsigned("used_bytes", used_bytes);
}
std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) {
<< ", write_size=" << m.write_size
<< ", avg_read_lat=" << m.avg_read_latency
<< ", avg_write_lat=" << m.avg_write_latency
- << ", time_window_sec=" << m.time_stamp << "}";
+ << ", time_window_sec=" << m.time_stamp
+ << ", quota_bytes=" << m.quota_bytes
+ << ", used_bytes=" << m.used_bytes << "}";
return os;
}
uint64_t avg_read_latency = 0;
uint64_t avg_write_latency = 0;
uint64_t time_stamp = 0;
+ uint64_t quota_bytes = 0;
+ uint64_t used_bytes = 0;
DENC(SubvolumeMetric, v, p) {
- DENC_START(1, 1, p);
+ DENC_START(2, 1, p);
denc(v.subvolume_path, p);
denc(v.read_ops, p);
denc(v.write_ops, p);
denc(v.avg_read_latency, p);
denc(v.avg_write_latency, p);
denc(v.time_stamp, p);
+ if (struct_v >= 2) {
+ denc(v.quota_bytes, p);
+ denc(v.used_bytes, p);
+ } else {
+ auto &mutable_v = const_cast<SubvolumeMetric&>(v);
+ mutable_v.quota_bytes = 0;
+ mutable_v.used_bytes = 0;
+ }
DENC_FINISH(p);
}