templatize metrics collection so as to reuse quering routines.
`MetricCollector` can be subclassed and along with implementing
` process_reports()` to process incoming metrics data.
also, generalize metrics data in `MMgrReport` and metric query
configuration in `MMgrConfigure`.
Signed-off-by: Venky Shankar <vshankar@redhat.com>
#define CEPH_MMGRCONFIGURE_H_
#include "msg/Message.h"
+#include "mgr/MetricTypes.h"
#include "mgr/OSDPerfMetricTypes.h"
/**
*/
class MMgrConfigure : public Message {
private:
- static constexpr int HEAD_VERSION = 3;
+ static constexpr int HEAD_VERSION = 4;
static constexpr int COMPAT_VERSION = 1;
public:
std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> osd_perf_metric_queries;
+ boost::optional<MetricConfigMessage> metric_config_message;
+
void decode_payload() override
{
using ceph::decode;
if (header.version >= 3) {
decode(osd_perf_metric_queries, p);
}
+ if (header.version >= 4) {
+ decode(metric_config_message, p);
+ }
}
void encode_payload(uint64_t features) override {
encode(stats_period, payload);
encode(stats_threshold, payload);
encode(osd_perf_metric_queries, payload);
+ encode(metric_config_message, payload);
}
std::string_view get_type_name() const override { return "mgrconfigure"; }
#include <boost/optional.hpp>
#include "msg/Message.h"
+#include "mgr/MetricTypes.h"
#include "mgr/OSDPerfMetricTypes.h"
#include "common/perf_counters.h"
class MMgrReport : public Message {
private:
- static constexpr int HEAD_VERSION = 8;
+ static constexpr int HEAD_VERSION = 9;
static constexpr int COMPAT_VERSION = 1;
public:
std::map<OSDPerfMetricQuery, OSDPerfMetricReport> osd_perf_metric_reports;
+ boost::optional<MetricReportMessage> metric_report_message;
+
void decode_payload() override
{
using ceph::decode;
if (header.version >= 8) {
decode(task_status, p);
}
+ if (header.version >= 9) {
+ decode(metric_report_message, p);
+ }
}
void encode_payload(uint64_t features) override {
encode(config_bl, payload);
encode(osd_perf_metric_reports, payload);
encode(task_status, payload);
+ encode(metric_report_message, payload);
}
std::string_view get_type_name() const override { return "mgrreport"; }
modules.at(module_name)->set_uri(uri);
}
-OSDPerfMetricQueryID ActivePyModules::add_osd_perf_query(
+MetricQueryID ActivePyModules::add_osd_perf_query(
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit)
{
return server.add_osd_perf_query(query, limit);
}
-void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id)
{
int r = server.remove_osd_perf_query(query_id);
if (r < 0) {
}
}
-PyObject *ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id)
+PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
{
std::map<OSDPerfMetricKey, PerformanceCounters> counters;
const std::string &svc_id,
const std::string &path) const;
- OSDPerfMetricQueryID add_osd_perf_query(
+ MetricQueryID add_osd_perf_query(
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit);
- void remove_osd_perf_query(OSDPerfMetricQueryID query_id);
- PyObject *get_osd_perf_counters(OSDPerfMetricQueryID query_id);
+ void remove_osd_perf_query(MetricQueryID query_id);
+ PyObject *get_osd_perf_counters(MetricQueryID query_id);
bool get_store(const std::string &module_name,
const std::string &key, std::string *val) const;
#include "mon/MonClient.h"
#include "common/errno.h"
#include "common/version.h"
+#include "mgr/Types.h"
#include "PyUtil.h"
#include "BaseMgrModule.h"
static PyObject*
ceph_remove_osd_perf_query(BaseMgrModule *self, PyObject *args)
{
- OSDPerfMetricQueryID query_id;
+ MetricQueryID query_id;
if (!PyArg_ParseTuple(args, "i:ceph_remove_osd_perf_query", &query_id)) {
derr << "Invalid args!" << dendl;
return nullptr;
static PyObject*
ceph_get_osd_perf_counters(BaseMgrModule *self, PyObject *args)
{
- OSDPerfMetricQueryID query_id;
+ MetricQueryID query_id;
if (!PyArg_ParseTuple(args, "i:ceph_get_osd_perf_counters", &query_id)) {
derr << "Invalid args!" << dendl;
return nullptr;
Gil.cc
Mgr.cc
MgrStandby.cc
+ MetricCollector.cc
OSDPerfMetricTypes.cc
OSDPerfMetricCollector.cc
PyFormatter.cc
osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
}
+ if (m->metric_report_message) {
+ const MetricReportMessage &message = *m->metric_report_message;
+ boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+ }
+
return true;
}
c->send_message2(configure);
}
-OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
+MetricQueryID DaemonServer::add_osd_perf_query(
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit)
{
return osd_perf_metric_collector.add_query(query, limit);
}
-int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
{
return osd_perf_metric_collector.remove_query(query_id);
}
int DaemonServer::get_osd_perf_counters(
- OSDPerfMetricQueryID query_id,
+ MetricQueryID query_id,
std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
{
return osd_perf_metric_collector.get_counters(query_id, counters);
#include <set>
#include <string>
+#include <boost/variant.hpp>
#include "common/ceph_mutex.h"
#include "common/LogClient.h"
#include "ServiceMap.h"
#include "MgrSession.h"
#include "DaemonState.h"
+#include "MetricCollector.h"
#include "OSDPerfMetricCollector.h"
class MMgrReport;
void tick();
void schedule_tick_locked(double delay_sec);
- class OSDPerfMetricCollectorListener :
- public OSDPerfMetricCollector::Listener {
+ class OSDPerfMetricCollectorListener : public MetricListener {
public:
OSDPerfMetricCollectorListener(DaemonServer *server)
: server(server) {
OSDPerfMetricCollector osd_perf_metric_collector;
void handle_osd_perf_metric_query_updated();
+ void handle_metric_payload(const OSDMetricPayload &payload) {
+ osd_perf_metric_collector.process_reports(payload);
+ }
+
+ void handle_metric_payload(const UnknownMetricPayload &payload) {
+ ceph_abort();
+ }
+
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ DaemonServer *server;
+
+ HandlePayloadVisitor(DaemonServer *server)
+ : server(server) {
+ }
+
+ template <typename MetricPayload>
+ inline void operator()(const MetricPayload &payload) const {
+ server->handle_metric_payload(payload);
+ }
+ };
+
public:
int init(uint64_t gid, entity_addrvec_t client_addrs);
void shutdown();
void _send_configure(ConnectionRef c);
- OSDPerfMetricQueryID add_osd_perf_query(
+ MetricQueryID add_osd_perf_query(
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit);
- int remove_osd_perf_query(OSDPerfMetricQueryID query_id);
- int get_osd_perf_counters(OSDPerfMetricQueryID query_id,
+ int remove_osd_perf_query(MetricQueryID query_id);
+ int get_osd_perf_counters(MetricQueryID query_id,
std::map<OSDPerfMetricKey, PerformanceCounters> *c);
virtual const char** get_tracked_conf_keys() const override;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "mgr/MetricCollector.h"
+#include "mgr/OSDPerfMetricTypes.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.metric_collector " << __func__ << ": "
+
+template <typename Query, typename Limit, typename Key, typename Report>
+MetricCollector<Query, Limit, Key, Report>::MetricCollector(MetricListener &listener)
+ : listener(listener)
+{
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+MetricQueryID MetricCollector<Query, Limit, Key, Report>::add_query(
+ const Query &query,
+ const std::optional<Limit> &limit) {
+ dout(20) << "query=" << query << ", limit=" << limit << dendl;
+ uint64_t query_id;
+ bool notify = false;
+
+ {
+ std::lock_guard locker(lock);
+
+ query_id = next_query_id++;
+ auto it = queries.find(query);
+ if (it == queries.end()) {
+ it = queries.emplace(query, std::map<MetricQueryID, OptionalLimit>{}).first;
+ notify = true;
+ } else if (is_limited(it->second)) {
+ notify = true;
+ }
+
+ it->second.emplace(query_id, limit);
+ counters.emplace(query_id, std::map<Key, PerformanceCounters>{});
+ }
+
+ dout(10) << query << " " << (limit ? stringify(*limit) : "unlimited")
+ << " query_id=" << query_id << dendl;
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+
+ return query_id;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+int MetricCollector<Query, Limit, Key, Report>::remove_query(MetricQueryID query_id) {
+ dout(20) << "query_id=" << query_id << dendl;
+ bool found = false;
+ bool notify = false;
+
+ {
+ std::lock_guard locker(lock);
+
+ for (auto it = queries.begin() ; it != queries.end();) {
+ auto iter = it->second.find(query_id);
+ if (iter == it->second.end()) {
+ ++it;
+ continue;
+ }
+
+ it->second.erase(iter);
+ if (it->second.empty()) {
+ it = queries.erase(it);
+ notify = true;
+ } else if (is_limited(it->second)) {
+ ++it;
+ notify = true;
+ }
+ found = true;
+ break;
+ }
+ counters.erase(query_id);
+ }
+
+ if (!found) {
+ dout(10) << query_id << " not found" << dendl;
+ return -ENOENT;
+ }
+
+ dout(10) << query_id << dendl;
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+
+ return 0;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::remove_all_queries() {
+ dout(20) << dendl;
+ bool notify;
+
+ {
+ std::lock_guard locker(lock);
+
+ notify = !queries.empty();
+ queries.clear();
+ }
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+int MetricCollector<Query, Limit, Key, Report>::get_counters(
+ MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
+ dout(20) << dendl;
+
+ std::lock_guard locker(lock);
+
+ auto it = counters.find(query_id);
+ if (it == counters.end()) {
+ dout(10) << "counters for " << query_id << " not found" << dendl;
+ return -ENOENT;
+ }
+
+ *c = std::move(it->second);
+ it->second.clear();
+
+ return 0;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::process_reports_generic(
+ const std::map<Query, Report> &reports, UpdateCallback callback) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+
+ if (reports.empty()) {
+ return;
+ }
+
+ for (auto& [query, report] : reports) {
+ dout(10) << "report for " << query << " query: "
+ << report.group_packed_performance_counters.size() << " records"
+ << dendl;
+
+ for (auto& [key, bl] : report.group_packed_performance_counters) {
+ auto bl_it = bl.cbegin();
+
+ for (auto& p : queries[query]) {
+ auto &key_counters = counters[p.first][key];
+ if (key_counters.empty()) {
+ key_counters.resize(query.performance_counter_descriptors.size(),
+ {0, 0});
+ }
+ }
+
+ auto desc_it = report.performance_counter_descriptors.begin();
+ for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) {
+ if (desc_it == report.performance_counter_descriptors.end()) {
+ break;
+ }
+ if (*desc_it != query.performance_counter_descriptors[i]) {
+ continue;
+ }
+ PerformanceCounter c;
+ desc_it->unpack_counter(bl_it, &c);
+ dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl;
+
+ for (auto& p : queries[query]) {
+ auto &key_counters = counters[p.first][key];
+ callback(&key_counters[i], c);
+ }
+ desc_it++;
+ }
+ }
+ }
+}
+
+template class
+MetricCollector<OSDPerfMetricQuery, OSDPerfMetricLimit, OSDPerfMetricKey, OSDPerfMetricReport>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_METRIC_COLLECTOR_H
+#define CEPH_MGR_METRIC_COLLECTOR_H
+
+#include <map>
+#include <set>
+#include <tuple>
+#include <vector>
+#include <utility>
+#include <algorithm>
+
+#include "common/ceph_mutex.h"
+#include "msg/Message.h"
+#include "mgr/Types.h"
+#include "mgr/MetricTypes.h"
+
+class MMgrReport;
+
+template <typename Query, typename Limit, typename Key, typename Report>
+class MetricCollector {
+public:
+ virtual ~MetricCollector() {
+ }
+
+ using Limits = std::set<Limit>;
+
+ MetricCollector(MetricListener &listener);
+
+ MetricQueryID add_query(const Query &query, const std::optional<Limit> &limit);
+
+ int remove_query(MetricQueryID query_id);
+
+ void remove_all_queries();
+
+ int get_counters(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
+
+ std::map<Query, Limits> get_queries() const {
+ std::lock_guard locker(lock);
+
+ std::map<Query, Limits> result;
+ for (auto& [query, limits] : queries) {
+ auto result_it = result.insert({query, {}}).first;
+ if (is_limited(limits)) {
+ for (auto& limit : limits) {
+ result_it->second.insert(*limit.second);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ virtual void process_reports(const MetricPayload &payload) = 0;
+
+protected:
+ typedef std::optional<Limit> OptionalLimit;
+ typedef std::map<MetricQueryID, OptionalLimit> QueryIDLimit;
+ typedef std::map<Query, QueryIDLimit> Queries;
+ typedef std::map<MetricQueryID, std::map<Key, PerformanceCounters>> Counters;
+ typedef std::function<void(PerformanceCounter *, const PerformanceCounter &)> UpdateCallback;
+
+ mutable ceph::mutex lock = ceph::make_mutex("mgr::metric::collector::lock");
+
+ Queries queries;
+ Counters counters;
+
+ void process_reports_generic(const std::map<Query, Report> &reports, UpdateCallback callback);
+
+private:
+ MetricListener &listener;
+ MetricQueryID next_query_id = 0;
+
+ bool is_limited(const std::map<MetricQueryID, OptionalLimit> &limits) const {
+ return std::any_of(begin(limits), end(limits),
+ [](auto &limits) { return limits.second.has_value(); });
+ }
+};
+
+#endif // CEPH_MGR_METRIC_COLLECTOR_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_METRIC_TYPES_H
+#define CEPH_MGR_METRIC_TYPES_H
+
+#include <boost/variant.hpp>
+#include "include/denc.h"
+#include "mgr/OSDPerfMetricTypes.h"
+
+enum class MetricReportType {
+ METRIC_REPORT_TYPE_OSD = 0,
+};
+
+struct OSDMetricPayload {
+ static const MetricReportType METRIC_REPORT_TYPE = MetricReportType::METRIC_REPORT_TYPE_OSD;
+ std::map<OSDPerfMetricQuery, OSDPerfMetricReport> report;
+
+ OSDMetricPayload() {
+ }
+ OSDMetricPayload(const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &report)
+ : report(report) {
+ }
+
+ DENC(OSDMetricPayload, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.report, p);
+ DENC_FINISH(p);
+ }
+};
+
+struct UnknownMetricPayload {
+ static const MetricReportType METRIC_REPORT_TYPE = static_cast<MetricReportType>(-1);
+
+ UnknownMetricPayload() { }
+
+ DENC(UnknownMetricPayload, v, p) {
+ ceph_abort();
+ }
+};
+
+WRITE_CLASS_DENC(OSDMetricPayload)
+WRITE_CLASS_DENC(UnknownMetricPayload)
+
+typedef boost::variant<OSDMetricPayload,
+ UnknownMetricPayload> MetricPayload;
+
+class EncodeMetricPayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodeMetricPayloadVisitor(bufferlist &bl) : m_bl(bl) {
+ }
+
+ template <typename MetricPayload>
+ inline void operator()(const MetricPayload &payload) const {
+ using ceph::encode;
+ encode(static_cast<uint32_t>(MetricPayload::METRIC_REPORT_TYPE), m_bl);
+ encode(payload, m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodeMetricPayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodeMetricPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) {
+ }
+
+ template <typename MetricPayload>
+ inline void operator()(MetricPayload &payload) const {
+ using ceph::decode;
+ decode(payload, m_iter);
+ }
+
+private:
+ bufferlist::const_iterator &m_iter;
+};
+
+struct MetricReportMessage {
+ MetricPayload payload;
+
+ MetricReportMessage(const MetricPayload &payload = UnknownMetricPayload())
+ : payload(payload) {
+ }
+
+ void encode(bufferlist &bl) const {
+ boost::apply_visitor(EncodeMetricPayloadVisitor(bl), payload);
+ }
+
+ void decode(bufferlist::const_iterator &iter) {
+ using ceph::decode;
+
+ uint32_t metric_report_type;
+ decode(metric_report_type, iter);
+
+ switch (static_cast<MetricReportType>(metric_report_type)) {
+ case MetricReportType::METRIC_REPORT_TYPE_OSD:
+ payload = OSDMetricPayload();
+ break;
+ default:
+ payload = UnknownMetricPayload();
+ break;
+ }
+
+ boost::apply_visitor(DecodeMetricPayloadVisitor(iter), payload);
+ }
+};
+
+WRITE_CLASS_ENCODER(MetricReportMessage);
+
+// variant for sending configure message to mgr clients
+
+enum MetricConfigType {
+ METRIC_CONFIG_TYPE_OSD = 0,
+};
+
+struct OSDConfigPayload {
+ static const MetricConfigType METRIC_CONFIG_TYPE = MetricConfigType::METRIC_CONFIG_TYPE_OSD;
+ std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> config;
+
+ OSDConfigPayload() {
+ }
+ OSDConfigPayload(const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &config)
+ : config(config) {
+ }
+
+ DENC(OSDConfigPayload, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.config, p);
+ DENC_FINISH(p);
+ }
+};
+
+struct UnknownConfigPayload {
+ static const MetricConfigType METRIC_CONFIG_TYPE = static_cast<MetricConfigType>(-1);
+
+ UnknownConfigPayload() { }
+
+ DENC(UnknownConfigPayload, v, p) {
+ ceph_abort();
+ }
+};
+
+WRITE_CLASS_DENC(OSDConfigPayload)
+WRITE_CLASS_DENC(UnknownConfigPayload)
+
+typedef boost::variant<OSDConfigPayload,
+ UnknownConfigPayload> ConfigPayload;
+
+class EncodeConfigPayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodeConfigPayloadVisitor(bufferlist &bl) : m_bl(bl) {
+ }
+
+ template <typename ConfigPayload>
+ inline void operator()(const ConfigPayload &payload) const {
+ using ceph::encode;
+ encode(static_cast<uint32_t>(ConfigPayload::METRIC_CONFIG_TYPE), m_bl);
+ encode(payload, m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodeConfigPayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodeConfigPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) {
+ }
+
+ template <typename ConfigPayload>
+ inline void operator()(ConfigPayload &payload) const {
+ using ceph::decode;
+ decode(payload, m_iter);
+ }
+
+private:
+ bufferlist::const_iterator &m_iter;
+};
+
+struct MetricConfigMessage {
+ ConfigPayload payload;
+
+ MetricConfigMessage(const ConfigPayload &payload = UnknownConfigPayload())
+ : payload(payload) {
+ }
+
+ void encode(bufferlist &bl) const {
+ boost::apply_visitor(EncodeConfigPayloadVisitor(bl), payload);
+ }
+
+ void decode(bufferlist::const_iterator &iter) {
+ using ceph::decode;
+
+ uint32_t metric_config_type;
+ decode(metric_config_type, iter);
+
+ switch (metric_config_type) {
+ case MetricConfigType::METRIC_CONFIG_TYPE_OSD:
+ payload = OSDConfigPayload();
+ break;
+ default:
+ payload = UnknownConfigPayload();
+ break;
+ }
+
+ boost::apply_visitor(DecodeConfigPayloadVisitor(iter), payload);
+ }
+};
+
+WRITE_CLASS_ENCODER(MetricConfigMessage);
+
+#endif // CEPH_MGR_METRIC_TYPES_H
&last_config_bl_version);
if (get_perf_report_cb) {
- get_perf_report_cb(&report->osd_perf_metric_reports);
+ MetricPayload payload = get_perf_report_cb();
+ MetricReportMessage message(payload);
+ report->metric_report_message = message;
}
session->con->send_message2(report);
stats_threshold = m->stats_threshold;
}
- if (set_perf_queries_cb) {
- set_perf_queries_cb(m->osd_perf_metric_queries);
+ if (!m->osd_perf_metric_queries.empty()) {
+ handle_config_payload(m->osd_perf_metric_queries);
+ } else if (m->metric_config_message) {
+ const MetricConfigMessage &message = *m->metric_config_message;
+ boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
}
bool starting = (stats_period == 0) && (m->stats_period != 0);
#ifndef MGR_CLIENT_H_
#define MGR_CLIENT_H_
+#include <boost/variant.hpp>
+
#include "msg/Connection.h"
#include "msg/Dispatcher.h"
#include "mon/MgrMap.h"
#include "mgr/DaemonHealthMetric.h"
#include "messages/MMgrReport.h"
-#include "mgr/OSDPerfMetricTypes.h"
+#include "mgr/MetricTypes.h"
#include "common/perf_counters.h"
#include "common/Timer.h"
// If provided, use this to compose an MPGStats to send with
// our reports (hook for use by OSD)
std::function<MPGStats*()> pgstats_cb;
- std::function<void(const std::map<OSDPerfMetricQuery,
- OSDPerfMetricLimits> &)> set_perf_queries_cb;
- std::function<void(std::map<OSDPerfMetricQuery,
- OSDPerfMetricReport> *)> get_perf_report_cb;
+ std::function<void(const ConfigPayload &)> set_perf_queries_cb;
+ std::function<MetricPayload()> get_perf_report_cb;
// for service registration and beacon
bool service_daemon = false;
int r);
void set_perf_metric_query_cb(
- std::function<void(const std::map<OSDPerfMetricQuery,
- OSDPerfMetricLimits> &)> cb_set,
- std::function<void(std::map<OSDPerfMetricQuery,
- OSDPerfMetricReport> *)> cb_get)
+ std::function<void(const ConfigPayload &)> cb_set,
+ std::function<MetricPayload()> cb_get)
{
std::lock_guard l(lock);
set_perf_queries_cb = cb_set;
bool is_initialized() const { return initialized; }
private:
+ void handle_config_payload(const OSDConfigPayload &payload) {
+ if (set_perf_queries_cb) {
+ set_perf_queries_cb(payload);
+ }
+ }
+
+ void handle_config_payload(const UnknownConfigPayload &payload) {
+ ceph_abort();
+ }
+
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ MgrClient *mgrc;
+
+ HandlePayloadVisitor(MgrClient *mgrc)
+ : mgrc(mgrc) {
+ }
+
+ template <typename ConfigPayload>
+ inline void operator()(const ConfigPayload &payload) const {
+ mgrc->handle_config_payload(payload);
+ }
+ };
+
void _send_stats();
void _send_pgstats();
void _send_report();
#include "common/debug.h"
#include "common/errno.h"
+#include "messages/MMgrReport.h"
#include "OSDPerfMetricCollector.h"
#define dout_context g_ceph_context
#undef dout_prefix
#define dout_prefix *_dout << "mgr.osd_perf_metric_collector " << __func__ << " "
-namespace {
-
-bool is_limited(const std::map<OSDPerfMetricQueryID,
- std::optional<OSDPerfMetricLimit>> &limits) {
- for (auto &it : limits) {
- if (!it.second) {
- return false;
- }
- }
- return true;
+OSDPerfMetricCollector::OSDPerfMetricCollector(MetricListener &listener)
+ : MetricCollector<OSDPerfMetricQuery,
+ OSDPerfMetricLimit,
+ OSDPerfMetricKey,
+ OSDPerfMetricReport>(listener) {
}
-} // anonymous namespace
-
-OSDPerfMetricCollector::OSDPerfMetricCollector(Listener &listener)
- : listener(listener) {
-}
+void OSDPerfMetricCollector::process_reports(const MetricPayload &payload) {
+ const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports =
+ boost::get<OSDMetricPayload>(payload).report;
-std::map<OSDPerfMetricQuery, OSDPerfMetricLimits>
-OSDPerfMetricCollector::get_queries() const {
std::lock_guard locker(lock);
-
- std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> result;
- for (auto &it : queries) {
- auto &query = it.first;
- auto &limits = it.second;
- auto result_it = result.insert({query, {}}).first;
- if (is_limited(limits)) {
- for (auto &iter : limits) {
- result_it->second.insert(*iter.second);
- }
- }
- }
-
- return result;
-}
-
-OSDPerfMetricQueryID OSDPerfMetricCollector::add_query(
- const OSDPerfMetricQuery& query,
- const std::optional<OSDPerfMetricLimit> &limit) {
- uint64_t query_id;
- bool notify = false;
-
- {
- std::lock_guard locker(lock);
-
- query_id = next_query_id++;
- auto it = queries.find(query);
- if (it == queries.end()) {
- it = queries.insert({query, {}}).first;
- notify = true;
- } else if (is_limited(it->second)) {
- notify = true;
- }
- it->second.insert({query_id, limit});
- counters[query_id];
- }
-
- dout(10) << query << " " << (limit ? stringify(*limit) : "unlimited")
- << " query_id=" << query_id << dendl;
-
- if (notify) {
- listener.handle_query_updated();
- }
-
- return query_id;
-}
-
-int OSDPerfMetricCollector::remove_query(int query_id) {
- bool found = false;
- bool notify = false;
-
- {
- std::lock_guard locker(lock);
-
- for (auto it = queries.begin() ; it != queries.end(); it++) {
- auto iter = it->second.find(query_id);
- if (iter == it->second.end()) {
- continue;
- }
-
- it->second.erase(iter);
- if (it->second.empty()) {
- queries.erase(it);
- notify = true;
- } else if (is_limited(it->second)) {
- notify = true;
- }
- found = true;
- break;
- }
- counters.erase(query_id);
- }
-
- if (!found) {
- dout(10) << query_id << " not found" << dendl;
- return -ENOENT;
- }
-
- dout(10) << query_id << dendl;
-
- if (notify) {
- listener.handle_query_updated();
- }
-
- return 0;
-}
-
-void OSDPerfMetricCollector::remove_all_queries() {
- dout(10) << dendl;
-
- bool notify;
-
- {
- std::lock_guard locker(lock);
-
- notify = !queries.empty();
- queries.clear();
- }
-
- if (notify) {
- listener.handle_query_updated();
- }
-}
-
-int OSDPerfMetricCollector::get_counters(
- OSDPerfMetricQueryID query_id,
- std::map<OSDPerfMetricKey, PerformanceCounters> *c) {
- std::lock_guard locker(lock);
-
- auto it = counters.find(query_id);
- if (it == counters.end()) {
- dout(10) << "counters for " << query_id << " not found" << dendl;
- return -ENOENT;
- }
-
- *c = std::move(it->second);
- it->second.clear();
-
- return 0;
-}
-
-void OSDPerfMetricCollector::process_reports(
- const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports) {
-
- if (reports.empty()) {
- return;
- }
-
- std::lock_guard locker(lock);
-
- for (auto &it : reports) {
- auto &query = it.first;
- auto &report = it.second;
- dout(10) << "report for " << query << " query: "
- << report.group_packed_performance_counters.size() << " records"
- << dendl;
-
- for (auto &it : report.group_packed_performance_counters) {
- auto &key = it.first;
- auto bl_it = it.second.cbegin();
-
- for (auto &queries_it : queries[query]) {
- auto query_id = queries_it.first;
- auto &key_counters = counters[query_id][key];
- if (key_counters.empty()) {
- key_counters.resize(query.performance_counter_descriptors.size(),
- {0, 0});
- }
- }
-
- auto desc_it = report.performance_counter_descriptors.begin();
- for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) {
- if (desc_it == report.performance_counter_descriptors.end()) {
- break;
- }
- if (*desc_it != query.performance_counter_descriptors[i]) {
- continue;
- }
- PerformanceCounter c;
- desc_it->unpack_counter(bl_it, &c);
- dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl;
-
- for (auto &queries_it : queries[query]) {
- auto query_id = queries_it.first;
- auto &key_counters = counters[query_id][key];
- key_counters[i].first += c.first;
- key_counters[i].second += c.second;
- }
- desc_it++;
- }
- }
- }
+ process_reports_generic(
+ reports, [](PerformanceCounter *counter, const PerformanceCounter &update) {
+ counter->first += update.first;
+ counter->second += update.second;
+ });
}
#ifndef OSD_PERF_METRIC_COLLECTOR_H_
#define OSD_PERF_METRIC_COLLECTOR_H_
-#include "common/ceph_mutex.h"
-
+#include "mgr/MetricCollector.h"
#include "mgr/OSDPerfMetricTypes.h"
-#include <map>
-
/**
* OSD performance query class.
*/
-class OSDPerfMetricCollector {
+class OSDPerfMetricCollector
+ : public MetricCollector<OSDPerfMetricQuery, OSDPerfMetricLimit, OSDPerfMetricKey,
+ OSDPerfMetricReport> {
public:
- struct Listener {
- virtual ~Listener() {
- }
-
- virtual void handle_query_updated() = 0;
- };
-
- OSDPerfMetricCollector(Listener &listener);
-
- std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> get_queries() const;
-
- OSDPerfMetricQueryID add_query(
- const OSDPerfMetricQuery& query,
- const std::optional<OSDPerfMetricLimit> &limit);
- int remove_query(OSDPerfMetricQueryID query_id);
- void remove_all_queries();
-
- int get_counters(OSDPerfMetricQueryID query_id,
- std::map<OSDPerfMetricKey, PerformanceCounters> *counters);
-
- void process_reports(
- const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports);
-
-private:
- typedef std::optional<OSDPerfMetricLimit> OptionalLimit;
- typedef std::map<OSDPerfMetricQuery,
- std::map<OSDPerfMetricQueryID, OptionalLimit>> Queries;
- typedef std::map<OSDPerfMetricQueryID,
- std::map<OSDPerfMetricKey, PerformanceCounters>> Counters;
+ OSDPerfMetricCollector(MetricListener &listener);
- Listener &listener;
- mutable ceph::mutex lock = ceph::make_mutex("OSDPerfMetricCollector::lock");
- OSDPerfMetricQueryID next_query_id = 0;
- Queries queries;
- Counters counters;
+ void process_reports(const MetricPayload &payload) override;
};
#endif // OSD_PERF_METRIC_COLLECTOR_H_
#include "include/denc.h"
#include "include/stringify.h"
+#include "mgr/Types.h"
+
#include <regex>
typedef std::vector<std::string> OSDPerfMetricSubKey; // array of regex match
}
};
-typedef std::pair<uint64_t,uint64_t> PerformanceCounter;
-typedef std::vector<PerformanceCounter> PerformanceCounters;
-
enum class PerformanceCounterType : uint8_t {
OPS = 0,
WRITE_OPS = 1,
typedef std::set<OSDPerfMetricLimit> OSDPerfMetricLimits;
-typedef int OSDPerfMetricQueryID;
-
struct OSDPerfMetricQuery {
bool operator<(const OSDPerfMetricQuery &other) const {
if (key_descriptor < other.key_descriptor) {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_TYPES_H
+#define CEPH_MGR_TYPES_H
+
+typedef int MetricQueryID;
+
+typedef std::pair<uint64_t,uint64_t> PerformanceCounter;
+typedef std::vector<PerformanceCounter> PerformanceCounters;
+
+struct MetricListener {
+ virtual ~MetricListener() {
+ }
+
+ virtual void handle_query_updated() = 0;
+};
+
+#endif // CEPH_MGR_TYPES_H
mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); });
mgrc.set_perf_metric_query_cb(
- [this](const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries) {
- set_perf_queries(queries);
+ [this](const ConfigPayload &config_payload) {
+ set_perf_queries(config_payload);
},
- [this](std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports) {
- get_perf_reports(reports);
+ [this] {
+ return get_perf_reports();
});
mgrc.init();
// --------------------------------
-void OSD::set_perf_queries(
- const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries) {
+void OSD::set_perf_queries(const ConfigPayload &config_payload) {
+ const OSDConfigPayload &osd_config_payload = boost::get<OSDConfigPayload>(config_payload);
+ const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries = osd_config_payload.config;
dout(10) << "setting " << queries.size() << " queries" << dendl;
std::list<OSDPerfMetricQuery> supported_queries;
}
}
-void OSD::get_perf_reports(
- std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports) {
+MetricPayload OSD::get_perf_reports() {
+ OSDMetricPayload payload;
+ std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports = payload.report;
+
std::vector<PGRef> pgs;
_get_pgs(&pgs);
DynamicPerfStats dps;
pg->unlock();
dps.merge(pg_dps);
}
- dps.add_to_reports(m_perf_limits, reports);
- dout(20) << "reports for " << reports->size() << " queries" << dendl;
+ dps.add_to_reports(m_perf_limits, &reports);
+ dout(20) << "reports for " << reports.size() << " queries" << dendl;
+
+ return payload;
}
// =============================================================
friend class OSDService;
private:
- void set_perf_queries(
- const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
- void get_perf_reports(
- std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
+ void set_perf_queries(const ConfigPayload &config_payload);
+ MetricPayload get_perf_reports();
ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
std::list<OSDPerfMetricQuery> m_perf_queries;