From: Venky Shankar Date: Mon, 26 Aug 2019 09:32:23 +0000 (-0400) Subject: mgr/stats: mds performance stats module X-Git-Tag: v17.0.0~899^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7523aef6e8656952418d17227c877952f9370f95;p=ceph-ci.git mgr/stats: mds performance stats module Signed-off-by: Venky Shankar --- diff --git a/ceph.spec.in b/ceph.spec.in index a7f62b9c8bd..838fd7b7702 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -1673,6 +1673,7 @@ fi %{_datadir}/ceph/mgr/restful %{_datadir}/ceph/mgr/selftest %{_datadir}/ceph/mgr/snap_schedule +%{_datadir}/ceph/mgr/stats %{_datadir}/ceph/mgr/status %{_datadir}/ceph/mgr/telegraf %{_datadir}/ceph/mgr/telemetry diff --git a/debian/ceph-mgr-modules-core.install b/debian/ceph-mgr-modules-core.install index f0f18065bf6..a1a74c0af2d 100644 --- a/debian/ceph-mgr-modules-core.install +++ b/debian/ceph-mgr-modules-core.install @@ -16,6 +16,7 @@ usr/share/ceph/mgr/rbd_support usr/share/ceph/mgr/restful usr/share/ceph/mgr/selftest usr/share/ceph/mgr/snap_schedule +usr/share/ceph/mgr/stats usr/share/ceph/mgr/status usr/share/ceph/mgr/telegraf usr/share/ceph/mgr/telemetry diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index a7fc0940767..9f28f8a58aa 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -1054,9 +1054,8 @@ void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id) PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id) { - std::map counters; - - int r = server.get_osd_perf_counters(query_id, &counters); + OSDPerfCollector collector(query_id); + int r = server.get_osd_perf_counters(&collector); if (r < 0) { dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: " << cpp_strerror(r) << dendl; @@ -1064,11 +1063,72 @@ PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id) } PyFormatter f; + const std::map &counters = collector.counters; + + f.open_array_section("counters"); + for (auto 
&[key, instance_counters] : counters) { + f.open_object_section("i"); + f.open_array_section("k"); + for (auto &sub_key : key) { + f.open_array_section("s"); + for (size_t i = 0; i < sub_key.size(); i++) { + f.dump_string(stringify(i).c_str(), sub_key[i]); + } + f.close_section(); // s + } + f.close_section(); // k + f.open_array_section("c"); + for (auto &c : instance_counters) { + f.open_array_section("p"); + f.dump_unsigned("0", c.first); + f.dump_unsigned("1", c.second); + f.close_section(); // p + } + f.close_section(); // c + f.close_section(); // i + } + f.close_section(); // counters + + return f.get(); +} + +MetricQueryID ActivePyModules::add_mds_perf_query( + const MDSPerfMetricQuery &query, + const std::optional &limit) +{ + return server.add_mds_perf_query(query, limit); +} + +void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id) +{ + int r = server.remove_mds_perf_query(query_id); + if (r < 0) { + dout(0) << "remove_mds_perf_query for query_id=" << query_id << " failed: " + << cpp_strerror(r) << dendl; + } +} + +PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id) +{ + MDSPerfCollector collector(query_id); + int r = server.get_mds_perf_counters(&collector); + if (r < 0) { + dout(0) << "get_mds_perf_counters for query_id=" << query_id << " failed: " + << cpp_strerror(r) << dendl; + Py_RETURN_NONE; + } + + PyFormatter f; + const std::map &counters = collector.counters; + + f.open_array_section("metrics"); + + f.open_array_section("delayed_ranks"); + f.dump_string("ranks", stringify(collector.delayed_ranks).c_str()); + f.close_section(); // delayed_ranks f.open_array_section("counters"); - for (auto &it : counters) { - auto &key = it.first; - auto &instance_counters = it.second; + for (auto &[key, instance_counters] : counters) { f.open_object_section("i"); f.open_array_section("k"); for (auto &sub_key : key) { @@ -1090,6 +1150,7 @@ PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id) 
f.close_section(); // i } f.close_section(); // counters + f.close_section(); // metrics return f.get(); } diff --git a/src/mgr/ActivePyModules.h b/src/mgr/ActivePyModules.h index 4892f2705fc..50cfc163639 100644 --- a/src/mgr/ActivePyModules.h +++ b/src/mgr/ActivePyModules.h @@ -110,6 +110,12 @@ public: void remove_osd_perf_query(MetricQueryID query_id); PyObject *get_osd_perf_counters(MetricQueryID query_id); + MetricQueryID add_mds_perf_query( + const MDSPerfMetricQuery &query, + const std::optional &limit); + void remove_mds_perf_query(MetricQueryID query_id); + PyObject *get_mds_perf_counters(MetricQueryID query_id); + bool get_store(const std::string &module_name, const std::string &key, std::string *val) const; PyObject *get_store_prefix(const std::string &module_name, diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index b62b681d641..84ceba7a86a 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -1023,6 +1023,262 @@ ceph_get_osd_perf_counters(BaseMgrModule *self, PyObject *args) return self->py_modules->get_osd_perf_counters(query_id); } +// MDS perf query interface -- mostly follows ceph_add_osd_perf_query() +// style + +static PyObject* +ceph_add_mds_perf_query(BaseMgrModule *self, PyObject *args) +{ + static const std::string NAME_KEY_DESCRIPTOR = "key_descriptor"; + static const std::string NAME_COUNTERS_DESCRIPTORS = + "performance_counter_descriptors"; + static const std::string NAME_LIMIT = "limit"; + static const std::string NAME_SUB_KEY_TYPE = "type"; + static const std::string NAME_SUB_KEY_REGEX = "regex"; + static const std::string NAME_LIMIT_ORDER_BY = "order_by"; + static const std::string NAME_LIMIT_MAX_COUNT = "max_count"; + static const std::map sub_key_types = { + {"mds_rank", MDSPerfMetricSubKeyType::MDS_RANK}, + {"client_id", MDSPerfMetricSubKeyType::CLIENT_ID}, + }; + static const std::map counter_types = { + {"cap_hit", MDSPerformanceCounterType::CAP_HIT_METRIC}, + {"read_latency", 
MDSPerformanceCounterType::READ_LATENCY_METRIC}, + {"write_latency", MDSPerformanceCounterType::WRITE_LATENCY_METRIC}, + {"metadata_latency", MDSPerformanceCounterType::METADATA_LATENCY_METRIC}, + }; + + PyObject *py_query = nullptr; + if (!PyArg_ParseTuple(args, "O:ceph_add_mds_perf_query", &py_query)) { + derr << "Invalid args!" << dendl; + return nullptr; + } + if (!PyDict_Check(py_query)) { + derr << __func__ << " arg not a dict" << dendl; + Py_RETURN_NONE; + } + + PyObject *query_params = PyDict_Items(py_query); + MDSPerfMetricQuery query; + std::optional limit; + + // { + // 'key_descriptor': [ + // {'type': subkey_type, 'regex': regex_pattern}, + // ... + // ], + // 'performance_counter_descriptors': [ + // list, of, descriptor, types + // ], + // 'limit': {'order_by': performance_counter_type, 'max_count': n}, + // } + + for (int i = 0; i < PyList_Size(query_params); ++i) { + PyObject *kv = PyList_GET_ITEM(query_params, i); + char *query_param_name = nullptr; + PyObject *query_param_val = nullptr; + if (!PyArg_ParseTuple(kv, "sO:pair", &query_param_name, &query_param_val)) { + derr << __func__ << " dict item " << i << " not a size 2 tuple" << dendl; + Py_RETURN_NONE; + } + if (query_param_name == NAME_KEY_DESCRIPTOR) { + if (!PyList_Check(query_param_val)) { + derr << __func__ << " " << query_param_name << " not a list" << dendl; + Py_RETURN_NONE; + } + for (int j = 0; j < PyList_Size(query_param_val); j++) { + PyObject *sub_key = PyList_GET_ITEM(query_param_val, j); + if (!PyDict_Check(sub_key)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " not a dict" << dendl; + Py_RETURN_NONE; + } + MDSPerfMetricSubKeyDescriptor d; + PyObject *sub_key_params = PyDict_Items(sub_key); + for (int k = 0; k < PyList_Size(sub_key_params); ++k) { + PyObject *pair = PyList_GET_ITEM(sub_key_params, k); + if (!PyTuple_Check(pair)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " pair " << k << " not a tuple" << dendl; + 
Py_RETURN_NONE; + } + char *param_name = nullptr; + PyObject *param_value = nullptr; + if (!PyArg_ParseTuple(pair, "sO:pair", ¶m_name, ¶m_value)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " pair " << k << " not a size 2 tuple" << dendl; + Py_RETURN_NONE; + } + if (param_name == NAME_SUB_KEY_TYPE) { + if (!PyUnicode_Check(param_value)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " contains invalid param " << param_name << dendl; + Py_RETURN_NONE; + } + auto type = PyUnicode_AsUTF8(param_value); + auto it = sub_key_types.find(type); + if (it == sub_key_types.end()) { + derr << __func__ << " query " << query_param_name << " item " << j + << " contains invalid type " << dendl; + Py_RETURN_NONE; + } + d.type = it->second; + } else if (param_name == NAME_SUB_KEY_REGEX) { + if (!PyUnicode_Check(param_value)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " contains invalid param " << param_name << dendl; + Py_RETURN_NONE; + } + d.regex_str = PyUnicode_AsUTF8(param_value); + try { + d.regex = d.regex_str.c_str(); + } catch (const std::regex_error& e) { + derr << __func__ << " query " << query_param_name << " item " << j + << " contains invalid regex " << d.regex_str << dendl; + Py_RETURN_NONE; + } + if (d.regex.mark_count() == 0) { + derr << __func__ << " query " << query_param_name << " item " << j + << " regex " << d.regex_str << ": no capturing groups" + << dendl; + Py_RETURN_NONE; + } + } else { + derr << __func__ << " query " << query_param_name << " item " << j + << " contains invalid param " << param_name << dendl; + Py_RETURN_NONE; + } + } + if (d.type == static_cast(-1) || + d.regex_str.empty()) { + derr << __func__ << " query " << query_param_name << " item " << i + << " invalid" << dendl; + Py_RETURN_NONE; + } + query.key_descriptor.push_back(d); + } + } else if (query_param_name == NAME_COUNTERS_DESCRIPTORS) { + if (!PyList_Check(query_param_val)) { + derr << __func__ 
<< " " << query_param_name << " not a list" << dendl; + Py_RETURN_NONE; + } + for (int j = 0; j < PyList_Size(query_param_val); j++) { + PyObject *py_type = PyList_GET_ITEM(query_param_val, j); + if (!PyUnicode_Check(py_type)) { + derr << __func__ << " query " << query_param_name << " item " << j + << " not a string" << dendl; + Py_RETURN_NONE; + } + auto type = PyUnicode_AsUTF8(py_type); + auto it = counter_types.find(type); + if (it == counter_types.end()) { + derr << __func__ << " query " << query_param_name << " item " << type + << " is not valid type" << dendl; + Py_RETURN_NONE; + } + query.performance_counter_descriptors.push_back(it->second); + } + } else if (query_param_name == NAME_LIMIT) { + if (!PyDict_Check(query_param_val)) { + derr << __func__ << " query " << query_param_name << " not a dict" + << dendl; + Py_RETURN_NONE; + } + + limit = MDSPerfMetricLimit(); + PyObject *limit_params = PyDict_Items(query_param_val); + + for (int j = 0; j < PyList_Size(limit_params); ++j) { + PyObject *kv = PyList_GET_ITEM(limit_params, j); + char *limit_param_name = nullptr; + PyObject *limit_param_val = nullptr; + if (!PyArg_ParseTuple(kv, "sO:pair", &limit_param_name, + &limit_param_val)) { + derr << __func__ << " limit item " << j << " not a size 2 tuple" + << dendl; + Py_RETURN_NONE; + } + + if (limit_param_name == NAME_LIMIT_ORDER_BY) { + if (!PyUnicode_Check(limit_param_val)) { + derr << __func__ << " " << limit_param_name << " not a string" + << dendl; + Py_RETURN_NONE; + } + auto order_by = PyUnicode_AsUTF8(limit_param_val); + auto it = counter_types.find(order_by); + if (it == counter_types.end()) { + derr << __func__ << " limit " << limit_param_name + << " not a valid counter type" << dendl; + Py_RETURN_NONE; + } + limit->order_by = it->second; + } else if (limit_param_name == NAME_LIMIT_MAX_COUNT) { +#if PY_MAJOR_VERSION <= 2 + if (!PyInt_Check(limit_param_val) && !PyLong_Check(limit_param_val)) { +#else + if (!PyLong_Check(limit_param_val)) { +#endif + 
derr << __func__ << " " << limit_param_name << " not an int" + << dendl; + Py_RETURN_NONE; + } + limit->max_count = PyLong_AsLong(limit_param_val); + } else { + derr << __func__ << " unknown limit param: " << limit_param_name + << dendl; + Py_RETURN_NONE; + } + } + } else { + derr << __func__ << " unknown query param: " << query_param_name << dendl; + Py_RETURN_NONE; + } + } + + if (query.key_descriptor.empty()) { + derr << __func__ << " invalid query" << dendl; + Py_RETURN_NONE; + } + + if (limit) { + auto &ds = query.performance_counter_descriptors; + if (std::find(ds.begin(), ds.end(), limit->order_by) == ds.end()) { + derr << __func__ << " limit order_by " << limit->order_by + << " not in performance_counter_descriptors" << dendl; + Py_RETURN_NONE; + } + } + + auto query_id = self->py_modules->add_mds_perf_query(query, limit); + return PyLong_FromLong(query_id); +} + +static PyObject* +ceph_remove_mds_perf_query(BaseMgrModule *self, PyObject *args) +{ + MetricQueryID query_id; + if (!PyArg_ParseTuple(args, "i:ceph_remove_mds_perf_query", &query_id)) { + derr << "Invalid args!" << dendl; + return nullptr; + } + + self->py_modules->remove_mds_perf_query(query_id); + Py_RETURN_NONE; +} + +static PyObject* +ceph_get_mds_perf_counters(BaseMgrModule *self, PyObject *args) +{ + MetricQueryID query_id; + if (!PyArg_ParseTuple(args, "i:ceph_get_mds_perf_counters", &query_id)) { + derr << "Invalid args!" 
<< dendl; + return nullptr; + } + + return self->py_modules->get_mds_perf_counters(query_id); +} + static PyObject* ceph_is_authorized(BaseMgrModule *self, PyObject *args) { @@ -1181,6 +1437,15 @@ PyMethodDef BaseMgrModule_methods[] = { {"_ceph_get_osd_perf_counters", (PyCFunction)ceph_get_osd_perf_counters, METH_VARARGS, "Get osd perf counters"}, + {"_ceph_add_mds_perf_query", (PyCFunction)ceph_add_mds_perf_query, + METH_VARARGS, "Add an osd perf query"}, + + {"_ceph_remove_mds_perf_query", (PyCFunction)ceph_remove_mds_perf_query, + METH_VARARGS, "Remove an osd perf query"}, + + {"_ceph_get_mds_perf_counters", (PyCFunction)ceph_get_mds_perf_counters, + METH_VARARGS, "Get osd perf counters"}, + {"_ceph_is_authorized", (PyCFunction)ceph_is_authorized, METH_VARARGS, "Verify the current session caps are valid"}, diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 87c293c924a..91e6c3759b9 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -2937,9 +2937,24 @@ int DaemonServer::remove_osd_perf_query(MetricQueryID query_id) return osd_perf_metric_collector.remove_query(query_id); } -int DaemonServer::get_osd_perf_counters( - MetricQueryID query_id, - std::map *counters) +int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector) { - return osd_perf_metric_collector.get_counters(query_id, counters); + return osd_perf_metric_collector.get_counters(collector); +} + +MetricQueryID DaemonServer::add_mds_perf_query( + const MDSPerfMetricQuery &query, + const std::optional &limit) +{ + return mds_perf_metric_collector.add_query(query, limit); +} + +int DaemonServer::remove_mds_perf_query(MetricQueryID query_id) +{ + return mds_perf_metric_collector.remove_query(query_id); +} + +int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector) +{ + return mds_perf_metric_collector.get_counters(collector); } diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h index 1e8bef75de7..7c5af91d1c0 100644 --- 
a/src/mgr/DaemonServer.h +++ b/src/mgr/DaemonServer.h @@ -207,8 +207,12 @@ public: const OSDPerfMetricQuery &query, const std::optional &limit); int remove_osd_perf_query(MetricQueryID query_id); - int get_osd_perf_counters(MetricQueryID query_id, - std::map *c); + int get_osd_perf_counters(OSDPerfCollector *collector); + + MetricQueryID add_mds_perf_query(const MDSPerfMetricQuery &query, + const std::optional &limit); + int remove_mds_perf_query(MetricQueryID query_id); + int get_mds_perf_counters(MDSPerfCollector *collector); virtual const char** get_tracked_conf_keys() const override; virtual void handle_conf_change(const ConfigProxy& conf, diff --git a/src/mgr/MDSPerfMetricCollector.cc b/src/mgr/MDSPerfMetricCollector.cc index 7d08f3431d4..74404a89d51 100644 --- a/src/mgr/MDSPerfMetricCollector.cc +++ b/src/mgr/MDSPerfMetricCollector.cc @@ -34,3 +34,23 @@ void MDSPerfMetricCollector::process_reports(const MetricPayload &payload) { delayed_ranks = metric_report.rank_metrics_delayed; dout(20) << ": delayed ranks=[" << delayed_ranks << "]" << dendl; } + +int MDSPerfMetricCollector::get_counters(PerfCollector *collector) { + MDSPerfCollector *c = static_cast(collector); + + std::lock_guard locker(lock); + + int r = get_counters_generic(c->query_id, &c->counters); + if (r != 0) { + return r; + } + + get_delayed_ranks(&c->delayed_ranks); + + return r; +} + +void MDSPerfMetricCollector::get_delayed_ranks(std::set *ranks) { + ceph_assert(ceph_mutex_is_locked(lock)); + *ranks = delayed_ranks; +} diff --git a/src/mgr/MDSPerfMetricCollector.h b/src/mgr/MDSPerfMetricCollector.h index 777e4c8fc8a..c6b379ec5b7 100644 --- a/src/mgr/MDSPerfMetricCollector.h +++ b/src/mgr/MDSPerfMetricCollector.h @@ -13,10 +13,14 @@ class MDSPerfMetricCollector MDSPerfMetrics> { private: std::set delayed_ranks; + + void get_delayed_ranks(std::set *ranks); + public: MDSPerfMetricCollector(MetricListener &listener); void process_reports(const MetricPayload &payload) override; + int 
get_counters(PerfCollector *collector) override; }; #endif // CEPH_MGR_MDS_PERF_COLLECTOR_H diff --git a/src/mgr/MDSPerfMetricTypes.h b/src/mgr/MDSPerfMetricTypes.h index 2111439b09b..1568d9c68c4 100644 --- a/src/mgr/MDSPerfMetricTypes.h +++ b/src/mgr/MDSPerfMetricTypes.h @@ -302,6 +302,15 @@ WRITE_CLASS_DENC(MDSPerfMetricQuery) std::ostream &operator<<(std::ostream &os, const MDSPerfMetricQuery &query); +struct MDSPerfCollector : PerfCollector { + std::map counters; + std::set delayed_ranks; + + MDSPerfCollector(MetricQueryID query_id) + : PerfCollector(query_id) { + } +}; + struct MDSPerfMetrics { MDSPerformanceCounterDescriptors performance_counter_descriptors; std::map group_packed_performance_counters; diff --git a/src/mgr/MetricCollector.cc b/src/mgr/MetricCollector.cc index 836662a165d..7da0dae7fe4 100644 --- a/src/mgr/MetricCollector.cc +++ b/src/mgr/MetricCollector.cc @@ -115,11 +115,10 @@ void MetricCollector::remove_all_queries() { } template -int MetricCollector::get_counters( +int MetricCollector::get_counters_generic( MetricQueryID query_id, std::map *c) { dout(20) << dendl; - - std::lock_guard locker(lock); + ceph_assert(ceph_mutex_is_locked(lock)); auto it = counters.find(query_id); if (it == counters.end()) { diff --git a/src/mgr/MetricCollector.h b/src/mgr/MetricCollector.h index f74787e96a9..19a3eed9b0a 100644 --- a/src/mgr/MetricCollector.h +++ b/src/mgr/MetricCollector.h @@ -34,8 +34,6 @@ public: void remove_all_queries(); - int get_counters(MetricQueryID query_id, std::map *counters); - std::map get_queries() const { std::lock_guard locker(lock); @@ -53,6 +51,7 @@ public: } virtual void process_reports(const MetricPayload &payload) = 0; + virtual int get_counters(PerfCollector *collector) = 0; protected: typedef std::optional OptionalLimit; @@ -67,6 +66,7 @@ protected: Counters counters; void process_reports_generic(const std::map &reports, UpdateCallback callback); + int get_counters_generic(MetricQueryID query_id, std::map *counters); 
private: MetricListener &listener; diff --git a/src/mgr/OSDPerfMetricCollector.cc b/src/mgr/OSDPerfMetricCollector.cc index e1acff2e801..eb548ce70c4 100644 --- a/src/mgr/OSDPerfMetricCollector.cc +++ b/src/mgr/OSDPerfMetricCollector.cc @@ -30,3 +30,10 @@ void OSDPerfMetricCollector::process_reports(const MetricPayload &payload) { counter->second += update.second; }); } + +int OSDPerfMetricCollector::get_counters(PerfCollector *collector) { + OSDPerfCollector *c = static_cast(collector); + + std::lock_guard locker(lock); + return get_counters_generic(c->query_id, &c->counters); +} diff --git a/src/mgr/OSDPerfMetricCollector.h b/src/mgr/OSDPerfMetricCollector.h index f45a89c8be2..c531dbf6303 100644 --- a/src/mgr/OSDPerfMetricCollector.h +++ b/src/mgr/OSDPerfMetricCollector.h @@ -17,6 +17,7 @@ public: OSDPerfMetricCollector(MetricListener &listener); void process_reports(const MetricPayload &payload) override; + int get_counters(PerfCollector *collector) override; }; #endif // OSD_PERF_METRIC_COLLECTOR_H_ diff --git a/src/mgr/OSDPerfMetricTypes.h b/src/mgr/OSDPerfMetricTypes.h index 63e8a2b44a4..1b5904e13ae 100644 --- a/src/mgr/OSDPerfMetricTypes.h +++ b/src/mgr/OSDPerfMetricTypes.h @@ -333,6 +333,14 @@ struct OSDPerfMetricQuery { }; WRITE_CLASS_DENC(OSDPerfMetricQuery) +struct OSDPerfCollector : PerfCollector { + std::map counters; + + OSDPerfCollector(MetricQueryID query_id) + : PerfCollector(query_id) { + } +}; + std::ostream& operator<<(std::ostream& os, const OSDPerfMetricQuery &query); struct OSDPerfMetricReport { diff --git a/src/mgr/Types.h b/src/mgr/Types.h index 30810de65c7..ab90bbbe9ac 100644 --- a/src/mgr/Types.h +++ b/src/mgr/Types.h @@ -16,4 +16,11 @@ struct MetricListener { virtual void handle_query_updated() = 0; }; +struct PerfCollector { + MetricQueryID query_id; + PerfCollector(MetricQueryID query_id) + : query_id(query_id) { + } +}; + #endif // CEPH_MGR_TYPES_H diff --git a/src/pybind/mgr/ceph_module.pyi b/src/pybind/mgr/ceph_module.pyi index 
d71259c0dad..907132a5f3f 100644 --- a/src/pybind/mgr/ceph_module.pyi +++ b/src/pybind/mgr/ceph_module.pyi @@ -68,6 +68,9 @@ class BaseMgrModule(object): def _ceph_add_osd_perf_query(self, query):... def _ceph_remove_osd_perf_query(self, query_id):... def _ceph_get_osd_perf_counters(self, query_id):... + def _ceph_add_mds_perf_query(self, query):... + def _ceph_remove_mds_perf_query(self, query_id):... + def _ceph_get_mds_perf_counters(self, query_id):... def _ceph_unregister_client(self, addrs):... def _ceph_register_client(self, addrs):... def _ceph_is_authorized(self, arguments):... diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index c30e5638c4f..607019f7ac1 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1603,6 +1603,51 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_osd_perf_counters(query_id) + def add_mds_perf_query(self, query): + """ + Register an MDS perf query. Argument is a + dict of the query parameters, in this form: + + :: + + { + 'key_descriptor': [ + {'type': subkey_type, 'regex': regex_pattern}, + ... + ], + 'performance_counter_descriptors': [ + list, of, descriptor, types + ], + } + + NOTE: 'limit' and 'order_by' are not supported (yet). + + Valid subkey types: + 'mds_rank', 'client_id' + Valid performance counter types: + 'cap_hit_metric' + + :param object query: query + :rtype: int (query id) + """ + return self._ceph_add_mds_perf_query(query) + + def remove_mds_perf_query(self, query_id): + """ + Unregister an MDS perf query. + + :param int query_id: query ID + """ + return self._ceph_remove_mds_perf_query(query_id) + + def get_mds_perf_counters(self, query_id): + """ + Get stats collected for an MDS perf query. 
+ + :param int query_id: query ID + """ + return self._ceph_get_mds_perf_counters(query_id) + def is_authorized(self, arguments): """ Verifies that the current session caps permit executing the py service diff --git a/src/pybind/mgr/stats/__init__.py b/src/pybind/mgr/stats/__init__.py new file mode 100644 index 00000000000..8f210ac9247 --- /dev/null +++ b/src/pybind/mgr/stats/__init__.py @@ -0,0 +1 @@ +from .module import Module diff --git a/src/pybind/mgr/stats/fs/__init__.py b/src/pybind/mgr/stats/fs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/pybind/mgr/stats/fs/perf_stats.py b/src/pybind/mgr/stats/fs/perf_stats.py new file mode 100644 index 00000000000..a70e5f708a9 --- /dev/null +++ b/src/pybind/mgr/stats/fs/perf_stats.py @@ -0,0 +1,444 @@ +import re +import json +import time +import uuid +import errno +import traceback +from collections import OrderedDict +from typing import List, Dict, Set + +from mgr_module import CommandResult + +from datetime import datetime, timedelta +from threading import Lock, Condition, Thread + +QUERY_IDS = "query_ids" +GLOBAL_QUERY_ID = "global_query_id" +QUERY_LAST_REQUEST = "last_time_stamp" +QUERY_RAW_COUNTERS = "query_raw_counters" +QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global" + +MDS_RANK_ALL = (-1,) +CLIENT_ID_ALL = "\d*" +CLIENT_IP_ALL = ".*" + +MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$' +MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*' +MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0, + 'read_latency': 1, + 'write_latency': 2, + 'metadata_latency': 3}) +MDS_PERF_QUERY_COUNTERS = [] # type: List[str] +MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency'] # type: List[str] + +QUERY_EXPIRE_INTERVAL = timedelta(minutes=1) + +CLIENT_METADATA_KEY = "client_metadata" +CLIENT_METADATA_SUBKEYS = ["hostname", "root"] +CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"] + +NON_EXISTENT_KEY_STR = "N/A" + +class 
FilterSpec(object): + """ + query filters encapsulated and used as key for query map + """ + def __init__(self, mds_ranks, client_id, client_ip): + self.mds_ranks = mds_ranks + self.client_id = client_id + self.client_ip = client_ip + + def __hash__(self): + return hash((self.mds_ranks, self.client_id, self.client_ip)) + + def __eq__(self, other): + return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, self.client_ip) + + def __ne__(self, other): + return not(self == other) + +def extract_mds_ranks_from_spec(mds_rank_spec): + if not mds_rank_spec: + return MDS_RANK_ALL + match = re.match(r'^(\d[,\d]*)$', mds_rank_spec) + if not match: + raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec)) + return tuple(int(mds_rank) for mds_rank in match.group(0).split(',')) + +def extract_client_id_from_spec(client_id_spec): + if not client_id_spec: + return CLIENT_ID_ALL + # the client id is the spec itself since it'll be a part + # of client filter regex. + return client_id_spec + +def extract_client_ip_from_spec(client_ip_spec): + if not client_ip_spec: + return CLIENT_IP_ALL + # TODO: validate if it is an ip address (or a subset of it). + # the client ip is the spec itself since it'll be a part + # of client filter regex. 
+ return client_ip_spec + +def extract_mds_ranks_from_report(mds_ranks_str): + if not mds_ranks_str: + return [] + return [int(x) for x in mds_ranks_str.split(',')] + +def extract_client_id_and_ip(client): + match = re.match(r'^(client\.\d+)\s(.*)', client) + if match: + return match.group(1), match.group(2) + return None, None + +class FSPerfStats(object): + lock = Lock() + q_cv = Condition(lock) + r_cv = Condition(lock) + + user_queries = {} # type: Dict[str, Dict] + + meta_lock = Lock() + client_metadata = { + 'metadata' : {}, + 'to_purge' : set(), + 'in_progress' : {}, + } # type: Dict + + def __init__(self, module): + self.module = module + self.log = module.log + # report processor thread + self.report_processor = Thread(target=self.run) + self.report_processor.start() + + def set_client_metadata(self, client_id, key, meta): + result = self.client_metadata['metadata'].setdefault(client_id, {}) + if not key in result or not result[key] == meta: + result[key] = meta + + def notify(self, cmdtag): + self.log.debug("cmdtag={0}".format(cmdtag)) + with self.meta_lock: + result = self.client_metadata['in_progress'].pop(cmdtag) + client_meta = result[1].wait() + if client_meta[0] != 0: + self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format( + result[0], client_meta[2])) + return + self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1]))) + for metadata in json.loads(client_meta[1]): + client_id = "client.{0}".format(metadata['id']) + result = self.client_metadata['metadata'].setdefault(client_id, {}) + for subkey in CLIENT_METADATA_SUBKEYS: + self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey]) + for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL: + self.set_client_metadata(client_id, subkey, + metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR)) + metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16) + supported_metrics = [metric for 
metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)] + self.set_client_metadata(client_id, "valid_metrics", supported_metrics) + # when all async requests are done, purge clients metadata if any. + if not self.client_metadata['in_progress']: + for client in self.client_metadata['to_purge']: + try: + self.log.info("purge client metadata for {0}".format(client)) + self.client_metadata['metadata'].remove(client) + except: + pass + self.client_metadata['to_purge'].clear() + self.log.debug("client_metadata={0}, to_purge={1}".format( + self.client_metadata['metadata'], self.client_metadata['to_purge'])) + + def update_client_meta(self, rank_set): + new_updates = {} + pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()] + with self.meta_lock: + for rank in rank_set: + if rank in pending_updates: + continue + tag = str(uuid.uuid4()) + result = CommandResult(tag) + new_updates[tag] = (rank, result) + self.client_metadata['in_progress'].update(new_updates) + + self.log.debug("updating client metadata from {0}".format(new_updates)) + + cmd_dict = {'prefix': 'client ls'} + for tag,val in new_updates.items(): + self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag) + + def run(self): + try: + self.log.info("FSPerfStats::report_processor starting...") + while True: + with self.lock: + self.scrub_expired_queries() + self.process_mds_reports() + self.r_cv.notify() + + stats_period = int(self.module.get_ceph_option("mgr_stats_period")) + self.q_cv.wait(stats_period) + self.log.debug("FSPerfStats::tick") + except Exception as e: + self.log.fatal("fatal error: {}".format(traceback.format_exc())) + + def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients): + # this is pretty straight forward -- find what MDSs are missing from + # what is tracked vs what we received in incoming report and purge + # the whole bunch. 
+ tracked_ranks = raw_perf_counters.keys() + available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics] + for rank in set(tracked_ranks) - set(available_ranks): + culled = raw_perf_counters.pop(rank) + self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format( + len(culled[1]), rank, "yes" if culled[0] else "no")) + missing_clients.update(list(culled[1].keys())) + + def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients): + # this is a bit more involed -- for each rank figure out what clients + # are missing in incoming report and purge them from our tracked map. + # but, if this is invoked _after_ cull_mds_entries(), the rank set + # is same, so we can loop based on that assumption. + ranks = raw_perf_counters.keys() + for rank in ranks: + tracked_clients = raw_perf_counters[rank][1].keys() + available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics] + for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]): + raw_perf_counters[rank][1].pop(client) + self.log.info("culled {0} from rank {1}".format(client, rank)) + missing_clients.add(client) + + def cull_missing_entries(self, raw_perf_counters, incoming_metrics): + missing_clients = set() # type: Set[str] + self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients) + self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients) + + self.log.debug("missing_clients={0}".format(missing_clients)) + with self.meta_lock: + if self.client_metadata['in_progress']: + self.client_metadata['to_purge'].update(missing_clients) + self.log.info("deferring client metadata purge (now {0} client(s))".format( + len(self.client_metadata['to_purge']))) + else: + for client in missing_clients: + try: + self.log.info("purge client metadata for {0}".format(client)) + self.client_metadata['metadata'].pop(client) + except KeyError: + pass + 
self.log.debug("client_metadata={0}".format(self.client_metadata['metadata'])) + + def cull_global_metrics(self, raw_perf_counters, incoming_metrics): + tracked_clients = raw_perf_counters.keys() + available_clients = [counter['k'][0][0] for counter in incoming_metrics] + for client in set(tracked_clients) - set(available_clients): + raw_perf_counters.pop(client) + + def get_raw_perf_counters(self, query): + raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {}) + + for query_id in query[QUERY_IDS]: + result = self.module.get_mds_perf_counters(query_id) + self.log.debug("raw_perf_counters={}".format(raw_perf_counters)) + self.log.debug("get_raw_perf_counters={}".format(result)) + + # extract passed in delayed ranks. metrics for delayed ranks are tagged + # as stale. + delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0]) + + # what's received from MDS + incoming_metrics = result['metrics'][1] + + # cull missing MDSs and clients + self.cull_missing_entries(raw_perf_counters, incoming_metrics) + + # iterate over metrics list and update our copy (note that we have + # already culled the differences). 
+ meta_refresh_ranks = set() + for counter in incoming_metrics: + mds_rank = int(counter['k'][0][0]) + client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0]) + if client_id is not None or not client_ip: # client_id _could_ be 0 + with self.meta_lock: + if client_id not in self.client_metadata['metadata']: + meta_refresh_ranks.add(mds_rank) + self.set_client_metadata(client_id, "IP", client_ip) + else: + self.log.warning("client metadata for client_id={0} might be unavailable".format(client_id)) + + raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}]) + raw_counters[0] = True if mds_rank in delayed_ranks else False + raw_client_counters = raw_counters[1].setdefault(client_id, []) + + del raw_client_counters[:] + raw_client_counters.extend(counter['c']) + # send an asynchronous client metadata refresh + self.update_client_meta(meta_refresh_ranks) + + def get_raw_perf_counters_global(self, query): + raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {}) + result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID]) + + self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters)) + self.log.debug("get_raw_perf_counters_global={}".format(result)) + + global_metrics = result['metrics'][1] + self.cull_global_metrics(raw_perf_counters, global_metrics) + for counter in global_metrics: + client_id, _ = extract_client_id_and_ip(counter['k'][0][0]) + raw_client_counters = raw_perf_counters.setdefault(client_id, []) + del raw_client_counters[:] + raw_client_counters.extend(counter['c']) + + def process_mds_reports(self): + for query in self.user_queries.values(): + self.get_raw_perf_counters(query) + self.get_raw_perf_counters_global(query) + + def scrub_expired_queries(self): + expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL + for filter_spec in list(self.user_queries.keys()): + user_query = self.user_queries[filter_spec] + self.log.debug("scrubbing query={}".format(user_query)) + if user_query[QUERY_LAST_REQUEST] < 
expire_time: + expired_query_ids = user_query[QUERY_IDS].copy() + expired_query_ids.append(user_query[GLOBAL_QUERY_ID]) + self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids)) + self.unregister_mds_perf_queries(filter_spec, expired_query_ids) + del self.user_queries[filter_spec] + + def prepare_mds_perf_query(self, rank, client_id, client_ip): + mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS + if not rank == -1: + mds_rank_regex = '^({})$'.format(rank) + client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip) + return { + 'key_descriptor' : [ + {'type' : 'mds_rank', 'regex' : mds_rank_regex}, + {'type' : 'client_id', 'regex' : client_regex}, + ], + 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS, + } + + def prepare_global_perf_query(self, client_id, client_ip): + client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip) + return { + 'key_descriptor' : [ + {'type' : 'client_id', 'regex' : client_regex}, + ], + 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS, + } + + def unregister_mds_perf_queries(self, filter_spec, query_ids): + self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format( + filter_spec, query_ids)) + for query_id in query_ids: + self.module.remove_mds_perf_query(query_id) + + def register_mds_perf_query(self, filter_spec): + mds_ranks = filter_spec.mds_ranks + client_id = filter_spec.client_id + client_ip = filter_spec.client_ip + + query_ids = [] + try: + # register per-mds perf query + for rank in mds_ranks: + query = self.prepare_mds_perf_query(rank, client_id, client_ip) + self.log.info("register_mds_perf_query: {}".format(query)) + + query_id = self.module.add_mds_perf_query(query) + if query_id is None: # query id can be 0 + raise RuntimeError("failed to add MDS perf query: {}".format(query)) + query_ids.append(query_id) + except Exception: + for query_id in query_ids: + 
self.module.remove_mds_perf_query(query_id) + raise + return query_ids + + def register_global_perf_query(self, filter_spec): + client_id = filter_spec.client_id + client_ip = filter_spec.client_ip + + # register a global perf query for metrics + query = self.prepare_global_perf_query(client_id, client_ip) + self.log.info("register_global_perf_query: {}".format(query)) + + query_id = self.module.add_mds_perf_query(query) + if query_id is None: # query id can be 0 + raise RuntimeError("failed to add global perf query: {}".format(query)) + return query_id + + def register_query(self, filter_spec): + user_query = self.user_queries.get(filter_spec, None) + if not user_query: + user_query = { + QUERY_IDS : self.register_mds_perf_query(filter_spec), + GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec), + QUERY_LAST_REQUEST : datetime.now(), + } + self.user_queries[filter_spec] = user_query + + self.q_cv.notify() + self.r_cv.wait(5) + else: + user_query[QUERY_LAST_REQUEST] = datetime.now() + return user_query + + def generate_report(self, user_query): + result = {} # type: Dict + # start with counter info -- metrics that are global and per mds + result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS + result["counters"] = MDS_PERF_QUERY_COUNTERS + + # fill in client metadata + raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {}) + with self.meta_lock: + result_meta = result.setdefault("client_metadata", {}) + for client_id in raw_perfs.keys(): + if client_id in self.client_metadata["metadata"]: + client_meta = result_meta.setdefault(client_id, {}) + client_meta.update(self.client_metadata["metadata"][client_id]) + + # start populating global perf metrics w/ client metadata + metrics = result.setdefault("global_metrics", {}) + for client_id, counters in raw_perfs.items(): + global_client_metrics = metrics.setdefault(client_id, []) + del global_client_metrics[:] + global_client_metrics.extend(counters) + + # and, now per-mds metrics keyed by mds 
rank along with delayed ranks + raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {}) + metrics = result.setdefault("metrics", {}) + + metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]] + for rank, counters in raw_perfs.items(): + mds_key = "mds.{}".format(rank) + mds_metrics = metrics.setdefault(mds_key, {}) + mds_metrics.update(counters[1]) + return result + + def extract_query_filters(self, cmd): + mds_rank_spec = cmd.get('mds_rank', None) + client_id_spec = cmd.get('client_id', None) + client_ip_spec = cmd.get('client_ip', None) + + self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format( + mds_rank_spec, client_id_spec, client_ip_spec)) + + mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec) + client_id = extract_client_id_from_spec(client_id_spec) + client_ip = extract_client_ip_from_spec(client_ip_spec) + + return FilterSpec(mds_ranks, client_id, client_ip) + + def get_perf_data(self, cmd): + filter_spec = self.extract_query_filters(cmd) + + counters = {} + with self.lock: + user_query = self.register_query(filter_spec) + result = self.generate_report(user_query) + return 0, json.dumps(result), "" diff --git a/src/pybind/mgr/stats/module.py b/src/pybind/mgr/stats/module.py new file mode 100644 index 00000000000..ecacd180d53 --- /dev/null +++ b/src/pybind/mgr/stats/module.py @@ -0,0 +1,38 @@ +""" +performance stats for ceph filesystem (for now...) 
+""" + +import json +from typing import List, Dict + +from mgr_module import MgrModule + +from .fs.perf_stats import FSPerfStats + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "fs perf stats " + "name=mds_rank,type=CephString,req=false " + "name=client_id,type=CephString,req=false " + "name=client_ip,type=CephString,req=false ", + "desc": "retrieve ceph fs performance stats", + "perm": "r" + }, + ] + MODULE_OPTIONS = [] # type: List[Dict] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.fs_perf_stats = FSPerfStats(self) + + def notify(self, notify_type, notify_id): + if notify_type == "command": + self.fs_perf_stats.notify(notify_id) + + def handle_command(self, inbuf, cmd): + prefix = cmd['prefix'] + # only supported command is `fs perf stats` right now + if prefix.startswith('fs perf stats'): + return self.fs_perf_stats.get_perf_data(cmd) + raise NotImplementedError(cmd['prefix']) diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 7ca38365bbe..77c7559a75c 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -59,6 +59,7 @@ commands = progress/module.py \ rook/module.py \ snap_schedule/module.py \ + stats/module.py \ test_orchestrator/module.py \ mds_autoscaler/module.py \ volumes/__init__.py