%{_datadir}/ceph/mgr/restful
%{_datadir}/ceph/mgr/selftest
%{_datadir}/ceph/mgr/snap_schedule
+%{_datadir}/ceph/mgr/stats
%{_datadir}/ceph/mgr/status
%{_datadir}/ceph/mgr/telegraf
%{_datadir}/ceph/mgr/telemetry
usr/share/ceph/mgr/restful
usr/share/ceph/mgr/selftest
usr/share/ceph/mgr/snap_schedule
+usr/share/ceph/mgr/stats
usr/share/ceph/mgr/status
usr/share/ceph/mgr/telegraf
usr/share/ceph/mgr/telemetry
PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
{
- std::map<OSDPerfMetricKey, PerformanceCounters> counters;
-
- int r = server.get_osd_perf_counters(query_id, &counters);
+ OSDPerfCollector collector(query_id);
+ int r = server.get_osd_perf_counters(&collector);
if (r < 0) {
dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: "
<< cpp_strerror(r) << dendl;
}
PyFormatter f;
+ const std::map<OSDPerfMetricKey, PerformanceCounters> &counters = collector.counters;
+
+ f.open_array_section("counters");
+ for (auto &[key, instance_counters] : counters) {
+ f.open_object_section("i");
+ f.open_array_section("k");
+ for (auto &sub_key : key) {
+ f.open_array_section("s");
+ for (size_t i = 0; i < sub_key.size(); i++) {
+ f.dump_string(stringify(i).c_str(), sub_key[i]);
+ }
+ f.close_section(); // s
+ }
+ f.close_section(); // k
+ f.open_array_section("c");
+ for (auto &c : instance_counters) {
+ f.open_array_section("p");
+ f.dump_unsigned("0", c.first);
+ f.dump_unsigned("1", c.second);
+ f.close_section(); // p
+ }
+ f.close_section(); // c
+ f.close_section(); // i
+ }
+ f.close_section(); // counters
+
+ return f.get();
+}
+
+MetricQueryID ActivePyModules::add_mds_perf_query(
+ const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit)
+{
+ return server.add_mds_perf_query(query, limit);
+}
+
+void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id)
+{
+ int r = server.remove_mds_perf_query(query_id);
+ if (r < 0) {
+ dout(0) << "remove_mds_perf_query for query_id=" << query_id << " failed: "
+ << cpp_strerror(r) << dendl;
+ }
+}
+
+PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id)
+{
+ MDSPerfCollector collector(query_id);
+ int r = server.get_mds_perf_counters(&collector);
+ if (r < 0) {
+ dout(0) << "get_mds_perf_counters for query_id=" << query_id << " failed: "
+ << cpp_strerror(r) << dendl;
+ Py_RETURN_NONE;
+ }
+
+ PyFormatter f;
+ const std::map<MDSPerfMetricKey, PerformanceCounters> &counters = collector.counters;
+
+ f.open_array_section("metrics");
+
+ f.open_array_section("delayed_ranks");
+ f.dump_string("ranks", stringify(collector.delayed_ranks).c_str());
+ f.close_section(); // delayed_ranks
f.open_array_section("counters");
- for (auto &it : counters) {
- auto &key = it.first;
- auto &instance_counters = it.second;
+ for (auto &[key, instance_counters] : counters) {
f.open_object_section("i");
f.open_array_section("k");
for (auto &sub_key : key) {
f.close_section(); // i
}
f.close_section(); // counters
+ f.close_section(); // metrics
return f.get();
}
void remove_osd_perf_query(MetricQueryID query_id);
PyObject *get_osd_perf_counters(MetricQueryID query_id);
+ MetricQueryID add_mds_perf_query(
+ const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit);
+ void remove_mds_perf_query(MetricQueryID query_id);
+ PyObject *get_mds_perf_counters(MetricQueryID query_id);
+
bool get_store(const std::string &module_name,
const std::string &key, std::string *val) const;
PyObject *get_store_prefix(const std::string &module_name,
return self->py_modules->get_osd_perf_counters(query_id);
}
+// MDS perf query interface -- mostly follows ceph_add_osd_perf_query()
+// style
+
+static PyObject*
+ceph_add_mds_perf_query(BaseMgrModule *self, PyObject *args)
+{
+ static const std::string NAME_KEY_DESCRIPTOR = "key_descriptor";
+ static const std::string NAME_COUNTERS_DESCRIPTORS =
+ "performance_counter_descriptors";
+ static const std::string NAME_LIMIT = "limit";
+ static const std::string NAME_SUB_KEY_TYPE = "type";
+ static const std::string NAME_SUB_KEY_REGEX = "regex";
+ static const std::string NAME_LIMIT_ORDER_BY = "order_by";
+ static const std::string NAME_LIMIT_MAX_COUNT = "max_count";
+ static const std::map<std::string, MDSPerfMetricSubKeyType> sub_key_types = {
+ {"mds_rank", MDSPerfMetricSubKeyType::MDS_RANK},
+ {"client_id", MDSPerfMetricSubKeyType::CLIENT_ID},
+ };
+ static const std::map<std::string, MDSPerformanceCounterType> counter_types = {
+ {"cap_hit", MDSPerformanceCounterType::CAP_HIT_METRIC},
+ {"read_latency", MDSPerformanceCounterType::READ_LATENCY_METRIC},
+ {"write_latency", MDSPerformanceCounterType::WRITE_LATENCY_METRIC},
+ {"metadata_latency", MDSPerformanceCounterType::METADATA_LATENCY_METRIC},
+ };
+
+ PyObject *py_query = nullptr;
+ if (!PyArg_ParseTuple(args, "O:ceph_add_mds_perf_query", &py_query)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+ if (!PyDict_Check(py_query)) {
+ derr << __func__ << " arg not a dict" << dendl;
+ Py_RETURN_NONE;
+ }
+
+ PyObject *query_params = PyDict_Items(py_query);
+ MDSPerfMetricQuery query;
+ std::optional<MDSPerfMetricLimit> limit;
+
+ // {
+ // 'key_descriptor': [
+ // {'type': subkey_type, 'regex': regex_pattern},
+ // ...
+ // ],
+ // 'performance_counter_descriptors': [
+ // list, of, descriptor, types
+ // ],
+ // 'limit': {'order_by': performance_counter_type, 'max_count': n},
+ // }
+
+ for (int i = 0; i < PyList_Size(query_params); ++i) {
+ PyObject *kv = PyList_GET_ITEM(query_params, i);
+ char *query_param_name = nullptr;
+ PyObject *query_param_val = nullptr;
+ if (!PyArg_ParseTuple(kv, "sO:pair", &query_param_name, &query_param_val)) {
+ derr << __func__ << " dict item " << i << " not a size 2 tuple" << dendl;
+ Py_RETURN_NONE;
+ }
+ if (query_param_name == NAME_KEY_DESCRIPTOR) {
+ if (!PyList_Check(query_param_val)) {
+ derr << __func__ << " " << query_param_name << " not a list" << dendl;
+ Py_RETURN_NONE;
+ }
+ for (int j = 0; j < PyList_Size(query_param_val); j++) {
+ PyObject *sub_key = PyList_GET_ITEM(query_param_val, j);
+ if (!PyDict_Check(sub_key)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " not a dict" << dendl;
+ Py_RETURN_NONE;
+ }
+ MDSPerfMetricSubKeyDescriptor d;
+ PyObject *sub_key_params = PyDict_Items(sub_key);
+ for (int k = 0; k < PyList_Size(sub_key_params); ++k) {
+ PyObject *pair = PyList_GET_ITEM(sub_key_params, k);
+ if (!PyTuple_Check(pair)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " pair " << k << " not a tuple" << dendl;
+ Py_RETURN_NONE;
+ }
+ char *param_name = nullptr;
+ PyObject *param_value = nullptr;
+ if (!PyArg_ParseTuple(pair, "sO:pair", ¶m_name, ¶m_value)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " pair " << k << " not a size 2 tuple" << dendl;
+ Py_RETURN_NONE;
+ }
+ if (param_name == NAME_SUB_KEY_TYPE) {
+ if (!PyUnicode_Check(param_value)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " contains invalid param " << param_name << dendl;
+ Py_RETURN_NONE;
+ }
+ auto type = PyUnicode_AsUTF8(param_value);
+ auto it = sub_key_types.find(type);
+ if (it == sub_key_types.end()) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " contains invalid type " << dendl;
+ Py_RETURN_NONE;
+ }
+ d.type = it->second;
+ } else if (param_name == NAME_SUB_KEY_REGEX) {
+ if (!PyUnicode_Check(param_value)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " contains invalid param " << param_name << dendl;
+ Py_RETURN_NONE;
+ }
+ d.regex_str = PyUnicode_AsUTF8(param_value);
+ try {
+ d.regex = d.regex_str.c_str();
+ } catch (const std::regex_error& e) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " contains invalid regex " << d.regex_str << dendl;
+ Py_RETURN_NONE;
+ }
+ if (d.regex.mark_count() == 0) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " regex " << d.regex_str << ": no capturing groups"
+ << dendl;
+ Py_RETURN_NONE;
+ }
+ } else {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " contains invalid param " << param_name << dendl;
+ Py_RETURN_NONE;
+ }
+ }
+      if (d.type == static_cast<MDSPerfMetricSubKeyType>(-1) ||
+          d.regex_str.empty()) {
+        derr << __func__ << " query " << query_param_name << " item " << j
+             << " invalid" << dendl;
+        Py_RETURN_NONE;
+      }
+ query.key_descriptor.push_back(d);
+ }
+ } else if (query_param_name == NAME_COUNTERS_DESCRIPTORS) {
+ if (!PyList_Check(query_param_val)) {
+ derr << __func__ << " " << query_param_name << " not a list" << dendl;
+ Py_RETURN_NONE;
+ }
+ for (int j = 0; j < PyList_Size(query_param_val); j++) {
+ PyObject *py_type = PyList_GET_ITEM(query_param_val, j);
+ if (!PyUnicode_Check(py_type)) {
+ derr << __func__ << " query " << query_param_name << " item " << j
+ << " not a string" << dendl;
+ Py_RETURN_NONE;
+ }
+ auto type = PyUnicode_AsUTF8(py_type);
+ auto it = counter_types.find(type);
+ if (it == counter_types.end()) {
+ derr << __func__ << " query " << query_param_name << " item " << type
+ << " is not valid type" << dendl;
+ Py_RETURN_NONE;
+ }
+ query.performance_counter_descriptors.push_back(it->second);
+ }
+ } else if (query_param_name == NAME_LIMIT) {
+ if (!PyDict_Check(query_param_val)) {
+ derr << __func__ << " query " << query_param_name << " not a dict"
+ << dendl;
+ Py_RETURN_NONE;
+ }
+
+ limit = MDSPerfMetricLimit();
+ PyObject *limit_params = PyDict_Items(query_param_val);
+
+ for (int j = 0; j < PyList_Size(limit_params); ++j) {
+ PyObject *kv = PyList_GET_ITEM(limit_params, j);
+ char *limit_param_name = nullptr;
+ PyObject *limit_param_val = nullptr;
+ if (!PyArg_ParseTuple(kv, "sO:pair", &limit_param_name,
+ &limit_param_val)) {
+ derr << __func__ << " limit item " << j << " not a size 2 tuple"
+ << dendl;
+ Py_RETURN_NONE;
+ }
+
+ if (limit_param_name == NAME_LIMIT_ORDER_BY) {
+ if (!PyUnicode_Check(limit_param_val)) {
+ derr << __func__ << " " << limit_param_name << " not a string"
+ << dendl;
+ Py_RETURN_NONE;
+ }
+ auto order_by = PyUnicode_AsUTF8(limit_param_val);
+ auto it = counter_types.find(order_by);
+ if (it == counter_types.end()) {
+ derr << __func__ << " limit " << limit_param_name
+ << " not a valid counter type" << dendl;
+ Py_RETURN_NONE;
+ }
+ limit->order_by = it->second;
+ } else if (limit_param_name == NAME_LIMIT_MAX_COUNT) {
+#if PY_MAJOR_VERSION <= 2
+ if (!PyInt_Check(limit_param_val) && !PyLong_Check(limit_param_val)) {
+#else
+ if (!PyLong_Check(limit_param_val)) {
+#endif
+ derr << __func__ << " " << limit_param_name << " not an int"
+ << dendl;
+ Py_RETURN_NONE;
+ }
+ limit->max_count = PyLong_AsLong(limit_param_val);
+ } else {
+ derr << __func__ << " unknown limit param: " << limit_param_name
+ << dendl;
+ Py_RETURN_NONE;
+ }
+ }
+ } else {
+ derr << __func__ << " unknown query param: " << query_param_name << dendl;
+ Py_RETURN_NONE;
+ }
+ }
+
+ if (query.key_descriptor.empty()) {
+ derr << __func__ << " invalid query" << dendl;
+ Py_RETURN_NONE;
+ }
+
+ if (limit) {
+ auto &ds = query.performance_counter_descriptors;
+ if (std::find(ds.begin(), ds.end(), limit->order_by) == ds.end()) {
+ derr << __func__ << " limit order_by " << limit->order_by
+ << " not in performance_counter_descriptors" << dendl;
+ Py_RETURN_NONE;
+ }
+ }
+
+ auto query_id = self->py_modules->add_mds_perf_query(query, limit);
+ return PyLong_FromLong(query_id);
+}
+
+static PyObject*
+ceph_remove_mds_perf_query(BaseMgrModule *self, PyObject *args)
+{
+ MetricQueryID query_id;
+ if (!PyArg_ParseTuple(args, "i:ceph_remove_mds_perf_query", &query_id)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ self->py_modules->remove_mds_perf_query(query_id);
+ Py_RETURN_NONE;
+}
+
+static PyObject*
+ceph_get_mds_perf_counters(BaseMgrModule *self, PyObject *args)
+{
+ MetricQueryID query_id;
+ if (!PyArg_ParseTuple(args, "i:ceph_get_mds_perf_counters", &query_id)) {
+ derr << "Invalid args!" << dendl;
+ return nullptr;
+ }
+
+ return self->py_modules->get_mds_perf_counters(query_id);
+}
+
static PyObject*
ceph_is_authorized(BaseMgrModule *self, PyObject *args)
{
{"_ceph_get_osd_perf_counters", (PyCFunction)ceph_get_osd_perf_counters,
METH_VARARGS, "Get osd perf counters"},
+  {"_ceph_add_mds_perf_query", (PyCFunction)ceph_add_mds_perf_query,
+   METH_VARARGS, "Add an mds perf query"},
+
+  {"_ceph_remove_mds_perf_query", (PyCFunction)ceph_remove_mds_perf_query,
+   METH_VARARGS, "Remove an mds perf query"},
+
+  {"_ceph_get_mds_perf_counters", (PyCFunction)ceph_get_mds_perf_counters,
+   METH_VARARGS, "Get mds perf counters"},
+
{"_ceph_is_authorized", (PyCFunction)ceph_is_authorized,
METH_VARARGS, "Verify the current session caps are valid"},
return osd_perf_metric_collector.remove_query(query_id);
}
-int DaemonServer::get_osd_perf_counters(
- MetricQueryID query_id,
- std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
+int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
{
- return osd_perf_metric_collector.get_counters(query_id, counters);
+ return osd_perf_metric_collector.get_counters(collector);
+}
+
+MetricQueryID DaemonServer::add_mds_perf_query(
+ const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit)
+{
+ return mds_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
+{
+ return mds_perf_metric_collector.remove_query(query_id);
+}
+
+int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
+{
+ return mds_perf_metric_collector.get_counters(collector);
}
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit);
int remove_osd_perf_query(MetricQueryID query_id);
- int get_osd_perf_counters(MetricQueryID query_id,
- std::map<OSDPerfMetricKey, PerformanceCounters> *c);
+ int get_osd_perf_counters(OSDPerfCollector *collector);
+
+ MetricQueryID add_mds_perf_query(const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit);
+ int remove_mds_perf_query(MetricQueryID query_id);
+ int get_mds_perf_counters(MDSPerfCollector *collector);
virtual const char** get_tracked_conf_keys() const override;
virtual void handle_conf_change(const ConfigProxy& conf,
delayed_ranks = metric_report.rank_metrics_delayed;
dout(20) << ": delayed ranks=[" << delayed_ranks << "]" << dendl;
}
+
+int MDSPerfMetricCollector::get_counters(PerfCollector *collector) {
+ MDSPerfCollector *c = static_cast<MDSPerfCollector *>(collector);
+
+ std::lock_guard locker(lock);
+
+ int r = get_counters_generic(c->query_id, &c->counters);
+ if (r != 0) {
+ return r;
+ }
+
+ get_delayed_ranks(&c->delayed_ranks);
+
+ return r;
+}
+
+void MDSPerfMetricCollector::get_delayed_ranks(std::set<mds_rank_t> *ranks) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+ *ranks = delayed_ranks;
+}
MDSPerfMetrics> {
private:
std::set<mds_rank_t> delayed_ranks;
+
+ void get_delayed_ranks(std::set<mds_rank_t> *ranks);
+
public:
MDSPerfMetricCollector(MetricListener &listener);
void process_reports(const MetricPayload &payload) override;
+ int get_counters(PerfCollector *collector) override;
};
#endif // CEPH_MGR_MDS_PERF_COLLECTOR_H
std::ostream &operator<<(std::ostream &os, const MDSPerfMetricQuery &query);
+struct MDSPerfCollector : PerfCollector {
+ std::map<MDSPerfMetricKey, PerformanceCounters> counters;
+ std::set<mds_rank_t> delayed_ranks;
+
+ MDSPerfCollector(MetricQueryID query_id)
+ : PerfCollector(query_id) {
+ }
+};
+
struct MDSPerfMetrics {
MDSPerformanceCounterDescriptors performance_counter_descriptors;
std::map<MDSPerfMetricKey, ceph::buffer::list> group_packed_performance_counters;
}
template <typename Query, typename Limit, typename Key, typename Report>
-int MetricCollector<Query, Limit, Key, Report>::get_counters(
+int MetricCollector<Query, Limit, Key, Report>::get_counters_generic(
MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
dout(20) << dendl;
-
- std::lock_guard locker(lock);
+ ceph_assert(ceph_mutex_is_locked(lock));
auto it = counters.find(query_id);
if (it == counters.end()) {
void remove_all_queries();
- int get_counters(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
-
std::map<Query, Limits> get_queries() const {
std::lock_guard locker(lock);
}
virtual void process_reports(const MetricPayload &payload) = 0;
+ virtual int get_counters(PerfCollector *collector) = 0;
protected:
typedef std::optional<Limit> OptionalLimit;
Counters counters;
void process_reports_generic(const std::map<Query, Report> &reports, UpdateCallback callback);
+ int get_counters_generic(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
private:
MetricListener &listener;
counter->second += update.second;
});
}
+
+int OSDPerfMetricCollector::get_counters(PerfCollector *collector) {
+ OSDPerfCollector *c = static_cast<OSDPerfCollector *>(collector);
+
+ std::lock_guard locker(lock);
+ return get_counters_generic(c->query_id, &c->counters);
+}
OSDPerfMetricCollector(MetricListener &listener);
void process_reports(const MetricPayload &payload) override;
+ int get_counters(PerfCollector *collector) override;
};
#endif // OSD_PERF_METRIC_COLLECTOR_H_
};
WRITE_CLASS_DENC(OSDPerfMetricQuery)
+struct OSDPerfCollector : PerfCollector {
+ std::map<OSDPerfMetricKey, PerformanceCounters> counters;
+
+ OSDPerfCollector(MetricQueryID query_id)
+ : PerfCollector(query_id) {
+ }
+};
+
std::ostream& operator<<(std::ostream& os, const OSDPerfMetricQuery &query);
struct OSDPerfMetricReport {
virtual void handle_query_updated() = 0;
};
+struct PerfCollector {
+ MetricQueryID query_id;
+ PerfCollector(MetricQueryID query_id)
+ : query_id(query_id) {
+ }
+};
+
#endif // CEPH_MGR_TYPES_H
def _ceph_add_osd_perf_query(self, query):...
def _ceph_remove_osd_perf_query(self, query_id):...
def _ceph_get_osd_perf_counters(self, query_id):...
+ def _ceph_add_mds_perf_query(self, query):...
+ def _ceph_remove_mds_perf_query(self, query_id):...
+ def _ceph_get_mds_perf_counters(self, query_id):...
def _ceph_unregister_client(self, addrs):...
def _ceph_register_client(self, addrs):...
def _ceph_is_authorized(self, arguments):...
"""
return self._ceph_get_osd_perf_counters(query_id)
+ def add_mds_perf_query(self, query):
+ """
+ Register an MDS perf query. Argument is a
+ dict of the query parameters, in this form:
+
+ ::
+
+ {
+ 'key_descriptor': [
+ {'type': subkey_type, 'regex': regex_pattern},
+ ...
+ ],
+ 'performance_counter_descriptors': [
+ list, of, descriptor, types
+ ],
+ }
+
+ NOTE: 'limit' and 'order_by' are not supported (yet).
+
+        Valid subkey types:
+          'mds_rank', 'client_id'
+        Valid performance counter types:
+          'cap_hit', 'read_latency', 'write_latency', 'metadata_latency'
+
+ :param object query: query
+ :rtype: int (query id)
+ """
+ return self._ceph_add_mds_perf_query(query)
+
+ def remove_mds_perf_query(self, query_id):
+ """
+ Unregister an MDS perf query.
+
+ :param int query_id: query ID
+ """
+ return self._ceph_remove_mds_perf_query(query_id)
+
+ def get_mds_perf_counters(self, query_id):
+ """
+ Get stats collected for an MDS perf query.
+
+ :param int query_id: query ID
+ """
+ return self._ceph_get_mds_perf_counters(query_id)
+
def is_authorized(self, arguments):
"""
Verifies that the current session caps permit executing the py service
--- /dev/null
+from .module import Module
--- /dev/null
+import re
+import json
+import time
+import uuid
+import errno
+import traceback
+from collections import OrderedDict
+from typing import List, Dict, Set
+
+from mgr_module import CommandResult
+
+from datetime import datetime, timedelta
+from threading import Lock, Condition, Thread
+
+QUERY_IDS = "query_ids"
+GLOBAL_QUERY_ID = "global_query_id"
+QUERY_LAST_REQUEST = "last_time_stamp"
+QUERY_RAW_COUNTERS = "query_raw_counters"
+QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"
+
+MDS_RANK_ALL = (-1,)
+CLIENT_ID_ALL = r"\d*"
+CLIENT_IP_ALL = ".*"
+
+MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
+MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
+MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
+ 'read_latency': 1,
+ 'write_latency': 2,
+ 'metadata_latency': 3})
+MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
+MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency'] # type: List[str]
+
+QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+
+CLIENT_METADATA_KEY = "client_metadata"
+CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
+CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
+
+NON_EXISTENT_KEY_STR = "N/A"
+
+class FilterSpec(object):
+    """
+    query filters encapsulated and used as key for query map
+    """
+    def __init__(self, mds_ranks, client_id, client_ip):
+        self.mds_ranks = mds_ranks
+        self.client_id = client_id
+        self.client_ip = client_ip
+
+    def __hash__(self):
+        return hash((self.mds_ranks, self.client_id, self.client_ip))
+
+    def __eq__(self, other):
+        return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, other.client_ip)
+
+    def __ne__(self, other):
+        return not(self == other)
+
+def extract_mds_ranks_from_spec(mds_rank_spec):
+ if not mds_rank_spec:
+ return MDS_RANK_ALL
+ match = re.match(r'^(\d[,\d]*)$', mds_rank_spec)
+ if not match:
+ raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
+ return tuple(int(mds_rank) for mds_rank in match.group(0).split(','))
+
+def extract_client_id_from_spec(client_id_spec):
+ if not client_id_spec:
+ return CLIENT_ID_ALL
+ # the client id is the spec itself since it'll be a part
+ # of client filter regex.
+ return client_id_spec
+
+def extract_client_ip_from_spec(client_ip_spec):
+ if not client_ip_spec:
+ return CLIENT_IP_ALL
+ # TODO: validate if it is an ip address (or a subset of it).
+ # the client ip is the spec itself since it'll be a part
+ # of client filter regex.
+ return client_ip_spec
+
+def extract_mds_ranks_from_report(mds_ranks_str):
+ if not mds_ranks_str:
+ return []
+ return [int(x) for x in mds_ranks_str.split(',')]
+
+def extract_client_id_and_ip(client):
+ match = re.match(r'^(client\.\d+)\s(.*)', client)
+ if match:
+ return match.group(1), match.group(2)
+ return None, None
+
+class FSPerfStats(object):
+ lock = Lock()
+ q_cv = Condition(lock)
+ r_cv = Condition(lock)
+
+ user_queries = {} # type: Dict[str, Dict]
+
+ meta_lock = Lock()
+ client_metadata = {
+ 'metadata' : {},
+ 'to_purge' : set(),
+ 'in_progress' : {},
+ } # type: Dict
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+ # report processor thread
+ self.report_processor = Thread(target=self.run)
+ self.report_processor.start()
+
+ def set_client_metadata(self, client_id, key, meta):
+ result = self.client_metadata['metadata'].setdefault(client_id, {})
+ if not key in result or not result[key] == meta:
+ result[key] = meta
+
+ def notify(self, cmdtag):
+ self.log.debug("cmdtag={0}".format(cmdtag))
+ with self.meta_lock:
+ result = self.client_metadata['in_progress'].pop(cmdtag)
+ client_meta = result[1].wait()
+ if client_meta[0] != 0:
+ self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
+ result[0], client_meta[2]))
+ return
+ self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
+ for metadata in json.loads(client_meta[1]):
+ client_id = "client.{0}".format(metadata['id'])
+ result = self.client_metadata['metadata'].setdefault(client_id, {})
+ for subkey in CLIENT_METADATA_SUBKEYS:
+ self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
+ for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
+ self.set_client_metadata(client_id, subkey,
+ metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
+ metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
+ supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
+ self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
+ # when all async requests are done, purge clients metadata if any.
+ if not self.client_metadata['in_progress']:
+                for client in self.client_metadata['to_purge']:
+                    try:
+                        self.log.info("purge client metadata for {0}".format(client))
+                        self.client_metadata['metadata'].pop(client)
+                    except KeyError:
+                        pass
+ self.client_metadata['to_purge'].clear()
+ self.log.debug("client_metadata={0}, to_purge={1}".format(
+ self.client_metadata['metadata'], self.client_metadata['to_purge']))
+
+    def update_client_meta(self, rank_set):
+        new_updates = {}
+        with self.meta_lock:
+            pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
+            for rank in rank_set:
+                if rank in pending_updates:
+                    continue
+                tag = str(uuid.uuid4())
+                result = CommandResult(tag)
+                new_updates[tag] = (rank, result)
+            self.client_metadata['in_progress'].update(new_updates)
+
+        self.log.debug("updating client metadata from {0}".format(new_updates))
+
+        cmd_dict = {'prefix': 'client ls'}
+        for tag,val in new_updates.items():
+            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)
+
+ def run(self):
+ try:
+ self.log.info("FSPerfStats::report_processor starting...")
+ while True:
+ with self.lock:
+ self.scrub_expired_queries()
+ self.process_mds_reports()
+ self.r_cv.notify()
+
+ stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
+ self.q_cv.wait(stats_period)
+ self.log.debug("FSPerfStats::tick")
+ except Exception as e:
+ self.log.fatal("fatal error: {}".format(traceback.format_exc()))
+
+ def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+ # this is pretty straight forward -- find what MDSs are missing from
+ # what is tracked vs what we received in incoming report and purge
+ # the whole bunch.
+ tracked_ranks = raw_perf_counters.keys()
+ available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
+ for rank in set(tracked_ranks) - set(available_ranks):
+ culled = raw_perf_counters.pop(rank)
+ self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
+ len(culled[1]), rank, "yes" if culled[0] else "no"))
+ missing_clients.update(list(culled[1].keys()))
+
+ def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+ # this is a bit more involed -- for each rank figure out what clients
+ # are missing in incoming report and purge them from our tracked map.
+ # but, if this is invoked _after_ cull_mds_entries(), the rank set
+ # is same, so we can loop based on that assumption.
+ ranks = raw_perf_counters.keys()
+ for rank in ranks:
+ tracked_clients = raw_perf_counters[rank][1].keys()
+ available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
+ for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
+ raw_perf_counters[rank][1].pop(client)
+ self.log.info("culled {0} from rank {1}".format(client, rank))
+ missing_clients.add(client)
+
+ def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
+ missing_clients = set() # type: Set[str]
+ self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
+ self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)
+
+ self.log.debug("missing_clients={0}".format(missing_clients))
+ with self.meta_lock:
+ if self.client_metadata['in_progress']:
+ self.client_metadata['to_purge'].update(missing_clients)
+ self.log.info("deferring client metadata purge (now {0} client(s))".format(
+ len(self.client_metadata['to_purge'])))
+ else:
+ for client in missing_clients:
+ try:
+ self.log.info("purge client metadata for {0}".format(client))
+ self.client_metadata['metadata'].pop(client)
+ except KeyError:
+ pass
+ self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
+
+ def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
+ tracked_clients = raw_perf_counters.keys()
+ available_clients = [counter['k'][0][0] for counter in incoming_metrics]
+ for client in set(tracked_clients) - set(available_clients):
+ raw_perf_counters.pop(client)
+
+ def get_raw_perf_counters(self, query):
+ raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})
+
+ for query_id in query[QUERY_IDS]:
+ result = self.module.get_mds_perf_counters(query_id)
+ self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
+ self.log.debug("get_raw_perf_counters={}".format(result))
+
+ # extract passed in delayed ranks. metrics for delayed ranks are tagged
+ # as stale.
+ delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])
+
+ # what's received from MDS
+ incoming_metrics = result['metrics'][1]
+
+ # cull missing MDSs and clients
+ self.cull_missing_entries(raw_perf_counters, incoming_metrics)
+
+ # iterate over metrics list and update our copy (note that we have
+ # already culled the differences).
+ meta_refresh_ranks = set()
+ for counter in incoming_metrics:
+ mds_rank = int(counter['k'][0][0])
+ client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
+ if client_id is not None or not client_ip: # client_id _could_ be 0
+ with self.meta_lock:
+ if not client_id in self.client_metadata['metadata']:
+ meta_refresh_ranks.add(mds_rank)
+ self.set_client_metadata(client_id, "IP", client_ip)
+ else:
+ self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
+
+ raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
+ raw_counters[0] = True if mds_rank in delayed_ranks else False
+ raw_client_counters = raw_counters[1].setdefault(client_id, [])
+
+ del raw_client_counters[:]
+ raw_client_counters.extend(counter['c'])
+ # send an asynchronous client metadata refresh
+ self.update_client_meta(meta_refresh_ranks)
+
+ def get_raw_perf_counters_global(self, query):
+ raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+ result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
+
+ self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
+ self.log.debug("get_raw_perf_counters_global={}".format(result))
+
+ global_metrics = result['metrics'][1]
+ self.cull_global_metrics(raw_perf_counters, global_metrics)
+ for counter in global_metrics:
+ client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
+ raw_client_counters = raw_perf_counters.setdefault(client_id, [])
+ del raw_client_counters[:]
+ raw_client_counters.extend(counter['c'])
+
+ def process_mds_reports(self):
+ for query in self.user_queries.values():
+ self.get_raw_perf_counters(query)
+ self.get_raw_perf_counters_global(query)
+
+ def scrub_expired_queries(self):
+ expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
+ for filter_spec in list(self.user_queries.keys()):
+ user_query = self.user_queries[filter_spec]
+ self.log.debug("scrubbing query={}".format(user_query))
+ if user_query[QUERY_LAST_REQUEST] < expire_time:
+ expired_query_ids = user_query[QUERY_IDS].copy()
+ expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
+ self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
+ self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
+ del self.user_queries[filter_spec]
+
+ def prepare_mds_perf_query(self, rank, client_id, client_ip):
+ mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
+ if not rank == -1:
+ mds_rank_regex = '^({})$'.format(rank)
+ client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+ return {
+ 'key_descriptor' : [
+ {'type' : 'mds_rank', 'regex' : mds_rank_regex},
+ {'type' : 'client_id', 'regex' : client_regex},
+ ],
+ 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
+ }
+
+ def prepare_global_perf_query(self, client_id, client_ip):
+ client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+ return {
+ 'key_descriptor' : [
+ {'type' : 'client_id', 'regex' : client_regex},
+ ],
+ 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
+ }
+
+    def unregister_mds_perf_queries(self, filter_spec, query_ids):
+        """Remove every registered perf query listed in query_ids.
+
+        filter_spec is used for logging only.
+        """
+        # log label fixed: a list of ids is interpolated, not a single query_id
+        self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_ids={1}".format(
+            filter_spec, query_ids))
+        for query_id in query_ids:
+            self.module.remove_mds_perf_query(query_id)
+
+    def register_mds_perf_query(self, filter_spec):
+        """Register one per-MDS perf query for each rank in filter_spec.
+
+        Returns the list of query ids on success.  On any failure the
+        already-registered ids are removed before the exception is
+        re-raised, so no query leaks on a partial registration.
+        """
+        mds_ranks = filter_spec.mds_ranks
+        client_id = filter_spec.client_id
+        client_ip = filter_spec.client_ip
+
+        query_ids = []
+        try:
+            # register per-mds perf query
+            for rank in mds_ranks:
+                query = self.prepare_mds_perf_query(rank, client_id, client_ip)
+                self.log.info("register_mds_perf_query: {}".format(query))
+
+                query_id = self.module.add_mds_perf_query(query)
+                if query_id is None: # query id can be 0
+                    raise RuntimeError("failed to add MDS perf query: {}".format(query))
+                query_ids.append(query_id)
+        except Exception:
+            # roll back the queries registered so far, then propagate
+            for query_id in query_ids:
+                self.module.remove_mds_perf_query(query_id)
+            raise
+        return query_ids
+
+    def register_global_perf_query(self, filter_spec):
+        """Register the global perf query for filter_spec and return its id.
+
+        Raises RuntimeError if the query cannot be added.
+        """
+        # register a global perf query for metrics
+        query = self.prepare_global_perf_query(filter_spec.client_id,
+                                               filter_spec.client_ip)
+        self.log.info("register_global_perf_query: {}".format(query))
+
+        query_id = self.module.add_mds_perf_query(query)
+        if query_id is None: # query id can be 0
+            raise RuntimeError("failed to add global perf query: {}".format(query))
+        return query_id
+
+    def register_query(self, filter_spec):
+        """Return the user query for filter_spec, creating it if absent.
+
+        NOTE(review): presumably the caller holds self.lock (get_perf_data
+        does) -- the q_cv.notify()/r_cv.wait() calls below require the
+        associated lock to be held; confirm both condvars share self.lock.
+        For a brand-new query this registers the per-MDS and global perf
+        queries, wakes the query processor and waits up to 5 seconds for
+        the first batch of results to arrive.
+        """
+        user_query = self.user_queries.get(filter_spec, None)
+        if not user_query:
+            user_query = {
+                QUERY_IDS : self.register_mds_perf_query(filter_spec),
+                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
+                QUERY_LAST_REQUEST : datetime.now(),
+            }
+            self.user_queries[filter_spec] = user_query
+
+            # kick the processing thread, then block (bounded) for results
+            self.q_cv.notify()
+            self.r_cv.wait(5)
+        else:
+            # refresh the timestamp so scrub_expired_queries keeps this entry
+            user_query[QUERY_LAST_REQUEST] = datetime.now()
+        return user_query
+
+    def generate_report(self, user_query):
+        """Assemble the `fs perf stats` result dict from a user query's
+        raw counters.
+
+        The report contains: the counter-name lists, per-client metadata,
+        global (per-client) metrics, and per-MDS metrics keyed by
+        "mds.<rank>" plus the list of delayed ranks.
+        """
+        result = {} # type: Dict
+        # start with counter info -- metrics that are global and per mds
+        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
+        result["counters"] = MDS_PERF_QUERY_COUNTERS
+
+        # fill in client metadata
+        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+        with self.meta_lock:
+            result_meta = result.setdefault("client_metadata", {})
+            for client_id in raw_perfs.keys():
+                # only clients we have metadata for are included
+                if client_id in self.client_metadata["metadata"]:
+                    client_meta = result_meta.setdefault(client_id, {})
+                    client_meta.update(self.client_metadata["metadata"][client_id])
+
+        # start populating global perf metrics w/ client metadata
+        metrics = result.setdefault("global_metrics", {})
+        for client_id, counters in raw_perfs.items():
+            # replace the list contents in place (del [:] + extend) so any
+            # existing reference to the list stays valid
+            global_client_metrics = metrics.setdefault(client_id, [])
+            del global_client_metrics[:]
+            global_client_metrics.extend(counters)
+
+        # and, now per-mds metrics keyed by mds rank along with delayed ranks
+        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
+        metrics = result.setdefault("metrics", {})
+
+        # counters[0] is truthy when that rank's stats are delayed
+        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
+        for rank, counters in raw_perfs.items():
+            mds_key = "mds.{}".format(rank)
+            mds_metrics = metrics.setdefault(mds_key, {})
+            mds_metrics.update(counters[1])
+        return result
+
+    def extract_query_filters(self, cmd):
+        """Parse the optional mds_rank/client_id/client_ip command arguments
+        into a FilterSpec."""
+        mds_rank_spec = cmd.get('mds_rank', None)
+        client_id_spec = cmd.get('client_id', None)
+        client_ip_spec = cmd.get('client_ip', None)
+
+        self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
+            mds_rank_spec, client_id_spec, client_ip_spec))
+
+        return FilterSpec(extract_mds_ranks_from_spec(mds_rank_spec),
+                          extract_client_id_from_spec(client_id_spec),
+                          extract_client_ip_from_spec(client_ip_spec))
+
+    def get_perf_data(self, cmd):
+        """Handle `fs perf stats`: (re)register the query for the requested
+        filters and return the current report.
+
+        Returns the standard mgr command tuple (retval, stdout, stderr).
+        """
+        filter_spec = self.extract_query_filters(cmd)
+
+        # (removed unused local `counters = {}` -- it was never read)
+        with self.lock:
+            user_query = self.register_query(filter_spec)
+            result = self.generate_report(user_query)
+        return 0, json.dumps(result), ""
--- /dev/null
+"""
+performance stats for ceph filesystem (for now...)
+"""
+
+import json
+from typing import List, Dict
+
+from mgr_module import MgrModule
+
+from .fs.perf_stats import FSPerfStats
+
+class Module(MgrModule):
+    """mgr module entry point wiring the `fs perf stats` command to
+    FSPerfStats."""
+
+    COMMANDS = [
+        {
+            "cmd": "fs perf stats "
+                   "name=mds_rank,type=CephString,req=false "
+                   "name=client_id,type=CephString,req=false "
+                   "name=client_ip,type=CephString,req=false ",
+            "desc": "retrieve ceph fs performance stats",
+            "perm": "r"
+        },
+    ]
+    MODULE_OPTIONS = [] # type: List[Dict]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self.fs_perf_stats = FSPerfStats(self)
+
+    def notify(self, notify_type, notify_id):
+        # forward only "command" notifications; FSPerfStats consumes them
+        if notify_type == "command":
+            self.fs_perf_stats.notify(notify_id)
+
+    def handle_command(self, inbuf, cmd):
+        # inbuf is unused; cmd is the parsed command dict from the mgr
+        prefix = cmd['prefix']
+        # only supported command is `fs perf stats` right now
+        if prefix.startswith('fs perf stats'):
+            return self.fs_perf_stats.get_perf_data(cmd)
+        raise NotImplementedError(cmd['prefix'])
progress/module.py \
rook/module.py \
snap_schedule/module.py \
+ stats/module.py \
test_orchestrator/module.py \
mds_autoscaler/module.py \
volumes/__init__.py