Gil.cc
Mgr.cc
MgrStandby.cc
+ OSDPerfMetricQuery.cc
+ OSDPerfMetricCollector.cc
PyFormatter.cc
PyModule.cc
PyModuleRegistry.cc
#include "mgr/mgr_commands.h"
#include "mgr/DaemonHealthMetricCollector.h"
+#include "mgr/OSDPerfMetricCollector.h"
#include "mon/MonCommand.h"
#include "messages/MMgrOpen.h"
pgmap_ready(false),
timer(g_ceph_context, lock),
shutting_down(false),
- tick_event(nullptr)
+ tick_event(nullptr),
+ osd_perf_metric_collector_listener(this),
+ osd_perf_metric_collector(osd_perf_metric_collector_listener)
{
g_conf().add_observer(this);
}
schedule_tick_locked(delay_sec);
}
+void DaemonServer::handle_osd_perf_metric_query_updated()
+{
+ dout(10) << dendl;
+
+ // Send a fresh MMgrConfigure to all clients, so that they can follow
+ // the new policy for transmitting stats
+ finisher.queue(new FunctionContext([this](int r) {
+ Mutex::Locker l(lock);
+ for (auto &c : daemon_connections) {
+ if (c->peer_is_osd()) {
+ _send_configure(c);
+ }
+ }
+ }));
+}
+
void DaemonServer::shutdown()
{
dout(10) << "begin" << dendl;
{
ceph_assert(lock.is_locked_by_me());
- OSDPerfMetricQuery query;
-
auto configure = new MMgrConfigure();
configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
if (c->peer_is_osd()) {
- configure->osd_perf_metric_queries.push_back(query);
+ configure->osd_perf_metric_queries =
+ osd_perf_metric_collector.get_queries();
}
c->send_message(configure);
}
+OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
+ const OSDPerfMetricQuery &query)
+{
+ return osd_perf_metric_collector.add_query(query);
+}
+
+int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+{
+ return osd_perf_metric_collector.remove_query(query_id);
+}
#include "ServiceMap.h"
#include "MgrSession.h"
#include "DaemonState.h"
+#include "OSDPerfMetricCollector.h"
class MMgrReport;
class MMgrOpen;
class MCommand;
struct MonCommand;
class CommandContext;
+struct OSDPerfMetricQuery;
/**
void tick();
void schedule_tick_locked(double delay_sec);
+ class OSDPerfMetricCollectorListener :
+ public OSDPerfMetricCollector::Listener {
+ public:
+ OSDPerfMetricCollectorListener(DaemonServer *server)
+ : server(server) {
+ }
+ void handle_query_updated() override {
+ server->handle_osd_perf_metric_query_updated();
+ }
+ private:
+ DaemonServer *server;
+ };
+ OSDPerfMetricCollectorListener osd_perf_metric_collector_listener;
+ OSDPerfMetricCollector osd_perf_metric_collector;
+ void handle_osd_perf_metric_query_updated();
+
public:
int init(uint64_t gid, entity_addrvec_t client_addrs);
void shutdown();
void _send_configure(ConnectionRef c);
+ OSDPerfMetricQueryID add_osd_perf_query(const OSDPerfMetricQuery &query);
+ int remove_osd_perf_query(OSDPerfMetricQueryID query_id);
+
virtual const char** get_tracked_conf_keys() const override;
virtual void handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed) override;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "OSDPerfMetricCollector.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.osd_perf_metric_collector " << __func__ << " "
+
+OSDPerfMetricCollector::OSDPerfMetricCollector(Listener &listener)
+ : listener(listener), lock("OSDPerfMetricCollector::lock") {
+}
+
+std::list<OSDPerfMetricQuery> OSDPerfMetricCollector::get_queries() {
+ Mutex::Locker locker(lock);
+
+ std::list<OSDPerfMetricQuery> query_list;
+ for (auto &it : queries) {
+ query_list.push_back(it.first);
+ }
+
+ return query_list;
+}
+
+int OSDPerfMetricCollector::add_query(const OSDPerfMetricQuery& query) {
+ uint64_t query_id;
+ bool notify = false;
+
+ {
+ Mutex::Locker locker(lock);
+
+ query_id = next_query_id++;
+ auto it = queries.find(query);
+ if (it == queries.end()) {
+ it = queries.insert({query, {}}).first;
+ notify = true;
+ }
+ it->second.insert(query_id);
+ }
+
+ dout(10) << query << " query_id=" << query_id << dendl;
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+
+ return query_id;
+}
+
+int OSDPerfMetricCollector::remove_query(int query_id) {
+ bool found = false;
+ bool notify = false;
+
+ {
+ Mutex::Locker locker(lock);
+
+ for (auto it = queries.begin() ; it != queries.end(); it++) {
+ auto &ids = it->second;
+
+ if (ids.erase(query_id) > 0) {
+ if (ids.empty()) {
+ queries.erase(it);
+ notify = true;
+ }
+ found = true;
+ break;
+ }
+ }
+ }
+
+ if (!found) {
+ dout(10) << query_id << " not found" << dendl;
+ return -ENOENT;
+ }
+
+ dout(10) << query_id << dendl;
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+
+ return 0;
+}
+
+void OSDPerfMetricCollector::remove_all_queries() {
+ dout(10) << dendl;
+
+ bool notify;
+
+ {
+ Mutex::Locker locker(lock);
+
+ notify = !queries.empty();
+ queries.clear();
+ }
+
+ if (notify) {
+ listener.handle_query_updated();
+ }
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef OSD_PERF_METRIC_COLLECTOR_H_
+#define OSD_PERF_METRIC_COLLECTOR_H_
+
+#include "common/Mutex.h"
+
+#include "mgr/OSDPerfMetricQuery.h"
+
+#include <list>
+#include <set>
+
+/**
+ * OSD performance query class.
+ */
+class OSDPerfMetricCollector {
+public:
+ struct Listener {
+ virtual ~Listener() {
+ }
+
+ virtual void handle_query_updated() = 0;
+ };
+
+ OSDPerfMetricCollector(Listener &listener);
+
+ std::list<OSDPerfMetricQuery> get_queries();
+
+ OSDPerfMetricQueryID add_query(const OSDPerfMetricQuery& query);
+ int remove_query(OSDPerfMetricQueryID query_id);
+ void remove_all_queries();
+
+private:
+ typedef std::map<OSDPerfMetricQuery, std::set<OSDPerfMetricQueryID>> Queries;
+
+ Listener &listener;
+ mutable Mutex lock;
+ OSDPerfMetricQueryID next_query_id = 0;
+ Queries queries;
+};
+
+#endif // OSD_PERF_METRIC_COLLECTOR_H_
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "OSDPerfMetricQuery.h"
+
+std::ostream& operator<<(std::ostream& os, const OSDPerfMetricQuery &query) {
+ return os << "simple";
+}
struct OSDPerfMetricQuery
{
- OSDPerfMetricQueryID query_id;
+ bool operator<(const OSDPerfMetricQuery &other) const {
+ return false;
+ }
DENC(OSDPerfMetricQuery, v, p) {
- DENC_START(1, 1, p);
- denc(v.query_id, p);
- DENC_FINISH(p);
+ DENC_START(1, 1, p);
+ DENC_FINISH(p);
}
};
WRITE_CLASS_DENC(OSDPerfMetricQuery)
+std::ostream& operator<<(std::ostream& os, const OSDPerfMetricQuery &query);
+
#endif // OSD_PERF_METRIC_QUERY_H_