From efcebe1eb4ae16ea9c633436e3e105add4a2662c Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Tue, 10 Sep 2019 09:49:04 -0400 Subject: [PATCH] mgr: templatize/generalize metrics collection interface templatize metrics collection so as to reuse querying routines. `MetricCollector` can be subclassed, implementing `process_reports()` to process incoming metrics data. also, generalize metrics data in `MMgrReport` and metric query configuration in `MMgrConfigure`. Signed-off-by: Venky Shankar --- src/messages/MMgrConfigure.h | 9 +- src/messages/MMgrReport.h | 9 +- src/mgr/ActivePyModules.cc | 6 +- src/mgr/ActivePyModules.h | 6 +- src/mgr/BaseMgrModule.cc | 5 +- src/mgr/CMakeLists.txt | 1 + src/mgr/DaemonServer.cc | 11 +- src/mgr/DaemonServer.h | 32 ++++- src/mgr/MetricCollector.cc | 183 +++++++++++++++++++++++++ src/mgr/MetricCollector.h | 81 ++++++++++++ src/mgr/MetricTypes.h | 213 ++++++++++++++++++++++++++++++ src/mgr/MgrClient.cc | 11 +- src/mgr/MgrClient.h | 39 ++++-- src/mgr/OSDPerfMetricCollector.cc | 203 ++-------------------------- src/mgr/OSDPerfMetricCollector.h | 45 +------ src/mgr/OSDPerfMetricTypes.h | 7 +- src/mgr/Types.h | 19 +++ src/osd/OSD.cc | 25 ++-- src/osd/OSD.h | 6 +- 19 files changed, 634 insertions(+), 277 deletions(-) create mode 100644 src/mgr/MetricCollector.cc create mode 100644 src/mgr/MetricCollector.h create mode 100644 src/mgr/MetricTypes.h create mode 100644 src/mgr/Types.h diff --git a/src/messages/MMgrConfigure.h b/src/messages/MMgrConfigure.h index 1cf7bf7888911..aaa36790a4c06 100644 --- a/src/messages/MMgrConfigure.h +++ b/src/messages/MMgrConfigure.h @@ -16,6 +16,7 @@ #define CEPH_MMGRCONFIGURE_H_ #include "msg/Message.h" +#include "mgr/MetricTypes.h" #include "mgr/OSDPerfMetricTypes.h" /** @@ -24,7 +25,7 @@ */ class MMgrConfigure : public Message { private: - static constexpr int HEAD_VERSION = 3; + static constexpr int HEAD_VERSION = 4; static constexpr int COMPAT_VERSION = 1; public: @@ -35,6 +36,8 @@ public: 
std::map osd_perf_metric_queries; + boost::optional metric_config_message; + void decode_payload() override { using ceph::decode; @@ -46,6 +49,9 @@ public: if (header.version >= 3) { decode(osd_perf_metric_queries, p); } + if (header.version >= 4) { + decode(metric_config_message, p); + } } void encode_payload(uint64_t features) override { @@ -53,6 +59,7 @@ public: encode(stats_period, payload); encode(stats_threshold, payload); encode(osd_perf_metric_queries, payload); + encode(metric_config_message, payload); } std::string_view get_type_name() const override { return "mgrconfigure"; } diff --git a/src/messages/MMgrReport.h b/src/messages/MMgrReport.h index 25ac6a9e53ffd..d9ef4eee957d7 100644 --- a/src/messages/MMgrReport.h +++ b/src/messages/MMgrReport.h @@ -18,6 +18,7 @@ #include #include "msg/Message.h" +#include "mgr/MetricTypes.h" #include "mgr/OSDPerfMetricTypes.h" #include "common/perf_counters.h" @@ -73,7 +74,7 @@ WRITE_CLASS_ENCODER(PerfCounterType) class MMgrReport : public Message { private: - static constexpr int HEAD_VERSION = 8; + static constexpr int HEAD_VERSION = 9; static constexpr int COMPAT_VERSION = 1; public: @@ -107,6 +108,8 @@ public: std::map osd_perf_metric_reports; + boost::optional metric_report_message; + void decode_payload() override { using ceph::decode; @@ -132,6 +135,9 @@ public: if (header.version >= 8) { decode(task_status, p); } + if (header.version >= 9) { + decode(metric_report_message, p); + } } void encode_payload(uint64_t features) override { @@ -146,6 +152,7 @@ public: encode(config_bl, payload); encode(osd_perf_metric_reports, payload); encode(task_status, payload); + encode(metric_report_message, payload); } std::string_view get_type_name() const override { return "mgrreport"; } diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index 36a5f74bafea5..4d38a6e1e0f21 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -1011,14 +1011,14 @@ void ActivePyModules::set_uri(const 
std::string& module_name, modules.at(module_name)->set_uri(uri); } -OSDPerfMetricQueryID ActivePyModules::add_osd_perf_query( +MetricQueryID ActivePyModules::add_osd_perf_query( const OSDPerfMetricQuery &query, const std::optional &limit) { return server.add_osd_perf_query(query, limit); } -void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id) +void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id) { int r = server.remove_osd_perf_query(query_id); if (r < 0) { @@ -1027,7 +1027,7 @@ void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id) } } -PyObject *ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id) +PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id) { std::map counters; diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index a8a0f3c4cf050..192360400c762 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -101,11 +101,11 @@ public: const std::string &svc_id, const std::string &path) const; - OSDPerfMetricQueryID add_osd_perf_query( + MetricQueryID add_osd_perf_query( const OSDPerfMetricQuery &query, const std::optional &limit); - void remove_osd_perf_query(OSDPerfMetricQueryID query_id); - PyObject *get_osd_perf_counters(OSDPerfMetricQueryID query_id); + void remove_osd_perf_query(MetricQueryID query_id); + PyObject *get_osd_perf_counters(MetricQueryID query_id); bool get_store(const std::string &module_name, const std::string &key, std::string *val) const; diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index c8d30875bf412..7d603d45a6e3c 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -25,6 +25,7 @@ #include "mon/MonClient.h" #include "common/errno.h" #include "common/version.h" +#include "mgr/Types.h" #include "PyUtil.h" #include "BaseMgrModule.h" @@ -992,7 +993,7 @@ ceph_add_osd_perf_query(BaseMgrModule *self, PyObject *args) static PyObject* ceph_remove_osd_perf_query(BaseMgrModule *self, 
PyObject *args) { - OSDPerfMetricQueryID query_id; + MetricQueryID query_id; if (!PyArg_ParseTuple(args, "i:ceph_remove_osd_perf_query", &query_id)) { derr << "Invalid args!" << dendl; return nullptr; @@ -1005,7 +1006,7 @@ ceph_remove_osd_perf_query(BaseMgrModule *self, PyObject *args) static PyObject* ceph_get_osd_perf_counters(BaseMgrModule *self, PyObject *args) { - OSDPerfMetricQueryID query_id; + MetricQueryID query_id; if (!PyArg_ParseTuple(args, "i:ceph_get_osd_perf_counters", &query_id)) { derr << "Invalid args!" << dendl; return nullptr; diff --git a/src/mgr/CMakeLists.txt b/src/mgr/CMakeLists.txt index e69335eb69767..101db6ed88956 100644 --- a/src/mgr/CMakeLists.txt +++ b/src/mgr/CMakeLists.txt @@ -17,6 +17,7 @@ if(WITH_MGR) Gil.cc Mgr.cc MgrStandby.cc + MetricCollector.cc OSDPerfMetricTypes.cc OSDPerfMetricCollector.cc PyFormatter.cc diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 7b81ffff02720..54d2caeab263b 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -633,6 +633,11 @@ bool DaemonServer::handle_report(const ref_t& m) osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports); } + if (m->metric_report_message) { + const MetricReportMessage &message = *m->metric_report_message; + boost::apply_visitor(HandlePayloadVisitor(this), message.payload); + } + return true; } @@ -2848,20 +2853,20 @@ void DaemonServer::_send_configure(ConnectionRef c) c->send_message2(configure); } -OSDPerfMetricQueryID DaemonServer::add_osd_perf_query( +MetricQueryID DaemonServer::add_osd_perf_query( const OSDPerfMetricQuery &query, const std::optional &limit) { return osd_perf_metric_collector.add_query(query, limit); } -int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id) +int DaemonServer::remove_osd_perf_query(MetricQueryID query_id) { return osd_perf_metric_collector.remove_query(query_id); } int DaemonServer::get_osd_perf_counters( - OSDPerfMetricQueryID query_id, + MetricQueryID query_id, std::map 
*counters) { return osd_perf_metric_collector.get_counters(query_id, counters); diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h index 6fdf1c0756a76..6289176c9e02f 100644 --- a/src/mgr/DaemonServer.h +++ b/src/mgr/DaemonServer.h @@ -18,6 +18,7 @@ #include #include +#include #include "common/ceph_mutex.h" #include "common/LogClient.h" @@ -29,6 +30,7 @@ #include "ServiceMap.h" #include "MgrSession.h" #include "DaemonState.h" +#include "MetricCollector.h" #include "OSDPerfMetricCollector.h" class MMgrReport; @@ -107,8 +109,7 @@ private: void tick(); void schedule_tick_locked(double delay_sec); - class OSDPerfMetricCollectorListener : - public OSDPerfMetricCollector::Listener { + class OSDPerfMetricCollectorListener : public MetricListener { public: OSDPerfMetricCollectorListener(DaemonServer *server) : server(server) { @@ -123,6 +124,27 @@ private: OSDPerfMetricCollector osd_perf_metric_collector; void handle_osd_perf_metric_query_updated(); + void handle_metric_payload(const OSDMetricPayload &payload) { + osd_perf_metric_collector.process_reports(payload); + } + + void handle_metric_payload(const UnknownMetricPayload &payload) { + ceph_abort(); + } + + struct HandlePayloadVisitor : public boost::static_visitor { + DaemonServer *server; + + HandlePayloadVisitor(DaemonServer *server) + : server(server) { + } + + template + inline void operator()(const MetricPayload &payload) const { + server->handle_metric_payload(payload); + } + }; + public: int init(uint64_t gid, entity_addrvec_t client_addrs); void shutdown(); @@ -157,11 +179,11 @@ public: void _send_configure(ConnectionRef c); - OSDPerfMetricQueryID add_osd_perf_query( + MetricQueryID add_osd_perf_query( const OSDPerfMetricQuery &query, const std::optional &limit); - int remove_osd_perf_query(OSDPerfMetricQueryID query_id); - int get_osd_perf_counters(OSDPerfMetricQueryID query_id, + int remove_osd_perf_query(MetricQueryID query_id); + int get_osd_perf_counters(MetricQueryID query_id, std::map *c); 
virtual const char** get_tracked_conf_keys() const override; diff --git a/src/mgr/MetricCollector.cc b/src/mgr/MetricCollector.cc new file mode 100644 index 0000000000000..1eb26a8faadc4 --- /dev/null +++ b/src/mgr/MetricCollector.cc @@ -0,0 +1,183 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/debug.h" +#include "common/errno.h" + +#include "mgr/MetricCollector.h" +#include "mgr/OSDPerfMetricTypes.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr.metric_collector " << __func__ << ": " + +template +MetricCollector::MetricCollector(MetricListener &listener) + : listener(listener) +{ +} + +template +MetricQueryID MetricCollector::add_query( + const Query &query, + const std::optional &limit) { + dout(20) << "query=" << query << ", limit=" << limit << dendl; + uint64_t query_id; + bool notify = false; + + { + std::lock_guard locker(lock); + + query_id = next_query_id++; + auto it = queries.find(query); + if (it == queries.end()) { + it = queries.emplace(query, std::map{}).first; + notify = true; + } else if (is_limited(it->second)) { + notify = true; + } + + it->second.emplace(query_id, limit); + counters.emplace(query_id, std::map{}); + } + + dout(10) << query << " " << (limit ? 
stringify(*limit) : "unlimited") + << " query_id=" << query_id << dendl; + + if (notify) { + listener.handle_query_updated(); + } + + return query_id; +} + +template +int MetricCollector::remove_query(MetricQueryID query_id) { + dout(20) << "query_id=" << query_id << dendl; + bool found = false; + bool notify = false; + + { + std::lock_guard locker(lock); + + for (auto it = queries.begin() ; it != queries.end();) { + auto iter = it->second.find(query_id); + if (iter == it->second.end()) { + ++it; + continue; + } + + it->second.erase(iter); + if (it->second.empty()) { + it = queries.erase(it); + notify = true; + } else if (is_limited(it->second)) { + ++it; + notify = true; + } + found = true; + break; + } + counters.erase(query_id); + } + + if (!found) { + dout(10) << query_id << " not found" << dendl; + return -ENOENT; + } + + dout(10) << query_id << dendl; + + if (notify) { + listener.handle_query_updated(); + } + + return 0; +} + +template +void MetricCollector::remove_all_queries() { + dout(20) << dendl; + bool notify; + + { + std::lock_guard locker(lock); + + notify = !queries.empty(); + queries.clear(); + } + + if (notify) { + listener.handle_query_updated(); + } +} + +template +int MetricCollector::get_counters( + MetricQueryID query_id, std::map *c) { + dout(20) << dendl; + + std::lock_guard locker(lock); + + auto it = counters.find(query_id); + if (it == counters.end()) { + dout(10) << "counters for " << query_id << " not found" << dendl; + return -ENOENT; + } + + *c = std::move(it->second); + it->second.clear(); + + return 0; +} + +template +void MetricCollector::process_reports_generic( + const std::map &reports, UpdateCallback callback) { + ceph_assert(ceph_mutex_is_locked(lock)); + + if (reports.empty()) { + return; + } + + for (auto& [query, report] : reports) { + dout(10) << "report for " << query << " query: " + << report.group_packed_performance_counters.size() << " records" + << dendl; + + for (auto& [key, bl] : 
report.group_packed_performance_counters) { + auto bl_it = bl.cbegin(); + + for (auto& p : queries[query]) { + auto &key_counters = counters[p.first][key]; + if (key_counters.empty()) { + key_counters.resize(query.performance_counter_descriptors.size(), + {0, 0}); + } + } + + auto desc_it = report.performance_counter_descriptors.begin(); + for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) { + if (desc_it == report.performance_counter_descriptors.end()) { + break; + } + if (*desc_it != query.performance_counter_descriptors[i]) { + continue; + } + PerformanceCounter c; + desc_it->unpack_counter(bl_it, &c); + dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl; + + for (auto& p : queries[query]) { + auto &key_counters = counters[p.first][key]; + callback(&key_counters[i], c); + } + desc_it++; + } + } + } +} + +template class +MetricCollector; diff --git a/src/mgr/MetricCollector.h b/src/mgr/MetricCollector.h new file mode 100644 index 0000000000000..f74787e96a9ee --- /dev/null +++ b/src/mgr/MetricCollector.h @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MGR_METRIC_COLLECTOR_H +#define CEPH_MGR_METRIC_COLLECTOR_H + +#include +#include +#include +#include +#include +#include + +#include "common/ceph_mutex.h" +#include "msg/Message.h" +#include "mgr/Types.h" +#include "mgr/MetricTypes.h" + +class MMgrReport; + +template +class MetricCollector { +public: + virtual ~MetricCollector() { + } + + using Limits = std::set; + + MetricCollector(MetricListener &listener); + + MetricQueryID add_query(const Query &query, const std::optional &limit); + + int remove_query(MetricQueryID query_id); + + void remove_all_queries(); + + int get_counters(MetricQueryID query_id, std::map *counters); + + std::map get_queries() const { + std::lock_guard locker(lock); + + std::map result; + for (auto& [query, limits] : queries) { + auto result_it = 
result.insert({query, {}}).first; + if (is_limited(limits)) { + for (auto& limit : limits) { + result_it->second.insert(*limit.second); + } + } + } + + return result; + } + + virtual void process_reports(const MetricPayload &payload) = 0; + +protected: + typedef std::optional OptionalLimit; + typedef std::map QueryIDLimit; + typedef std::map Queries; + typedef std::map> Counters; + typedef std::function UpdateCallback; + + mutable ceph::mutex lock = ceph::make_mutex("mgr::metric::collector::lock"); + + Queries queries; + Counters counters; + + void process_reports_generic(const std::map &reports, UpdateCallback callback); + +private: + MetricListener &listener; + MetricQueryID next_query_id = 0; + + bool is_limited(const std::map &limits) const { + return std::any_of(begin(limits), end(limits), + [](auto &limits) { return limits.second.has_value(); }); + } +}; + +#endif // CEPH_MGR_METRIC_COLLECTOR_H diff --git a/src/mgr/MetricTypes.h b/src/mgr/MetricTypes.h new file mode 100644 index 0000000000000..3d4d4a68ee6cd --- /dev/null +++ b/src/mgr/MetricTypes.h @@ -0,0 +1,213 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MGR_METRIC_TYPES_H +#define CEPH_MGR_METRIC_TYPES_H + +#include +#include "include/denc.h" +#include "mgr/OSDPerfMetricTypes.h" + +enum class MetricReportType { + METRIC_REPORT_TYPE_OSD = 0, +}; + +struct OSDMetricPayload { + static const MetricReportType METRIC_REPORT_TYPE = MetricReportType::METRIC_REPORT_TYPE_OSD; + std::map report; + + OSDMetricPayload() { + } + OSDMetricPayload(const std::map &report) + : report(report) { + } + + DENC(OSDMetricPayload, v, p) { + DENC_START(1, 1, p); + denc(v.report, p); + DENC_FINISH(p); + } +}; + +struct UnknownMetricPayload { + static const MetricReportType METRIC_REPORT_TYPE = static_cast(-1); + + UnknownMetricPayload() { } + + DENC(UnknownMetricPayload, v, p) { + ceph_abort(); + } +}; + +WRITE_CLASS_DENC(OSDMetricPayload) 
+WRITE_CLASS_DENC(UnknownMetricPayload) + +typedef boost::variant MetricPayload; + +class EncodeMetricPayloadVisitor : public boost::static_visitor { +public: + explicit EncodeMetricPayloadVisitor(bufferlist &bl) : m_bl(bl) { + } + + template + inline void operator()(const MetricPayload &payload) const { + using ceph::encode; + encode(static_cast(MetricPayload::METRIC_REPORT_TYPE), m_bl); + encode(payload, m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodeMetricPayloadVisitor : public boost::static_visitor { +public: + DecodeMetricPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) { + } + + template + inline void operator()(MetricPayload &payload) const { + using ceph::decode; + decode(payload, m_iter); + } + +private: + bufferlist::const_iterator &m_iter; +}; + +struct MetricReportMessage { + MetricPayload payload; + + MetricReportMessage(const MetricPayload &payload = UnknownMetricPayload()) + : payload(payload) { + } + + void encode(bufferlist &bl) const { + boost::apply_visitor(EncodeMetricPayloadVisitor(bl), payload); + } + + void decode(bufferlist::const_iterator &iter) { + using ceph::decode; + + uint32_t metric_report_type; + decode(metric_report_type, iter); + + switch (static_cast(metric_report_type)) { + case MetricReportType::METRIC_REPORT_TYPE_OSD: + payload = OSDMetricPayload(); + break; + default: + payload = UnknownMetricPayload(); + break; + } + + boost::apply_visitor(DecodeMetricPayloadVisitor(iter), payload); + } +}; + +WRITE_CLASS_ENCODER(MetricReportMessage); + +// variant for sending configure message to mgr clients + +enum MetricConfigType { + METRIC_CONFIG_TYPE_OSD = 0, +}; + +struct OSDConfigPayload { + static const MetricConfigType METRIC_CONFIG_TYPE = MetricConfigType::METRIC_CONFIG_TYPE_OSD; + std::map config; + + OSDConfigPayload() { + } + OSDConfigPayload(const std::map &config) + : config(config) { + } + + DENC(OSDConfigPayload, v, p) { + DENC_START(1, 1, p); + denc(v.config, p); + DENC_FINISH(p); + } +}; + 
+struct UnknownConfigPayload { + static const MetricConfigType METRIC_CONFIG_TYPE = static_cast(-1); + + UnknownConfigPayload() { } + + DENC(UnknownConfigPayload, v, p) { + ceph_abort(); + } +}; + +WRITE_CLASS_DENC(OSDConfigPayload) +WRITE_CLASS_DENC(UnknownConfigPayload) + +typedef boost::variant ConfigPayload; + +class EncodeConfigPayloadVisitor : public boost::static_visitor { +public: + explicit EncodeConfigPayloadVisitor(bufferlist &bl) : m_bl(bl) { + } + + template + inline void operator()(const ConfigPayload &payload) const { + using ceph::encode; + encode(static_cast(ConfigPayload::METRIC_CONFIG_TYPE), m_bl); + encode(payload, m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodeConfigPayloadVisitor : public boost::static_visitor { +public: + DecodeConfigPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) { + } + + template + inline void operator()(ConfigPayload &payload) const { + using ceph::decode; + decode(payload, m_iter); + } + +private: + bufferlist::const_iterator &m_iter; +}; + +struct MetricConfigMessage { + ConfigPayload payload; + + MetricConfigMessage(const ConfigPayload &payload = UnknownConfigPayload()) + : payload(payload) { + } + + void encode(bufferlist &bl) const { + boost::apply_visitor(EncodeConfigPayloadVisitor(bl), payload); + } + + void decode(bufferlist::const_iterator &iter) { + using ceph::decode; + + uint32_t metric_config_type; + decode(metric_config_type, iter); + + switch (metric_config_type) { + case MetricConfigType::METRIC_CONFIG_TYPE_OSD: + payload = OSDConfigPayload(); + break; + default: + payload = UnknownConfigPayload(); + break; + } + + boost::apply_visitor(DecodeConfigPayloadVisitor(iter), payload); + } +}; + +WRITE_CLASS_ENCODER(MetricConfigMessage); + +#endif // CEPH_MGR_METRIC_TYPES_H diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc index fd22c05e99800..4903fa3a083ca 100644 --- a/src/mgr/MgrClient.cc +++ b/src/mgr/MgrClient.cc @@ -400,7 +400,9 @@ void MgrClient::_send_report() 
&last_config_bl_version); if (get_perf_report_cb) { - get_perf_report_cb(&report->osd_perf_metric_reports); + MetricPayload payload = get_perf_report_cb(); + MetricReportMessage message(payload); + report->metric_report_message = message; } session->con->send_message2(report); @@ -437,8 +439,11 @@ bool MgrClient::handle_mgr_configure(ref_t m) stats_threshold = m->stats_threshold; } - if (set_perf_queries_cb) { - set_perf_queries_cb(m->osd_perf_metric_queries); + if (!m->osd_perf_metric_queries.empty()) { + handle_config_payload(m->osd_perf_metric_queries); + } else if (m->metric_config_message) { + const MetricConfigMessage &message = *m->metric_config_message; + boost::apply_visitor(HandlePayloadVisitor(this), message.payload); } bool starting = (stats_period == 0) && (m->stats_period != 0); diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h index e9053ccf1f027..6ac56fec970d2 100644 --- a/src/mgr/MgrClient.h +++ b/src/mgr/MgrClient.h @@ -14,13 +14,15 @@ #ifndef MGR_CLIENT_H_ #define MGR_CLIENT_H_ +#include + #include "msg/Connection.h" #include "msg/Dispatcher.h" #include "mon/MgrMap.h" #include "mgr/DaemonHealthMetric.h" #include "messages/MMgrReport.h" -#include "mgr/OSDPerfMetricTypes.h" +#include "mgr/MetricTypes.h" #include "common/perf_counters.h" #include "common/Timer.h" @@ -84,10 +86,8 @@ protected: // If provided, use this to compose an MPGStats to send with // our reports (hook for use by OSD) std::function pgstats_cb; - std::function &)> set_perf_queries_cb; - std::function *)> get_perf_report_cb; + std::function set_perf_queries_cb; + std::function get_perf_report_cb; // for service registration and beacon bool service_daemon = false; @@ -131,10 +131,8 @@ public: int r); void set_perf_metric_query_cb( - std::function &)> cb_set, - std::function *)> cb_get) + std::function cb_set, + std::function cb_get) { std::lock_guard l(lock); set_perf_queries_cb = cb_set; @@ -171,6 +169,29 @@ public: bool is_initialized() const { return initialized; } private: 
+ void handle_config_payload(const OSDConfigPayload &payload) { + if (set_perf_queries_cb) { + set_perf_queries_cb(payload); + } + } + + void handle_config_payload(const UnknownConfigPayload &payload) { + ceph_abort(); + } + + struct HandlePayloadVisitor : public boost::static_visitor { + MgrClient *mgrc; + + HandlePayloadVisitor(MgrClient *mgrc) + : mgrc(mgrc) { + } + + template + inline void operator()(const ConfigPayload &payload) const { + mgrc->handle_config_payload(payload); + } + }; + void _send_stats(); void _send_pgstats(); void _send_report(); diff --git a/src/mgr/OSDPerfMetricCollector.cc b/src/mgr/OSDPerfMetricCollector.cc index e57ee9aa544fd..e1acff2e80184 100644 --- a/src/mgr/OSDPerfMetricCollector.cc +++ b/src/mgr/OSDPerfMetricCollector.cc @@ -4,6 +4,7 @@ #include "common/debug.h" #include "common/errno.h" +#include "messages/MMgrReport.h" #include "OSDPerfMetricCollector.h" #define dout_context g_ceph_context @@ -11,197 +12,21 @@ #undef dout_prefix #define dout_prefix *_dout << "mgr.osd_perf_metric_collector " << __func__ << " " -namespace { - -bool is_limited(const std::map> &limits) { - for (auto &it : limits) { - if (!it.second) { - return false; - } - } - return true; +OSDPerfMetricCollector::OSDPerfMetricCollector(MetricListener &listener) + : MetricCollector(listener) { } -} // anonymous namespace - -OSDPerfMetricCollector::OSDPerfMetricCollector(Listener &listener) - : listener(listener) { -} +void OSDPerfMetricCollector::process_reports(const MetricPayload &payload) { + const std::map &reports = + boost::get(payload).report; -std::map -OSDPerfMetricCollector::get_queries() const { std::lock_guard locker(lock); - - std::map result; - for (auto &it : queries) { - auto &query = it.first; - auto &limits = it.second; - auto result_it = result.insert({query, {}}).first; - if (is_limited(limits)) { - for (auto &iter : limits) { - result_it->second.insert(*iter.second); - } - } - } - - return result; -} - -OSDPerfMetricQueryID 
OSDPerfMetricCollector::add_query( - const OSDPerfMetricQuery& query, - const std::optional &limit) { - uint64_t query_id; - bool notify = false; - - { - std::lock_guard locker(lock); - - query_id = next_query_id++; - auto it = queries.find(query); - if (it == queries.end()) { - it = queries.insert({query, {}}).first; - notify = true; - } else if (is_limited(it->second)) { - notify = true; - } - it->second.insert({query_id, limit}); - counters[query_id]; - } - - dout(10) << query << " " << (limit ? stringify(*limit) : "unlimited") - << " query_id=" << query_id << dendl; - - if (notify) { - listener.handle_query_updated(); - } - - return query_id; -} - -int OSDPerfMetricCollector::remove_query(int query_id) { - bool found = false; - bool notify = false; - - { - std::lock_guard locker(lock); - - for (auto it = queries.begin() ; it != queries.end(); it++) { - auto iter = it->second.find(query_id); - if (iter == it->second.end()) { - continue; - } - - it->second.erase(iter); - if (it->second.empty()) { - queries.erase(it); - notify = true; - } else if (is_limited(it->second)) { - notify = true; - } - found = true; - break; - } - counters.erase(query_id); - } - - if (!found) { - dout(10) << query_id << " not found" << dendl; - return -ENOENT; - } - - dout(10) << query_id << dendl; - - if (notify) { - listener.handle_query_updated(); - } - - return 0; -} - -void OSDPerfMetricCollector::remove_all_queries() { - dout(10) << dendl; - - bool notify; - - { - std::lock_guard locker(lock); - - notify = !queries.empty(); - queries.clear(); - } - - if (notify) { - listener.handle_query_updated(); - } -} - -int OSDPerfMetricCollector::get_counters( - OSDPerfMetricQueryID query_id, - std::map *c) { - std::lock_guard locker(lock); - - auto it = counters.find(query_id); - if (it == counters.end()) { - dout(10) << "counters for " << query_id << " not found" << dendl; - return -ENOENT; - } - - *c = std::move(it->second); - it->second.clear(); - - return 0; -} - -void 
OSDPerfMetricCollector::process_reports( - const std::map &reports) { - - if (reports.empty()) { - return; - } - - std::lock_guard locker(lock); - - for (auto &it : reports) { - auto &query = it.first; - auto &report = it.second; - dout(10) << "report for " << query << " query: " - << report.group_packed_performance_counters.size() << " records" - << dendl; - - for (auto &it : report.group_packed_performance_counters) { - auto &key = it.first; - auto bl_it = it.second.cbegin(); - - for (auto &queries_it : queries[query]) { - auto query_id = queries_it.first; - auto &key_counters = counters[query_id][key]; - if (key_counters.empty()) { - key_counters.resize(query.performance_counter_descriptors.size(), - {0, 0}); - } - } - - auto desc_it = report.performance_counter_descriptors.begin(); - for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) { - if (desc_it == report.performance_counter_descriptors.end()) { - break; - } - if (*desc_it != query.performance_counter_descriptors[i]) { - continue; - } - PerformanceCounter c; - desc_it->unpack_counter(bl_it, &c); - dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl; - - for (auto &queries_it : queries[query]) { - auto query_id = queries_it.first; - auto &key_counters = counters[query_id][key]; - key_counters[i].first += c.first; - key_counters[i].second += c.second; - } - desc_it++; - } - } - } + process_reports_generic( + reports, [](PerformanceCounter *counter, const PerformanceCounter &update) { + counter->first += update.first; + counter->second += update.second; + }); } diff --git a/src/mgr/OSDPerfMetricCollector.h b/src/mgr/OSDPerfMetricCollector.h index 266278972b15a..f45a89c8be24b 100644 --- a/src/mgr/OSDPerfMetricCollector.h +++ b/src/mgr/OSDPerfMetricCollector.h @@ -4,52 +4,19 @@ #ifndef OSD_PERF_METRIC_COLLECTOR_H_ #define OSD_PERF_METRIC_COLLECTOR_H_ -#include "common/ceph_mutex.h" - +#include "mgr/MetricCollector.h" #include "mgr/OSDPerfMetricTypes.h" -#include - /** * 
OSD performance query class. */ -class OSDPerfMetricCollector { +class OSDPerfMetricCollector + : public MetricCollector { public: - struct Listener { - virtual ~Listener() { - } - - virtual void handle_query_updated() = 0; - }; - - OSDPerfMetricCollector(Listener &listener); - - std::map get_queries() const; - - OSDPerfMetricQueryID add_query( - const OSDPerfMetricQuery& query, - const std::optional &limit); - int remove_query(OSDPerfMetricQueryID query_id); - void remove_all_queries(); - - int get_counters(OSDPerfMetricQueryID query_id, - std::map *counters); - - void process_reports( - const std::map &reports); - -private: - typedef std::optional OptionalLimit; - typedef std::map> Queries; - typedef std::map> Counters; + OSDPerfMetricCollector(MetricListener &listener); - Listener &listener; - mutable ceph::mutex lock = ceph::make_mutex("OSDPerfMetricCollector::lock"); - OSDPerfMetricQueryID next_query_id = 0; - Queries queries; - Counters counters; + void process_reports(const MetricPayload &payload) override; }; #endif // OSD_PERF_METRIC_COLLECTOR_H_ diff --git a/src/mgr/OSDPerfMetricTypes.h b/src/mgr/OSDPerfMetricTypes.h index 5a1dfc2cbbef9..63e8a2b44a49e 100644 --- a/src/mgr/OSDPerfMetricTypes.h +++ b/src/mgr/OSDPerfMetricTypes.h @@ -7,6 +7,8 @@ #include "include/denc.h" #include "include/stringify.h" +#include "mgr/Types.h" + #include typedef std::vector OSDPerfMetricSubKey; // array of regex match @@ -126,9 +128,6 @@ struct denc_traits { } }; -typedef std::pair PerformanceCounter; -typedef std::vector PerformanceCounters; - enum class PerformanceCounterType : uint8_t { OPS = 0, WRITE_OPS = 1, @@ -266,8 +265,6 @@ std::ostream& operator<<(std::ostream& os, const OSDPerfMetricLimit &limit); typedef std::set OSDPerfMetricLimits; -typedef int OSDPerfMetricQueryID; - struct OSDPerfMetricQuery { bool operator<(const OSDPerfMetricQuery &other) const { if (key_descriptor < other.key_descriptor) { diff --git a/src/mgr/Types.h b/src/mgr/Types.h new file mode 100644 
index 0000000000000..30810de65c75e --- /dev/null +++ b/src/mgr/Types.h @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MGR_TYPES_H +#define CEPH_MGR_TYPES_H + +typedef int MetricQueryID; + +typedef std::pair PerformanceCounter; +typedef std::vector PerformanceCounters; + +struct MetricListener { + virtual ~MetricListener() { + } + + virtual void handle_query_updated() = 0; +}; + +#endif // CEPH_MGR_TYPES_H diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5b4c8b3c26b6d..fcc99d1b73e7a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3539,11 +3539,11 @@ int OSD::init() mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); }); mgrc.set_perf_metric_query_cb( - [this](const std::map &queries) { - set_perf_queries(queries); + [this](const ConfigPayload &config_payload) { + set_perf_queries(config_payload); }, - [this](std::map *reports) { - get_perf_reports(reports); + [this] { + return get_perf_reports(); }); mgrc.init(); @@ -9906,8 +9906,9 @@ void OSD::get_latest_osdmap() // -------------------------------- -void OSD::set_perf_queries( - const std::map &queries) { +void OSD::set_perf_queries(const ConfigPayload &config_payload) { + const OSDConfigPayload &osd_config_payload = boost::get(config_payload); + const std::map &queries = osd_config_payload.config; dout(10) << "setting " << queries.size() << " queries" << dendl; std::list supported_queries; @@ -9934,8 +9935,10 @@ void OSD::set_perf_queries( } } -void OSD::get_perf_reports( - std::map *reports) { +MetricPayload OSD::get_perf_reports() { + OSDMetricPayload payload; + std::map &reports = payload.report; + std::vector pgs; _get_pgs(&pgs); DynamicPerfStats dps; @@ -9950,8 +9953,10 @@ void OSD::get_perf_reports( pg->unlock(); dps.merge(pg_dps); } - dps.add_to_reports(m_perf_limits, reports); - dout(20) << "reports for " << reports->size() << " queries" << dendl; + dps.add_to_reports(m_perf_limits, &reports); + dout(20) << 
"reports for " << reports.size() << " queries" << dendl; + + return payload; } // ============================================================= diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 58ef72b73436e..fd5102fb5f07f 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -2080,10 +2080,8 @@ public: friend class OSDService; private: - void set_perf_queries( - const std::map &queries); - void get_perf_reports( - std::map *reports); + void set_perf_queries(const ConfigPayload &config_payload); + MetricPayload get_perf_reports(); ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock"); std::list m_perf_queries; -- 2.47.3