]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: templatize/generalize metrics collection interface 29214/head
authorVenky Shankar <vshankar@redhat.com>
Tue, 10 Sep 2019 13:49:04 +0000 (09:49 -0400)
committerVenky Shankar <vshankar@redhat.com>
Fri, 6 Dec 2019 03:51:45 +0000 (22:51 -0500)
templatize metrics collection so as to reuse quering routines.
`MetricCollector` can be subclassed and along with implementing
` process_reports()` to process incoming metrics data.

also, generalize metrics data in `MMgrReport` and metric query
configuration in `MMgrConfigure`.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
19 files changed:
src/messages/MMgrConfigure.h
src/messages/MMgrReport.h
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mgr/CMakeLists.txt
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/MetricCollector.cc [new file with mode: 0644]
src/mgr/MetricCollector.h [new file with mode: 0644]
src/mgr/MetricTypes.h [new file with mode: 0644]
src/mgr/MgrClient.cc
src/mgr/MgrClient.h
src/mgr/OSDPerfMetricCollector.cc
src/mgr/OSDPerfMetricCollector.h
src/mgr/OSDPerfMetricTypes.h
src/mgr/Types.h [new file with mode: 0644]
src/osd/OSD.cc
src/osd/OSD.h

index 1cf7bf7888911fcc4641703e5a8d81becb3e0f8d..aaa36790a4c06b474585dd60b5d7ccbfe2ac39e0 100644 (file)
@@ -16,6 +16,7 @@
 #define CEPH_MMGRCONFIGURE_H_
 
 #include "msg/Message.h"
+#include "mgr/MetricTypes.h"
 #include "mgr/OSDPerfMetricTypes.h"
 
 /**
@@ -24,7 +25,7 @@
  */
 class MMgrConfigure : public Message {
 private:
-  static constexpr int HEAD_VERSION = 3;
+  static constexpr int HEAD_VERSION = 4;
   static constexpr int COMPAT_VERSION = 1;
 
 public:
@@ -35,6 +36,8 @@ public:
 
   std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> osd_perf_metric_queries;
 
+  boost::optional<MetricConfigMessage> metric_config_message;
+
   void decode_payload() override
   {
     using ceph::decode;
@@ -46,6 +49,9 @@ public:
     if (header.version >= 3) {
       decode(osd_perf_metric_queries, p);
     }
+    if (header.version >= 4) {
+      decode(metric_config_message, p);
+    }
   }
 
   void encode_payload(uint64_t features) override {
@@ -53,6 +59,7 @@ public:
     encode(stats_period, payload);
     encode(stats_threshold, payload);
     encode(osd_perf_metric_queries, payload);
+    encode(metric_config_message, payload);
   }
 
   std::string_view get_type_name() const override { return "mgrconfigure"; }
index 25ac6a9e53ffdc466126c0c23a3677b05def12d7..d9ef4eee957d7c43c3cd4ad3f51c8258f03f8f42 100644 (file)
@@ -18,6 +18,7 @@
 #include <boost/optional.hpp>
 
 #include "msg/Message.h"
+#include "mgr/MetricTypes.h"
 #include "mgr/OSDPerfMetricTypes.h"
 
 #include "common/perf_counters.h"
@@ -73,7 +74,7 @@ WRITE_CLASS_ENCODER(PerfCounterType)
 
 class MMgrReport : public Message {
 private:
-  static constexpr int HEAD_VERSION = 8;
+  static constexpr int HEAD_VERSION = 9;
   static constexpr int COMPAT_VERSION = 1;
 
 public:
@@ -107,6 +108,8 @@ public:
 
   std::map<OSDPerfMetricQuery, OSDPerfMetricReport>  osd_perf_metric_reports;
 
+  boost::optional<MetricReportMessage> metric_report_message;
+
   void decode_payload() override
   {
     using ceph::decode;
@@ -132,6 +135,9 @@ public:
     if (header.version >= 8) {
       decode(task_status, p);
     }
+    if (header.version >= 9) {
+      decode(metric_report_message, p);
+    }
   }
 
   void encode_payload(uint64_t features) override {
@@ -146,6 +152,7 @@ public:
     encode(config_bl, payload);
     encode(osd_perf_metric_reports, payload);
     encode(task_status, payload);
+    encode(metric_report_message, payload);
   }
 
   std::string_view get_type_name() const override { return "mgrreport"; }
index 36a5f74bafea5701e98147b904bcd6bbe71fdf37..4d38a6e1e0f21f857a2ad3628227dcaa42a0184b 100644 (file)
@@ -1011,14 +1011,14 @@ void ActivePyModules::set_uri(const std::string& module_name,
   modules.at(module_name)->set_uri(uri);
 }
 
-OSDPerfMetricQueryID ActivePyModules::add_osd_perf_query(
+MetricQueryID ActivePyModules::add_osd_perf_query(
     const OSDPerfMetricQuery &query,
     const std::optional<OSDPerfMetricLimit> &limit)
 {
   return server.add_osd_perf_query(query, limit);
 }
 
-void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id)
 {
   int r = server.remove_osd_perf_query(query_id);
   if (r < 0) {
@@ -1027,7 +1027,7 @@ void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
   }
 }
 
-PyObject *ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id)
+PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
 {
   std::map<OSDPerfMetricKey, PerformanceCounters> counters;
 
index a8a0f3c4cf0503e4476651d6d0544c5f03328a80..192360400c762bc0c8cd82110bdc296c8b62c21e 100644 (file)
@@ -101,11 +101,11 @@ public:
       const std::string &svc_id,
       const std::string &path) const;
 
-  OSDPerfMetricQueryID add_osd_perf_query(
+  MetricQueryID add_osd_perf_query(
       const OSDPerfMetricQuery &query,
       const std::optional<OSDPerfMetricLimit> &limit);
-  void remove_osd_perf_query(OSDPerfMetricQueryID query_id);
-  PyObject *get_osd_perf_counters(OSDPerfMetricQueryID query_id);
+  void remove_osd_perf_query(MetricQueryID query_id);
+  PyObject *get_osd_perf_counters(MetricQueryID query_id);
 
   bool get_store(const std::string &module_name,
       const std::string &key, std::string *val) const;
index c8d30875bf4124c4a437d10411c7f0550d8deb85..7d603d45a6e3c6b401708d85ead694914a7e7dfe 100644 (file)
@@ -25,6 +25,7 @@
 #include "mon/MonClient.h"
 #include "common/errno.h"
 #include "common/version.h"
+#include "mgr/Types.h"
 
 #include "PyUtil.h"
 #include "BaseMgrModule.h"
@@ -992,7 +993,7 @@ ceph_add_osd_perf_query(BaseMgrModule *self, PyObject *args)
 static PyObject*
 ceph_remove_osd_perf_query(BaseMgrModule *self, PyObject *args)
 {
-  OSDPerfMetricQueryID query_id;
+  MetricQueryID query_id;
   if (!PyArg_ParseTuple(args, "i:ceph_remove_osd_perf_query", &query_id)) {
     derr << "Invalid args!" << dendl;
     return nullptr;
@@ -1005,7 +1006,7 @@ ceph_remove_osd_perf_query(BaseMgrModule *self, PyObject *args)
 static PyObject*
 ceph_get_osd_perf_counters(BaseMgrModule *self, PyObject *args)
 {
-  OSDPerfMetricQueryID query_id;
+  MetricQueryID query_id;
   if (!PyArg_ParseTuple(args, "i:ceph_get_osd_perf_counters", &query_id)) {
     derr << "Invalid args!" << dendl;
     return nullptr;
index e69335eb6976736a8fa91e073551a15482f792de..101db6ed88956fef3228e9e297c853a21796a9e4 100644 (file)
@@ -17,6 +17,7 @@ if(WITH_MGR)
     Gil.cc
     Mgr.cc
     MgrStandby.cc
+    MetricCollector.cc
     OSDPerfMetricTypes.cc
     OSDPerfMetricCollector.cc
     PyFormatter.cc
index 7b81ffff027207a8796ba492dec70995b4129352..54d2caeab263b11d74d14220c1a73e3efb2d3097 100644 (file)
@@ -633,6 +633,11 @@ bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
     osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
   }
 
+  if (m->metric_report_message) {
+    const MetricReportMessage &message = *m->metric_report_message;
+    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+  }
+
   return true;
 }
 
@@ -2848,20 +2853,20 @@ void DaemonServer::_send_configure(ConnectionRef c)
   c->send_message2(configure);
 }
 
-OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
+MetricQueryID DaemonServer::add_osd_perf_query(
     const OSDPerfMetricQuery &query,
     const std::optional<OSDPerfMetricLimit> &limit)
 {
   return osd_perf_metric_collector.add_query(query, limit);
 }
 
-int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
 {
   return osd_perf_metric_collector.remove_query(query_id);
 }
 
 int DaemonServer::get_osd_perf_counters(
-    OSDPerfMetricQueryID query_id,
+    MetricQueryID query_id,
     std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
 {
   return osd_perf_metric_collector.get_counters(query_id, counters);
index 6fdf1c0756a76ac910cda33d574a44584beae99a..6289176c9e02fc59fc456cb9148bee6e710daef1 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <set>
 #include <string>
+#include <boost/variant.hpp>
 
 #include "common/ceph_mutex.h"
 #include "common/LogClient.h"
@@ -29,6 +30,7 @@
 #include "ServiceMap.h"
 #include "MgrSession.h"
 #include "DaemonState.h"
+#include "MetricCollector.h"
 #include "OSDPerfMetricCollector.h"
 
 class MMgrReport;
@@ -107,8 +109,7 @@ private:
   void tick();
   void schedule_tick_locked(double delay_sec);
 
-  class OSDPerfMetricCollectorListener :
-      public OSDPerfMetricCollector::Listener {
+  class OSDPerfMetricCollectorListener : public MetricListener {
   public:
     OSDPerfMetricCollectorListener(DaemonServer *server)
       : server(server) {
@@ -123,6 +124,27 @@ private:
   OSDPerfMetricCollector osd_perf_metric_collector;
   void handle_osd_perf_metric_query_updated();
 
+  void handle_metric_payload(const OSDMetricPayload &payload) {
+    osd_perf_metric_collector.process_reports(payload);
+  }
+
+  void handle_metric_payload(const UnknownMetricPayload &payload) {
+    ceph_abort();
+  }
+
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    DaemonServer *server;
+
+    HandlePayloadVisitor(DaemonServer *server)
+      : server(server) {
+    }
+
+    template <typename MetricPayload>
+    inline void operator()(const MetricPayload &payload) const {
+      server->handle_metric_payload(payload);
+    }
+  };
+
 public:
   int init(uint64_t gid, entity_addrvec_t client_addrs);
   void shutdown();
@@ -157,11 +179,11 @@ public:
 
   void _send_configure(ConnectionRef c);
 
-  OSDPerfMetricQueryID add_osd_perf_query(
+  MetricQueryID add_osd_perf_query(
       const OSDPerfMetricQuery &query,
       const std::optional<OSDPerfMetricLimit> &limit);
-  int remove_osd_perf_query(OSDPerfMetricQueryID query_id);
-  int get_osd_perf_counters(OSDPerfMetricQueryID query_id,
+  int remove_osd_perf_query(MetricQueryID query_id);
+  int get_osd_perf_counters(MetricQueryID query_id,
                             std::map<OSDPerfMetricKey, PerformanceCounters> *c);
 
   virtual const char** get_tracked_conf_keys() const override;
diff --git a/src/mgr/MetricCollector.cc b/src/mgr/MetricCollector.cc
new file mode 100644 (file)
index 0000000..1eb26a8
--- /dev/null
@@ -0,0 +1,183 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "mgr/MetricCollector.h"
+#include "mgr/OSDPerfMetricTypes.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.metric_collector " << __func__ << ": "
+
+template <typename Query, typename Limit, typename Key, typename Report>
+MetricCollector<Query, Limit, Key, Report>::MetricCollector(MetricListener &listener)
+  : listener(listener)
+{
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+MetricQueryID MetricCollector<Query, Limit, Key, Report>::add_query(
+    const Query &query,
+    const std::optional<Limit> &limit) {
+  dout(20) << "query=" << query << ", limit=" << limit << dendl;
+  uint64_t query_id;
+  bool notify = false;
+
+  {
+    std::lock_guard locker(lock);
+
+    query_id = next_query_id++;
+    auto it = queries.find(query);
+    if (it == queries.end()) {
+      it = queries.emplace(query, std::map<MetricQueryID, OptionalLimit>{}).first;
+      notify = true;
+    } else if (is_limited(it->second)) {
+      notify = true;
+    }
+
+    it->second.emplace(query_id, limit);
+    counters.emplace(query_id, std::map<Key, PerformanceCounters>{});
+  }
+
+  dout(10) << query << " " << (limit ? stringify(*limit) : "unlimited")
+           << " query_id=" << query_id << dendl;
+
+  if (notify) {
+    listener.handle_query_updated();
+  }
+
+  return query_id;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+int MetricCollector<Query, Limit, Key, Report>::remove_query(MetricQueryID query_id) {
+  dout(20) << "query_id=" << query_id << dendl;
+  bool found = false;
+  bool notify = false;
+
+  {
+    std::lock_guard locker(lock);
+
+    for (auto it = queries.begin() ; it != queries.end();) {
+      auto iter = it->second.find(query_id);
+      if (iter == it->second.end()) {
+        ++it;
+        continue;
+      }
+
+      it->second.erase(iter);
+      if (it->second.empty()) {
+        it = queries.erase(it);
+        notify = true;
+      } else if (is_limited(it->second)) {
+        ++it;
+        notify = true;
+      }
+      found = true;
+      break;
+    }
+    counters.erase(query_id);
+  }
+
+  if (!found) {
+    dout(10) << query_id << " not found" << dendl;
+    return -ENOENT;
+  }
+
+  dout(10) << query_id << dendl;
+
+  if (notify) {
+    listener.handle_query_updated();
+  }
+
+  return 0;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::remove_all_queries() {
+  dout(20) << dendl;
+  bool notify;
+
+  {
+    std::lock_guard locker(lock);
+
+    notify = !queries.empty();
+    queries.clear();
+  }
+
+  if (notify) {
+    listener.handle_query_updated();
+  }
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+int MetricCollector<Query, Limit, Key, Report>::get_counters(
+    MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
+  dout(20) << dendl;
+
+  std::lock_guard locker(lock);
+
+  auto it = counters.find(query_id);
+  if (it == counters.end()) {
+    dout(10) << "counters for " << query_id << " not found" << dendl;
+    return -ENOENT;
+  }
+
+  *c = std::move(it->second);
+  it->second.clear();
+
+  return 0;
+}
+
+template <typename Query, typename Limit, typename Key, typename Report>
+void MetricCollector<Query, Limit, Key, Report>::process_reports_generic(
+    const std::map<Query, Report> &reports, UpdateCallback callback) {
+  ceph_assert(ceph_mutex_is_locked(lock));
+
+  if (reports.empty()) {
+    return;
+  }
+
+  for (auto& [query, report] : reports) {
+    dout(10) << "report for " << query << " query: "
+             << report.group_packed_performance_counters.size() << " records"
+             << dendl;
+
+    for (auto& [key, bl] : report.group_packed_performance_counters) {
+      auto bl_it = bl.cbegin();
+
+      for (auto& p : queries[query]) {
+        auto &key_counters = counters[p.first][key];
+        if (key_counters.empty()) {
+          key_counters.resize(query.performance_counter_descriptors.size(),
+                              {0, 0});
+        }
+      }
+
+      auto desc_it = report.performance_counter_descriptors.begin();
+      for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) {
+        if (desc_it == report.performance_counter_descriptors.end()) {
+          break;
+        }
+        if (*desc_it != query.performance_counter_descriptors[i]) {
+          continue;
+        }
+        PerformanceCounter c;
+        desc_it->unpack_counter(bl_it, &c);
+        dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl;
+
+        for (auto& p : queries[query]) {
+          auto &key_counters = counters[p.first][key];
+          callback(&key_counters[i], c);
+        }
+        desc_it++;
+      }
+    }
+  }
+}
+
+template class
+MetricCollector<OSDPerfMetricQuery, OSDPerfMetricLimit, OSDPerfMetricKey, OSDPerfMetricReport>;
diff --git a/src/mgr/MetricCollector.h b/src/mgr/MetricCollector.h
new file mode 100644 (file)
index 0000000..f74787e
--- /dev/null
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_METRIC_COLLECTOR_H
+#define CEPH_MGR_METRIC_COLLECTOR_H
+
+#include <map>
+#include <set>
+#include <tuple>
+#include <vector>
+#include <utility>
+#include <algorithm>
+
+#include "common/ceph_mutex.h"
+#include "msg/Message.h"
+#include "mgr/Types.h"
+#include "mgr/MetricTypes.h"
+
+class MMgrReport;
+
+template <typename Query, typename Limit, typename Key, typename Report>
+class MetricCollector {
+public:
+  virtual ~MetricCollector() {
+  }
+
+  using Limits = std::set<Limit>;
+
+  MetricCollector(MetricListener &listener);
+
+  MetricQueryID add_query(const Query &query, const std::optional<Limit> &limit);
+
+  int remove_query(MetricQueryID query_id);
+
+  void remove_all_queries();
+
+  int get_counters(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
+
+  std::map<Query, Limits> get_queries() const {
+    std::lock_guard locker(lock);
+
+    std::map<Query, Limits> result;
+    for (auto& [query, limits] : queries) {
+      auto result_it = result.insert({query, {}}).first;
+      if (is_limited(limits)) {
+        for (auto& limit : limits) {
+          result_it->second.insert(*limit.second);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  virtual void process_reports(const MetricPayload &payload) = 0;
+
+protected:
+  typedef std::optional<Limit> OptionalLimit;
+  typedef std::map<MetricQueryID, OptionalLimit> QueryIDLimit;
+  typedef std::map<Query, QueryIDLimit> Queries;
+  typedef std::map<MetricQueryID, std::map<Key, PerformanceCounters>> Counters;
+  typedef std::function<void(PerformanceCounter *, const PerformanceCounter &)> UpdateCallback;
+
+  mutable ceph::mutex lock = ceph::make_mutex("mgr::metric::collector::lock");
+
+  Queries queries;
+  Counters counters;
+
+  void process_reports_generic(const std::map<Query, Report> &reports, UpdateCallback callback);
+
+private:
+  MetricListener &listener;
+  MetricQueryID next_query_id = 0;
+
+  bool is_limited(const std::map<MetricQueryID, OptionalLimit> &limits) const {
+    return std::any_of(begin(limits), end(limits),
+                       [](auto &limits) { return limits.second.has_value(); });
+  }
+};
+
+#endif // CEPH_MGR_METRIC_COLLECTOR_H
diff --git a/src/mgr/MetricTypes.h b/src/mgr/MetricTypes.h
new file mode 100644 (file)
index 0000000..3d4d4a6
--- /dev/null
@@ -0,0 +1,213 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_METRIC_TYPES_H
+#define CEPH_MGR_METRIC_TYPES_H
+
+#include <boost/variant.hpp>
+#include "include/denc.h"
+#include "mgr/OSDPerfMetricTypes.h"
+
+enum class MetricReportType {
+  METRIC_REPORT_TYPE_OSD = 0,
+};
+
+struct OSDMetricPayload {
+  static const MetricReportType METRIC_REPORT_TYPE = MetricReportType::METRIC_REPORT_TYPE_OSD;
+  std::map<OSDPerfMetricQuery, OSDPerfMetricReport> report;
+
+  OSDMetricPayload() {
+  }
+  OSDMetricPayload(const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &report)
+    : report(report) {
+  }
+
+  DENC(OSDMetricPayload, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.report, p);
+    DENC_FINISH(p);
+  }
+};
+
+struct UnknownMetricPayload {
+  static const MetricReportType METRIC_REPORT_TYPE = static_cast<MetricReportType>(-1);
+
+  UnknownMetricPayload() { }
+
+  DENC(UnknownMetricPayload, v, p) {
+    ceph_abort();
+  }
+};
+
+WRITE_CLASS_DENC(OSDMetricPayload)
+WRITE_CLASS_DENC(UnknownMetricPayload)
+
+typedef boost::variant<OSDMetricPayload,
+                       UnknownMetricPayload> MetricPayload;
+
+class EncodeMetricPayloadVisitor : public boost::static_visitor<void> {
+public:
+  explicit EncodeMetricPayloadVisitor(bufferlist &bl) : m_bl(bl) {
+  }
+
+  template <typename MetricPayload>
+  inline void operator()(const MetricPayload &payload) const {
+    using ceph::encode;
+    encode(static_cast<uint32_t>(MetricPayload::METRIC_REPORT_TYPE), m_bl);
+    encode(payload, m_bl);
+  }
+
+private:
+  bufferlist &m_bl;
+};
+
+class DecodeMetricPayloadVisitor : public boost::static_visitor<void> {
+public:
+  DecodeMetricPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) {
+  }
+
+  template <typename MetricPayload>
+  inline void operator()(MetricPayload &payload) const {
+    using ceph::decode;
+    decode(payload, m_iter);
+  }
+
+private:
+  bufferlist::const_iterator &m_iter;
+};
+
+struct MetricReportMessage {
+  MetricPayload payload;
+
+  MetricReportMessage(const MetricPayload &payload = UnknownMetricPayload())
+    : payload(payload) {
+  }
+
+  void encode(bufferlist &bl) const {
+    boost::apply_visitor(EncodeMetricPayloadVisitor(bl), payload);
+  }
+
+  void decode(bufferlist::const_iterator &iter) {
+    using ceph::decode;
+
+    uint32_t metric_report_type;
+    decode(metric_report_type, iter);
+
+    switch (static_cast<MetricReportType>(metric_report_type)) {
+    case MetricReportType::METRIC_REPORT_TYPE_OSD:
+      payload = OSDMetricPayload();
+      break;
+    default:
+      payload = UnknownMetricPayload();
+      break;
+  }
+
+  boost::apply_visitor(DecodeMetricPayloadVisitor(iter), payload);
+  }
+};
+
+WRITE_CLASS_ENCODER(MetricReportMessage);
+
+// variant for sending configure message to mgr clients
+
+enum MetricConfigType {
+  METRIC_CONFIG_TYPE_OSD = 0,
+};
+
+struct OSDConfigPayload {
+  static const MetricConfigType METRIC_CONFIG_TYPE = MetricConfigType::METRIC_CONFIG_TYPE_OSD;
+  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> config;
+
+  OSDConfigPayload() {
+  }
+  OSDConfigPayload(const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &config)
+    : config(config) {
+  }
+
+  DENC(OSDConfigPayload, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.config, p);
+    DENC_FINISH(p);
+  }
+};
+
+struct UnknownConfigPayload {
+  static const MetricConfigType METRIC_CONFIG_TYPE = static_cast<MetricConfigType>(-1);
+
+  UnknownConfigPayload() { }
+
+  DENC(UnknownConfigPayload, v, p) {
+    ceph_abort();
+  }
+};
+
+WRITE_CLASS_DENC(OSDConfigPayload)
+WRITE_CLASS_DENC(UnknownConfigPayload)
+
+typedef boost::variant<OSDConfigPayload,
+                       UnknownConfigPayload> ConfigPayload;
+
+class EncodeConfigPayloadVisitor : public boost::static_visitor<void> {
+public:
+  explicit EncodeConfigPayloadVisitor(bufferlist &bl) : m_bl(bl) {
+  }
+
+  template <typename ConfigPayload>
+  inline void operator()(const ConfigPayload &payload) const {
+    using ceph::encode;
+    encode(static_cast<uint32_t>(ConfigPayload::METRIC_CONFIG_TYPE), m_bl);
+    encode(payload, m_bl);
+  }
+
+private:
+  bufferlist &m_bl;
+};
+
+class DecodeConfigPayloadVisitor : public boost::static_visitor<void> {
+public:
+  DecodeConfigPayloadVisitor(bufferlist::const_iterator &iter) : m_iter(iter) {
+  }
+
+  template <typename ConfigPayload>
+  inline void operator()(ConfigPayload &payload) const {
+    using ceph::decode;
+    decode(payload, m_iter);
+  }
+
+private:
+  bufferlist::const_iterator &m_iter;
+};
+
+struct MetricConfigMessage {
+  ConfigPayload payload;
+
+  MetricConfigMessage(const ConfigPayload &payload = UnknownConfigPayload())
+    : payload(payload) {
+  }
+
+  void encode(bufferlist &bl) const {
+    boost::apply_visitor(EncodeConfigPayloadVisitor(bl), payload);
+  }
+
+  void decode(bufferlist::const_iterator &iter) {
+    using ceph::decode;
+
+    uint32_t metric_config_type;
+    decode(metric_config_type, iter);
+
+    switch (metric_config_type) {
+    case MetricConfigType::METRIC_CONFIG_TYPE_OSD:
+      payload = OSDConfigPayload();
+      break;
+    default:
+      payload = UnknownConfigPayload();
+      break;
+  }
+
+  boost::apply_visitor(DecodeConfigPayloadVisitor(iter), payload);
+  }
+};
+
+WRITE_CLASS_ENCODER(MetricConfigMessage);
+
+#endif // CEPH_MGR_METRIC_TYPES_H
index fd22c05e99800ff6df9264a83734fa7eb8e27fd0..4903fa3a083ca4b305d7aa0a9557dbb4ab4966e7 100644 (file)
@@ -400,7 +400,9 @@ void MgrClient::_send_report()
                            &last_config_bl_version);
 
   if (get_perf_report_cb) {
-    get_perf_report_cb(&report->osd_perf_metric_reports);
+    MetricPayload payload = get_perf_report_cb();
+    MetricReportMessage message(payload);
+    report->metric_report_message = message;
   }
 
   session->con->send_message2(report);
@@ -437,8 +439,11 @@ bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
     stats_threshold = m->stats_threshold;
   }
 
-  if (set_perf_queries_cb) {
-    set_perf_queries_cb(m->osd_perf_metric_queries);
+  if (!m->osd_perf_metric_queries.empty()) {
+    handle_config_payload(m->osd_perf_metric_queries);
+  } else if (m->metric_config_message) {
+    const MetricConfigMessage &message = *m->metric_config_message;
+    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
   }
 
   bool starting = (stats_period == 0) && (m->stats_period != 0);
index e9053ccf1f0279a67795fd46bac5f3227c3d980d..6ac56fec970d2dcebd0a74f2111c5b31338f4a12 100644 (file)
 #ifndef MGR_CLIENT_H_
 #define MGR_CLIENT_H_
 
+#include <boost/variant.hpp>
+
 #include "msg/Connection.h"
 #include "msg/Dispatcher.h"
 #include "mon/MgrMap.h"
 #include "mgr/DaemonHealthMetric.h"
 
 #include "messages/MMgrReport.h"
-#include "mgr/OSDPerfMetricTypes.h"
+#include "mgr/MetricTypes.h"
 
 #include "common/perf_counters.h"
 #include "common/Timer.h"
@@ -84,10 +86,8 @@ protected:
   // If provided, use this to compose an MPGStats to send with
   // our reports (hook for use by OSD)
   std::function<MPGStats*()> pgstats_cb;
-  std::function<void(const std::map<OSDPerfMetricQuery,
-                                    OSDPerfMetricLimits> &)> set_perf_queries_cb;
-  std::function<void(std::map<OSDPerfMetricQuery,
-                              OSDPerfMetricReport> *)> get_perf_report_cb;
+  std::function<void(const ConfigPayload &)> set_perf_queries_cb;
+  std::function<MetricPayload()> get_perf_report_cb;
 
   // for service registration and beacon
   bool service_daemon = false;
@@ -131,10 +131,8 @@ public:
     int r);
 
   void set_perf_metric_query_cb(
-    std::function<void(const std::map<OSDPerfMetricQuery,
-                                      OSDPerfMetricLimits> &)> cb_set,
-          std::function<void(std::map<OSDPerfMetricQuery,
-                                      OSDPerfMetricReport> *)> cb_get)
+    std::function<void(const ConfigPayload &)> cb_set,
+    std::function<MetricPayload()> cb_get)
   {
       std::lock_guard l(lock);
       set_perf_queries_cb = cb_set;
@@ -171,6 +169,29 @@ public:
   bool is_initialized() const { return initialized; }
 
 private:
+  void handle_config_payload(const OSDConfigPayload &payload) {
+    if (set_perf_queries_cb) {
+      set_perf_queries_cb(payload);
+    }
+  }
+
+  void handle_config_payload(const UnknownConfigPayload &payload) {
+    ceph_abort();
+  }
+
+  struct HandlePayloadVisitor : public boost::static_visitor<void> {
+    MgrClient *mgrc;
+
+    HandlePayloadVisitor(MgrClient *mgrc)
+      : mgrc(mgrc) {
+    }
+
+    template <typename ConfigPayload>
+    inline void operator()(const ConfigPayload &payload) const {
+      mgrc->handle_config_payload(payload);
+    }
+  };
+
   void _send_stats();
   void _send_pgstats();
   void _send_report();
index e57ee9aa544fdca6e9117475e10c00fa850c92a3..e1acff2e80184b0b86f3872be60edbb923b05055 100644 (file)
@@ -4,6 +4,7 @@
 #include "common/debug.h"
 #include "common/errno.h"
 
+#include "messages/MMgrReport.h"
 #include "OSDPerfMetricCollector.h"
 
 #define dout_context g_ceph_context
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr.osd_perf_metric_collector " << __func__ << " "
 
-namespace {
-
-bool is_limited(const std::map<OSDPerfMetricQueryID,
-                                std::optional<OSDPerfMetricLimit>> &limits) {
-  for (auto &it : limits) {
-    if (!it.second) {
-      return false;
-    }
-  }
-  return true;
+OSDPerfMetricCollector::OSDPerfMetricCollector(MetricListener &listener)
+  : MetricCollector<OSDPerfMetricQuery,
+                    OSDPerfMetricLimit,
+                    OSDPerfMetricKey,
+                    OSDPerfMetricReport>(listener) {
 }
 
-} // anonymous namespace
-
-OSDPerfMetricCollector::OSDPerfMetricCollector(Listener &listener)
-  : listener(listener) {
-}
+void OSDPerfMetricCollector::process_reports(const MetricPayload &payload) {
+  const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports =
+    boost::get<OSDMetricPayload>(payload).report;
 
-std::map<OSDPerfMetricQuery, OSDPerfMetricLimits>
-OSDPerfMetricCollector::get_queries() const {
   std::lock_guard locker(lock);
-
-  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> result;
-  for (auto &it : queries) {
-    auto &query = it.first;
-    auto &limits = it.second;
-    auto result_it = result.insert({query, {}}).first;
-    if (is_limited(limits)) {
-      for (auto &iter : limits) {
-        result_it->second.insert(*iter.second);
-      }
-    }
-  }
-
-  return result;
-}
-
-OSDPerfMetricQueryID OSDPerfMetricCollector::add_query(
-    const OSDPerfMetricQuery& query,
-    const std::optional<OSDPerfMetricLimit> &limit) {
-  uint64_t query_id;
-  bool notify = false;
-
-  {
-    std::lock_guard locker(lock);
-
-    query_id = next_query_id++;
-    auto it = queries.find(query);
-    if (it == queries.end()) {
-      it = queries.insert({query, {}}).first;
-      notify = true;
-    } else if (is_limited(it->second)) {
-      notify = true;
-    }
-    it->second.insert({query_id, limit});
-    counters[query_id];
-  }
-
-  dout(10) << query << " " << (limit ? stringify(*limit) : "unlimited")
-           << " query_id=" << query_id << dendl;
-
-  if (notify) {
-    listener.handle_query_updated();
-  }
-
-  return query_id;
-}
-
-int OSDPerfMetricCollector::remove_query(int query_id) {
-  bool found = false;
-  bool notify = false;
-
-  {
-    std::lock_guard locker(lock);
-
-    for (auto it = queries.begin() ; it != queries.end(); it++) {
-      auto iter = it->second.find(query_id);
-      if (iter == it->second.end()) {
-        continue;
-      }
-
-      it->second.erase(iter);
-      if (it->second.empty()) {
-        queries.erase(it);
-        notify = true;
-      } else if (is_limited(it->second)) {
-        notify = true;
-      }
-      found = true;
-      break;
-    }
-    counters.erase(query_id);
-  }
-
-  if (!found) {
-    dout(10) << query_id << " not found" << dendl;
-    return -ENOENT;
-  }
-
-  dout(10) << query_id << dendl;
-
-  if (notify) {
-    listener.handle_query_updated();
-  }
-
-  return 0;
-}
-
-void OSDPerfMetricCollector::remove_all_queries() {
-  dout(10) << dendl;
-
-  bool notify;
-
-  {
-    std::lock_guard locker(lock);
-
-    notify = !queries.empty();
-    queries.clear();
-  }
-
-  if (notify) {
-    listener.handle_query_updated();
-  }
-}
-
-int OSDPerfMetricCollector::get_counters(
-    OSDPerfMetricQueryID query_id,
-    std::map<OSDPerfMetricKey, PerformanceCounters> *c) {
-  std::lock_guard locker(lock);
-
-  auto it = counters.find(query_id);
-  if (it == counters.end()) {
-    dout(10) << "counters for " << query_id << " not found" << dendl;
-    return -ENOENT;
-  }
-
-  *c = std::move(it->second);
-  it->second.clear();
-
-  return 0;
-}
-
-void OSDPerfMetricCollector::process_reports(
-    const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports) {
-
-  if (reports.empty()) {
-    return;
-  }
-
-  std::lock_guard locker(lock);
-
-  for (auto &it : reports) {
-    auto &query = it.first;
-    auto &report = it.second;
-    dout(10) << "report for " << query << " query: "
-             << report.group_packed_performance_counters.size() << " records"
-             << dendl;
-
-    for (auto &it : report.group_packed_performance_counters) {
-      auto &key = it.first;
-      auto bl_it = it.second.cbegin();
-
-      for (auto &queries_it : queries[query]) {
-        auto query_id = queries_it.first;
-        auto &key_counters = counters[query_id][key];
-        if (key_counters.empty()) {
-          key_counters.resize(query.performance_counter_descriptors.size(),
-                              {0, 0});
-        }
-      }
-
-      auto desc_it = report.performance_counter_descriptors.begin();
-      for (size_t i = 0; i < query.performance_counter_descriptors.size(); i++) {
-        if (desc_it == report.performance_counter_descriptors.end()) {
-          break;
-        }
-        if (*desc_it != query.performance_counter_descriptors[i]) {
-          continue;
-        }
-        PerformanceCounter c;
-        desc_it->unpack_counter(bl_it, &c);
-        dout(20) << "counter " << key << " " << *desc_it << ": " << c << dendl;
-
-        for (auto &queries_it : queries[query]) {
-          auto query_id = queries_it.first;
-          auto &key_counters = counters[query_id][key];
-          key_counters[i].first += c.first;
-          key_counters[i].second += c.second;
-        }
-        desc_it++;
-      }
-    }
-  }
+  process_reports_generic(
+    reports, [](PerformanceCounter *counter, const PerformanceCounter &update) {
+      counter->first += update.first;
+      counter->second += update.second;
+    });
 }
index 266278972b15ab3e9de833a37af3857508b25d90..f45a89c8be24b39564b5aaa59a5690a37ba0b3e0 100644 (file)
@@ -4,52 +4,19 @@
 #ifndef OSD_PERF_METRIC_COLLECTOR_H_
 #define OSD_PERF_METRIC_COLLECTOR_H_
 
-#include "common/ceph_mutex.h"
-
+#include "mgr/MetricCollector.h"
 #include "mgr/OSDPerfMetricTypes.h"
 
-#include <map>
-
 /**
  * OSD performance query class.
  */
-class OSDPerfMetricCollector {
+class OSDPerfMetricCollector
+  : public MetricCollector<OSDPerfMetricQuery, OSDPerfMetricLimit, OSDPerfMetricKey,
+                           OSDPerfMetricReport> {
 public:
-  struct Listener {
-    virtual ~Listener() {
-    }
-
-    virtual void handle_query_updated() = 0;
-  };
-
-  OSDPerfMetricCollector(Listener &listener);
-
-  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> get_queries() const;
-
-  OSDPerfMetricQueryID add_query(
-      const OSDPerfMetricQuery& query,
-      const std::optional<OSDPerfMetricLimit> &limit);
-  int remove_query(OSDPerfMetricQueryID query_id);
-  void remove_all_queries();
-
-  int get_counters(OSDPerfMetricQueryID query_id,
-                   std::map<OSDPerfMetricKey, PerformanceCounters> *counters);
-
-  void process_reports(
-      const std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports);
-
-private:
-  typedef std::optional<OSDPerfMetricLimit> OptionalLimit;
-  typedef std::map<OSDPerfMetricQuery,
-                   std::map<OSDPerfMetricQueryID, OptionalLimit>> Queries;
-  typedef std::map<OSDPerfMetricQueryID,
-                   std::map<OSDPerfMetricKey, PerformanceCounters>> Counters;
+  OSDPerfMetricCollector(MetricListener &listener);
 
-  Listener &listener;
-  mutable ceph::mutex lock = ceph::make_mutex("OSDPerfMetricCollector::lock");
-  OSDPerfMetricQueryID next_query_id = 0;
-  Queries queries;
-  Counters counters;
+  void process_reports(const MetricPayload &payload) override;
 };
 
 #endif // OSD_PERF_METRIC_COLLECTOR_H_
index 5a1dfc2cbbef92a5acd79b6841cb3c6175a6f966..63e8a2b44a49e9653c527c996fc1972c0cd65676 100644 (file)
@@ -7,6 +7,8 @@
 #include "include/denc.h"
 #include "include/stringify.h"
 
+#include "mgr/Types.h"
+
 #include <regex>
 
 typedef std::vector<std::string> OSDPerfMetricSubKey; // array of regex match
@@ -126,9 +128,6 @@ struct denc_traits<OSDPerfMetricKeyDescriptor> {
   }
 };
 
-typedef std::pair<uint64_t,uint64_t> PerformanceCounter;
-typedef std::vector<PerformanceCounter> PerformanceCounters;
-
 enum class PerformanceCounterType : uint8_t {
   OPS = 0,
   WRITE_OPS = 1,
@@ -266,8 +265,6 @@ std::ostream& operator<<(std::ostream& os, const OSDPerfMetricLimit &limit);
 
 typedef std::set<OSDPerfMetricLimit> OSDPerfMetricLimits;
 
-typedef int OSDPerfMetricQueryID;
-
 struct OSDPerfMetricQuery {
   bool operator<(const OSDPerfMetricQuery &other) const {
     if (key_descriptor < other.key_descriptor) {
diff --git a/src/mgr/Types.h b/src/mgr/Types.h
new file mode 100644 (file)
index 0000000..30810de
--- /dev/null
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MGR_TYPES_H
+#define CEPH_MGR_TYPES_H
+
+typedef int MetricQueryID;
+
+typedef std::pair<uint64_t,uint64_t> PerformanceCounter;
+typedef std::vector<PerformanceCounter> PerformanceCounters;
+
+struct MetricListener {
+  virtual ~MetricListener() {
+  }
+
+  virtual void handle_query_updated() = 0;
+};
+
+#endif // CEPH_MGR_TYPES_H
index 5b4c8b3c26b6dc5898d5ccf4b12ec3e90eb017f3..fcc99d1b73e7a3d6be0e2397dcd37a3d9d69e647 100644 (file)
@@ -3539,11 +3539,11 @@ int OSD::init()
 
   mgrc.set_pgstats_cb([this](){ return collect_pg_stats(); });
   mgrc.set_perf_metric_query_cb(
-    [this](const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries) {
-        set_perf_queries(queries);
+    [this](const ConfigPayload &config_payload) {
+        set_perf_queries(config_payload);
       },
-      [this](std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports) {
-        get_perf_reports(reports);
+      [this] {
+        return get_perf_reports();
       });
   mgrc.init();
 
@@ -9906,8 +9906,9 @@ void OSD::get_latest_osdmap()
 
 // --------------------------------
 
-void OSD::set_perf_queries(
-    const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries) {
+void OSD::set_perf_queries(const ConfigPayload &config_payload) {
+  const OSDConfigPayload &osd_config_payload = boost::get<OSDConfigPayload>(config_payload);
+  const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries = osd_config_payload.config;
   dout(10) << "setting " << queries.size() << " queries" << dendl;
 
   std::list<OSDPerfMetricQuery> supported_queries;
@@ -9934,8 +9935,10 @@ void OSD::set_perf_queries(
   }
 }
 
-void OSD::get_perf_reports(
-    std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports) {
+MetricPayload OSD::get_perf_reports() {
+  OSDMetricPayload payload;
+  std::map<OSDPerfMetricQuery, OSDPerfMetricReport> &reports = payload.report;
+
   std::vector<PGRef> pgs;
   _get_pgs(&pgs);
   DynamicPerfStats dps;
@@ -9950,8 +9953,10 @@ void OSD::get_perf_reports(
     pg->unlock();
     dps.merge(pg_dps);
   }
-  dps.add_to_reports(m_perf_limits, reports);
-  dout(20) << "reports for " << reports->size() << " queries" << dendl;
+  dps.add_to_reports(m_perf_limits, &reports);
+  dout(20) << "reports for " << reports.size() << " queries" << dendl;
+
+  return payload;
 }
 
 // =============================================================
index 58ef72b73436ebd40ad3fdba1a85f786f02ee3c2..fd5102fb5f07ffc85f33d2a5bc4057d895bbeb5c 100644 (file)
@@ -2080,10 +2080,8 @@ public:
   friend class OSDService;
 
 private:
-  void set_perf_queries(
-      const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
-  void get_perf_reports(
-      std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
+  void set_perf_queries(const ConfigPayload &config_payload);
+  MetricPayload get_perf_reports();
 
   ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
   std::list<OSDPerfMetricQuery> m_perf_queries;