]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/stats: mds performance stats module
authorVenky Shankar <vshankar@redhat.com>
Mon, 26 Aug 2019 09:32:23 +0000 (05:32 -0400)
committerVenky Shankar <vshankar@redhat.com>
Mon, 12 Oct 2020 11:34:51 +0000 (07:34 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
23 files changed:
ceph.spec.in
debian/ceph-mgr-modules-core.install
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/MDSPerfMetricCollector.cc
src/mgr/MDSPerfMetricCollector.h
src/mgr/MDSPerfMetricTypes.h
src/mgr/MetricCollector.cc
src/mgr/MetricCollector.h
src/mgr/OSDPerfMetricCollector.cc
src/mgr/OSDPerfMetricCollector.h
src/mgr/OSDPerfMetricTypes.h
src/mgr/Types.h
src/pybind/mgr/ceph_module.pyi
src/pybind/mgr/mgr_module.py
src/pybind/mgr/stats/__init__.py [new file with mode: 0644]
src/pybind/mgr/stats/fs/__init__.py [new file with mode: 0644]
src/pybind/mgr/stats/fs/perf_stats.py [new file with mode: 0644]
src/pybind/mgr/stats/module.py [new file with mode: 0644]
src/pybind/mgr/tox.ini

index a7f62b9c8bda38318fa5ae2030c5c50f23cd9fd7..838fd7b770202739baeacffa9d132c8f540cf0fe 100644 (file)
@@ -1673,6 +1673,7 @@ fi
 %{_datadir}/ceph/mgr/restful
 %{_datadir}/ceph/mgr/selftest
 %{_datadir}/ceph/mgr/snap_schedule
+%{_datadir}/ceph/mgr/stats
 %{_datadir}/ceph/mgr/status
 %{_datadir}/ceph/mgr/telegraf
 %{_datadir}/ceph/mgr/telemetry
index f0f18065bf687db338efdb2e0b3242f4c2af3393..a1a74c0af2d1f9a052557c769d2090e303317846 100644 (file)
@@ -16,6 +16,7 @@ usr/share/ceph/mgr/rbd_support
 usr/share/ceph/mgr/restful
 usr/share/ceph/mgr/selftest
 usr/share/ceph/mgr/snap_schedule
+usr/share/ceph/mgr/stats
 usr/share/ceph/mgr/status
 usr/share/ceph/mgr/telegraf
 usr/share/ceph/mgr/telemetry
index a7fc09407673c457a0f0a2c45347978f13c5a9a1..9f28f8a58aa6b9600d568d5de2253c99f18a78f0 100644 (file)
@@ -1054,9 +1054,8 @@ void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id)
 
 PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
 {
-  std::map<OSDPerfMetricKey, PerformanceCounters> counters;
-
-  int r = server.get_osd_perf_counters(query_id, &counters);
+  OSDPerfCollector collector(query_id);
+  int r = server.get_osd_perf_counters(&collector);
   if (r < 0) {
     dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: "
             << cpp_strerror(r) << dendl;
@@ -1064,11 +1063,72 @@ PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
   }
 
   PyFormatter f;
+  const std::map<OSDPerfMetricKey, PerformanceCounters> &counters = collector.counters;
+
+  f.open_array_section("counters");
+  for (auto &[key, instance_counters] : counters) {
+    f.open_object_section("i");
+    f.open_array_section("k");
+    for (auto &sub_key : key) {
+      f.open_array_section("s");
+      for (size_t i = 0; i < sub_key.size(); i++) {
+        f.dump_string(stringify(i).c_str(), sub_key[i]);
+      }
+      f.close_section(); // s
+    }
+    f.close_section(); // k
+    f.open_array_section("c");
+    for (auto &c : instance_counters) {
+      f.open_array_section("p");
+      f.dump_unsigned("0", c.first);
+      f.dump_unsigned("1", c.second);
+      f.close_section(); // p
+    }
+    f.close_section(); // c
+    f.close_section(); // i
+  }
+  f.close_section(); // counters
+
+  return f.get();
+}
+
+MetricQueryID ActivePyModules::add_mds_perf_query(
+    const MDSPerfMetricQuery &query,
+    const std::optional<MDSPerfMetricLimit> &limit)
+{
+  return server.add_mds_perf_query(query, limit);
+}
+
+void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id)
+{
+  int r = server.remove_mds_perf_query(query_id);
+  if (r < 0) {
+    dout(0) << "remove_mds_perf_query for query_id=" << query_id << " failed: "
+            << cpp_strerror(r) << dendl;
+  }
+}
+
+PyObject *ActivePyModules::get_mds_perf_counters(MetricQueryID query_id)
+{
+  MDSPerfCollector collector(query_id);
+  int r = server.get_mds_perf_counters(&collector);
+  if (r < 0) {
+    dout(0) << "get_mds_perf_counters for query_id=" << query_id << " failed: "
+            << cpp_strerror(r) << dendl;
+    Py_RETURN_NONE;
+  }
+
+  PyFormatter f;
+  const std::map<MDSPerfMetricKey, PerformanceCounters> &counters = collector.counters;
+
+  f.open_array_section("metrics");
+
+  f.open_array_section("delayed_ranks");
+  f.dump_string("ranks", stringify(collector.delayed_ranks).c_str());
+  f.close_section(); // delayed_ranks
 
   f.open_array_section("counters");
-  for (auto &it : counters) {
-    auto &key = it.first;
-    auto  &instance_counters = it.second;
+  for (auto &[key, instance_counters] : counters) {
     f.open_object_section("i");
     f.open_array_section("k");
     for (auto &sub_key : key) {
@@ -1090,6 +1150,7 @@ PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
     f.close_section(); // i
   }
   f.close_section(); // counters
+  f.close_section(); // metrics
 
   return f.get();
 }
index 4892f2705fcda596095f2a527395c5e516f76fc8..50cfc163639126b473de5b75f76db32cd729eca7 100644 (file)
@@ -110,6 +110,12 @@ public:
   void remove_osd_perf_query(MetricQueryID query_id);
   PyObject *get_osd_perf_counters(MetricQueryID query_id);
 
+  MetricQueryID add_mds_perf_query(
+      const MDSPerfMetricQuery &query,
+      const std::optional<MDSPerfMetricLimit> &limit);
+  void remove_mds_perf_query(MetricQueryID query_id);
+  PyObject *get_mds_perf_counters(MetricQueryID query_id);
+
   bool get_store(const std::string &module_name,
       const std::string &key, std::string *val) const;
   PyObject *get_store_prefix(const std::string &module_name,
index b62b681d641739851acfcb3bc71b9ab566055f33..84ceba7a86aa7346a0cf58a706102e53ece73015 100644 (file)
@@ -1023,6 +1023,262 @@ ceph_get_osd_perf_counters(BaseMgrModule *self, PyObject *args)
   return self->py_modules->get_osd_perf_counters(query_id);
 }
 
+// MDS perf query interface -- mostly follows ceph_add_osd_perf_query()
+// style
+
+static PyObject*
+ceph_add_mds_perf_query(BaseMgrModule *self, PyObject *args)
+{
+  static const std::string NAME_KEY_DESCRIPTOR = "key_descriptor";
+  static const std::string NAME_COUNTERS_DESCRIPTORS =
+      "performance_counter_descriptors";
+  static const std::string NAME_LIMIT = "limit";
+  static const std::string NAME_SUB_KEY_TYPE = "type";
+  static const std::string NAME_SUB_KEY_REGEX = "regex";
+  static const std::string NAME_LIMIT_ORDER_BY = "order_by";
+  static const std::string NAME_LIMIT_MAX_COUNT = "max_count";
+  static const std::map<std::string, MDSPerfMetricSubKeyType> sub_key_types = {
+    {"mds_rank", MDSPerfMetricSubKeyType::MDS_RANK},
+    {"client_id", MDSPerfMetricSubKeyType::CLIENT_ID},
+  };
+  static const std::map<std::string, MDSPerformanceCounterType> counter_types = {
+    {"cap_hit", MDSPerformanceCounterType::CAP_HIT_METRIC},
+    {"read_latency", MDSPerformanceCounterType::READ_LATENCY_METRIC},
+    {"write_latency", MDSPerformanceCounterType::WRITE_LATENCY_METRIC},
+    {"metadata_latency", MDSPerformanceCounterType::METADATA_LATENCY_METRIC},
+  };
+
+  PyObject *py_query = nullptr;
+  if (!PyArg_ParseTuple(args, "O:ceph_add_mds_perf_query", &py_query)) {
+    derr << "Invalid args!" << dendl;
+    return nullptr;
+  }
+  if (!PyDict_Check(py_query)) {
+    derr << __func__ << " arg not a dict" << dendl;
+    Py_RETURN_NONE;
+  }
+
+  PyObject *query_params = PyDict_Items(py_query);
+  MDSPerfMetricQuery query;
+  std::optional<MDSPerfMetricLimit> limit;
+
+  // {
+  //   'key_descriptor': [
+  //     {'type': subkey_type, 'regex': regex_pattern},
+  //     ...
+  //   ],
+  //   'performance_counter_descriptors': [
+  //     list, of, descriptor, types
+  //   ],
+  //   'limit': {'order_by': performance_counter_type, 'max_count': n},
+  // }
+
+  for (int i = 0; i < PyList_Size(query_params); ++i) {
+    PyObject *kv = PyList_GET_ITEM(query_params, i);
+    char *query_param_name = nullptr;
+    PyObject *query_param_val = nullptr;
+    if (!PyArg_ParseTuple(kv, "sO:pair", &query_param_name, &query_param_val)) {
+      derr << __func__ << " dict item " << i << " not a size 2 tuple" << dendl;
+      Py_RETURN_NONE;
+    }
+    if (query_param_name == NAME_KEY_DESCRIPTOR) {
+      if (!PyList_Check(query_param_val)) {
+        derr << __func__ << " " << query_param_name << " not a list" << dendl;
+        Py_RETURN_NONE;
+      }
+      for (int j = 0; j < PyList_Size(query_param_val); j++) {
+        PyObject *sub_key = PyList_GET_ITEM(query_param_val, j);
+        if (!PyDict_Check(sub_key)) {
+          derr << __func__ << " query " << query_param_name << " item " << j
+               << " not a dict" << dendl;
+          Py_RETURN_NONE;
+        }
+        MDSPerfMetricSubKeyDescriptor d;
+        PyObject *sub_key_params = PyDict_Items(sub_key);
+        for (int k = 0; k < PyList_Size(sub_key_params); ++k) {
+          PyObject *pair = PyList_GET_ITEM(sub_key_params, k);
+          if (!PyTuple_Check(pair)) {
+            derr << __func__ << " query " << query_param_name << " item " << j
+                 << " pair " << k << " not a tuple" << dendl;
+            Py_RETURN_NONE;
+          }
+          char *param_name = nullptr;
+          PyObject *param_value = nullptr;
+          if (!PyArg_ParseTuple(pair, "sO:pair", &param_name, &param_value)) {
+            derr << __func__ << " query " << query_param_name << " item " << j
+                 << " pair " << k << " not a size 2 tuple" << dendl;
+            Py_RETURN_NONE;
+          }
+          if (param_name == NAME_SUB_KEY_TYPE) {
+            if (!PyUnicode_Check(param_value)) {
+              derr << __func__ << " query " << query_param_name << " item " << j
+                   << " contains invalid param " << param_name << dendl;
+              Py_RETURN_NONE;
+            }
+            auto type = PyUnicode_AsUTF8(param_value);
+            auto it = sub_key_types.find(type);
+            if (it == sub_key_types.end()) {
+              derr << __func__ << " query " << query_param_name << " item " << j
+                   << " contains invalid type " << dendl;
+              Py_RETURN_NONE;
+            }
+            d.type = it->second;
+          } else if (param_name == NAME_SUB_KEY_REGEX) {
+            if (!PyUnicode_Check(param_value)) {
+              derr << __func__ << " query " << query_param_name << " item " << j
+                   << " contains invalid param " << param_name << dendl;
+              Py_RETURN_NONE;
+            }
+            d.regex_str = PyUnicode_AsUTF8(param_value);
+            try {
+              d.regex = d.regex_str.c_str();
+            } catch (const std::regex_error& e) {
+              derr << __func__ << " query " << query_param_name << " item " << j
+                   << " contains invalid regex " << d.regex_str << dendl;
+              Py_RETURN_NONE;
+            }
+            if (d.regex.mark_count() == 0) {
+              derr << __func__ << " query " << query_param_name << " item " << j
+                   << " regex " << d.regex_str << ": no capturing groups"
+                   << dendl;
+              Py_RETURN_NONE;
+            }
+          } else {
+            derr << __func__ << " query " << query_param_name << " item " << j
+                 << " contains invalid param " << param_name << dendl;
+            Py_RETURN_NONE;
+          }
+        }
+        if (d.type == static_cast<MDSPerfMetricSubKeyType>(-1) ||
+            d.regex_str.empty()) {
+          derr << __func__ << " query " << query_param_name << " item " << j
+               << " invalid" << dendl;
+          Py_RETURN_NONE;
+        }
+        query.key_descriptor.push_back(d);
+      }
+    } else if (query_param_name == NAME_COUNTERS_DESCRIPTORS) {
+      if (!PyList_Check(query_param_val)) {
+        derr << __func__ << " " << query_param_name << " not a list" << dendl;
+        Py_RETURN_NONE;
+      }
+      for (int j = 0; j < PyList_Size(query_param_val); j++) {
+        PyObject *py_type = PyList_GET_ITEM(query_param_val, j);
+        if (!PyUnicode_Check(py_type)) {
+          derr << __func__ << " query " << query_param_name << " item " << j
+               << " not a string" << dendl;
+          Py_RETURN_NONE;
+        }
+        auto type = PyUnicode_AsUTF8(py_type);
+        auto it = counter_types.find(type);
+        if (it == counter_types.end()) {
+          derr << __func__ << " query " << query_param_name << " item " << type
+               << " is not valid type" << dendl;
+          Py_RETURN_NONE;
+        }
+        query.performance_counter_descriptors.push_back(it->second);
+      }
+    } else if (query_param_name == NAME_LIMIT) {
+      if (!PyDict_Check(query_param_val)) {
+        derr << __func__ << " query " << query_param_name << " not a dict"
+             << dendl;
+        Py_RETURN_NONE;
+      }
+
+      limit = MDSPerfMetricLimit();
+      PyObject *limit_params = PyDict_Items(query_param_val);
+
+      for (int j = 0; j < PyList_Size(limit_params); ++j) {
+        PyObject *kv = PyList_GET_ITEM(limit_params, j);
+        char *limit_param_name = nullptr;
+        PyObject *limit_param_val = nullptr;
+        if (!PyArg_ParseTuple(kv, "sO:pair", &limit_param_name,
+                              &limit_param_val)) {
+          derr << __func__ << " limit item " << j << " not a size 2 tuple"
+               << dendl;
+          Py_RETURN_NONE;
+        }
+
+        if (limit_param_name == NAME_LIMIT_ORDER_BY) {
+          if (!PyUnicode_Check(limit_param_val)) {
+            derr << __func__ << " " << limit_param_name << " not a string"
+                 << dendl;
+            Py_RETURN_NONE;
+          }
+          auto order_by = PyUnicode_AsUTF8(limit_param_val);
+          auto it = counter_types.find(order_by);
+          if (it == counter_types.end()) {
+            derr << __func__ << " limit " << limit_param_name
+                 << " not a valid counter type" << dendl;
+            Py_RETURN_NONE;
+          }
+          limit->order_by = it->second;
+        } else if (limit_param_name == NAME_LIMIT_MAX_COUNT) {
+#if PY_MAJOR_VERSION <= 2
+          if (!PyInt_Check(limit_param_val) && !PyLong_Check(limit_param_val)) {
+#else
+          if (!PyLong_Check(limit_param_val)) {
+#endif
+            derr << __func__ << " " << limit_param_name << " not an int"
+                 << dendl;
+            Py_RETURN_NONE;
+          }
+          limit->max_count = PyLong_AsLong(limit_param_val);
+        } else {
+          derr << __func__ << " unknown limit param: " << limit_param_name
+               << dendl;
+          Py_RETURN_NONE;
+        }
+      }
+    } else {
+      derr << __func__ << " unknown query param: " << query_param_name << dendl;
+      Py_RETURN_NONE;
+    }
+  }
+
+  if (query.key_descriptor.empty()) {
+    derr << __func__ << " invalid query" << dendl;
+    Py_RETURN_NONE;
+  }
+
+  if (limit) {
+    auto &ds = query.performance_counter_descriptors;
+    if (std::find(ds.begin(), ds.end(), limit->order_by) == ds.end()) {
+      derr << __func__ << " limit order_by " << limit->order_by
+           << " not in performance_counter_descriptors" << dendl;
+      Py_RETURN_NONE;
+    }
+  }
+
+  auto query_id = self->py_modules->add_mds_perf_query(query, limit);
+  return PyLong_FromLong(query_id);
+}
+
+static PyObject*
+ceph_remove_mds_perf_query(BaseMgrModule *self, PyObject *args)
+{
+  MetricQueryID query_id;
+  if (!PyArg_ParseTuple(args, "i:ceph_remove_mds_perf_query", &query_id)) {
+    derr << "Invalid args!" << dendl;
+    return nullptr;
+  }
+
+  self->py_modules->remove_mds_perf_query(query_id);
+  Py_RETURN_NONE;
+}
+
+static PyObject*
+ceph_get_mds_perf_counters(BaseMgrModule *self, PyObject *args)
+{
+  MetricQueryID query_id;
+  if (!PyArg_ParseTuple(args, "i:ceph_get_mds_perf_counters", &query_id)) {
+    derr << "Invalid args!" << dendl;
+    return nullptr;
+  }
+
+  return self->py_modules->get_mds_perf_counters(query_id);
+}
+
 static PyObject*
 ceph_is_authorized(BaseMgrModule *self, PyObject *args)
 {
@@ -1181,6 +1437,15 @@ PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_get_osd_perf_counters", (PyCFunction)ceph_get_osd_perf_counters,
     METH_VARARGS, "Get osd perf counters"},
 
+  {"_ceph_add_mds_perf_query", (PyCFunction)ceph_add_mds_perf_query,
+    METH_VARARGS, "Add an mds perf query"},
+
+  {"_ceph_remove_mds_perf_query", (PyCFunction)ceph_remove_mds_perf_query,
+    METH_VARARGS, "Remove an mds perf query"},
+
+  {"_ceph_get_mds_perf_counters", (PyCFunction)ceph_get_mds_perf_counters,
+    METH_VARARGS, "Get mds perf counters"},
+
   {"_ceph_is_authorized", (PyCFunction)ceph_is_authorized,
     METH_VARARGS, "Verify the current session caps are valid"},
 
index 87c293c924a18a7d591726f3a849c5f1d94e3a02..91e6c3759b9f091f9c3b8bc0d993bb5a353465e0 100644 (file)
@@ -2937,9 +2937,24 @@ int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
   return osd_perf_metric_collector.remove_query(query_id);
 }
 
-int DaemonServer::get_osd_perf_counters(
-    MetricQueryID query_id,
-    std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
+int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
 {
-  return osd_perf_metric_collector.get_counters(query_id, counters);
+  return osd_perf_metric_collector.get_counters(collector);
+}
+
+MetricQueryID DaemonServer::add_mds_perf_query(
+    const MDSPerfMetricQuery &query,
+    const std::optional<MDSPerfMetricLimit> &limit)
+{
+  return mds_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
+{
+  return mds_perf_metric_collector.remove_query(query_id);
+}
+
+int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
+{
+  return mds_perf_metric_collector.get_counters(collector);
 }
index 1e8bef75de7002f60f9dc2124d1cf932fe45e537..7c5af91d1c0dc43385d1313f0b4c857979ed4e7d 100644 (file)
@@ -207,8 +207,12 @@ public:
       const OSDPerfMetricQuery &query,
       const std::optional<OSDPerfMetricLimit> &limit);
   int remove_osd_perf_query(MetricQueryID query_id);
-  int get_osd_perf_counters(MetricQueryID query_id,
-                            std::map<OSDPerfMetricKey, PerformanceCounters> *c);
+  int get_osd_perf_counters(OSDPerfCollector *collector);
+
+  MetricQueryID add_mds_perf_query(const MDSPerfMetricQuery &query,
+                                   const std::optional<MDSPerfMetricLimit> &limit);
+  int remove_mds_perf_query(MetricQueryID query_id);
+  int get_mds_perf_counters(MDSPerfCollector *collector);
 
   virtual const char** get_tracked_conf_keys() const override;
   virtual void handle_conf_change(const ConfigProxy& conf,
index 7d08f3431d40a3b206e114e6beff41b4bc43997f..74404a89d515538519e895a322718be30843fd77 100644 (file)
@@ -34,3 +34,23 @@ void MDSPerfMetricCollector::process_reports(const MetricPayload &payload) {
   delayed_ranks = metric_report.rank_metrics_delayed;
   dout(20) << ": delayed ranks=[" << delayed_ranks << "]" << dendl;
 }
+
+int MDSPerfMetricCollector::get_counters(PerfCollector *collector) {
+  MDSPerfCollector *c = static_cast<MDSPerfCollector *>(collector);
+
+  std::lock_guard locker(lock);
+
+  int r = get_counters_generic(c->query_id, &c->counters);
+  if (r != 0) {
+    return r;
+  }
+
+  get_delayed_ranks(&c->delayed_ranks);
+
+  return r;
+}
+
+void MDSPerfMetricCollector::get_delayed_ranks(std::set<mds_rank_t> *ranks) {
+  ceph_assert(ceph_mutex_is_locked(lock));
+  *ranks = delayed_ranks;
+}
index 777e4c8fc8a9e030f3d653b49fac53060cda8025..c6b379ec5b78bf6a1ecfc993f11ff5d2a01a9b28 100644 (file)
@@ -13,10 +13,14 @@ class MDSPerfMetricCollector
                            MDSPerfMetrics> {
 private:
   std::set<mds_rank_t> delayed_ranks;
+
+  void get_delayed_ranks(std::set<mds_rank_t> *ranks);
+
 public:
   MDSPerfMetricCollector(MetricListener &listener);
 
   void process_reports(const MetricPayload &payload) override;
+  int get_counters(PerfCollector *collector) override;
 };
 
 #endif // CEPH_MGR_MDS_PERF_COLLECTOR_H
index 2111439b09b91f260e9877b4ff9358ef15034f47..1568d9c68c4000a783f1701595cfddcb156b5619 100644 (file)
@@ -302,6 +302,15 @@ WRITE_CLASS_DENC(MDSPerfMetricQuery)
 
 std::ostream &operator<<(std::ostream &os, const MDSPerfMetricQuery &query);
 
+struct MDSPerfCollector : PerfCollector {
+  std::map<MDSPerfMetricKey, PerformanceCounters> counters;
+  std::set<mds_rank_t> delayed_ranks;
+
+  MDSPerfCollector(MetricQueryID query_id)
+    : PerfCollector(query_id) {
+  }
+};
+
 struct MDSPerfMetrics {
   MDSPerformanceCounterDescriptors performance_counter_descriptors;
   std::map<MDSPerfMetricKey, ceph::buffer::list> group_packed_performance_counters;
index 836662a165df8ee6ec0133c690683a012d1fc548..7da0dae7fe47429d58992334d3bbc5fd2709e251 100644 (file)
@@ -115,11 +115,10 @@ void MetricCollector<Query, Limit, Key, Report>::remove_all_queries() {
 }
 
 template <typename Query, typename Limit, typename Key, typename Report>
-int MetricCollector<Query, Limit, Key, Report>::get_counters(
+int MetricCollector<Query, Limit, Key, Report>::get_counters_generic(
     MetricQueryID query_id, std::map<Key, PerformanceCounters> *c) {
   dout(20) << dendl;
-
-  std::lock_guard locker(lock);
+  ceph_assert(ceph_mutex_is_locked(lock));
 
   auto it = counters.find(query_id);
   if (it == counters.end()) {
index f74787e96a9eeb9e01af23202f87ec8c4197327c..19a3eed9b0aba5b4053f2c7017580329b17b262e 100644 (file)
@@ -34,8 +34,6 @@ public:
 
   void remove_all_queries();
 
-  int get_counters(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
-
   std::map<Query, Limits> get_queries() const {
     std::lock_guard locker(lock);
 
@@ -53,6 +51,7 @@ public:
   }
 
   virtual void process_reports(const MetricPayload &payload) = 0;
+  virtual int get_counters(PerfCollector *collector) = 0;
 
 protected:
   typedef std::optional<Limit> OptionalLimit;
@@ -67,6 +66,7 @@ protected:
   Counters counters;
 
   void process_reports_generic(const std::map<Query, Report> &reports, UpdateCallback callback);
+  int get_counters_generic(MetricQueryID query_id, std::map<Key, PerformanceCounters> *counters);
 
 private:
   MetricListener &listener;
index e1acff2e80184b0b86f3872be60edbb923b05055..eb548ce70c42470d2696c0fd5046442daaae2335 100644 (file)
@@ -30,3 +30,10 @@ void OSDPerfMetricCollector::process_reports(const MetricPayload &payload) {
       counter->second += update.second;
     });
 }
+
+int OSDPerfMetricCollector::get_counters(PerfCollector *collector) {
+  OSDPerfCollector *c = static_cast<OSDPerfCollector *>(collector);
+
+  std::lock_guard locker(lock);
+  return get_counters_generic(c->query_id, &c->counters);
+}
index f45a89c8be24b39564b5aaa59a5690a37ba0b3e0..c531dbf6303541ea88f79cd75bb08b4c8c40de67 100644 (file)
@@ -17,6 +17,7 @@ public:
   OSDPerfMetricCollector(MetricListener &listener);
 
   void process_reports(const MetricPayload &payload) override;
+  int get_counters(PerfCollector *collector) override;
 };
 
 #endif // OSD_PERF_METRIC_COLLECTOR_H_
index 63e8a2b44a49e9653c527c996fc1972c0cd65676..1b5904e13ae7f1d984313e18abef8dd986a7d785 100644 (file)
@@ -333,6 +333,14 @@ struct OSDPerfMetricQuery {
 };
 WRITE_CLASS_DENC(OSDPerfMetricQuery)
 
+struct OSDPerfCollector : PerfCollector {
+  std::map<OSDPerfMetricKey, PerformanceCounters> counters;
+
+  OSDPerfCollector(MetricQueryID query_id)
+    : PerfCollector(query_id) {
+  }
+};
+
 std::ostream& operator<<(std::ostream& os, const OSDPerfMetricQuery &query);
 
 struct OSDPerfMetricReport {
index 30810de65c75e1ceae939866bcfb93f08ca5e903..ab90bbbe9acc4ed9c5773895464a94e4c6911791 100644 (file)
@@ -16,4 +16,11 @@ struct MetricListener {
   virtual void handle_query_updated() = 0;
 };
 
+struct PerfCollector {
+  MetricQueryID query_id;
+  PerfCollector(MetricQueryID query_id)
+    : query_id(query_id) {
+  }
+};
+
 #endif // CEPH_MGR_TYPES_H
index d71259c0dadee231ef561070b840b316c9f68375..907132a5f3fa480e59ef1c14eef6364f9ecff911 100644 (file)
@@ -68,6 +68,9 @@ class BaseMgrModule(object):
     def _ceph_add_osd_perf_query(self, query):...
     def _ceph_remove_osd_perf_query(self, query_id):...
     def _ceph_get_osd_perf_counters(self, query_id):...
+    def _ceph_add_mds_perf_query(self, query):...
+    def _ceph_remove_mds_perf_query(self, query_id):...
+    def _ceph_get_mds_perf_counters(self, query_id):...
     def _ceph_unregister_client(self, addrs):...
     def _ceph_register_client(self, addrs):...
     def _ceph_is_authorized(self, arguments):...
index c30e5638c4f8311c14c301c9f3aa5f16c90ec1ce..607019f7ac1bc824e65f26e7f106cc6389ebe5f6 100644 (file)
@@ -1603,6 +1603,51 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_osd_perf_counters(query_id)
 
+    def add_mds_perf_query(self, query):
+        """
+        Register an MDS perf query.  Argument is a
+        dict of the query parameters, in this form:
+
+        ::
+
+           {
+             'key_descriptor': [
+               {'type': subkey_type, 'regex': regex_pattern},
+               ...
+             ],
+             'performance_counter_descriptors': [
+               list, of, descriptor, types
+             ],
+           }
+
+        NOTE: 'limit' and 'order_by' are not supported (yet).
+
+        Valid subkey types:
+           'mds_rank', 'client_id'
+        Valid performance counter types:
+           'cap_hit', 'read_latency', 'write_latency', 'metadata_latency'
+
+        :param object query: query
+        :rtype: int (query id)
+        """
+        return self._ceph_add_mds_perf_query(query)
+
+    def remove_mds_perf_query(self, query_id):
+        """
+        Unregister an MDS perf query.
+
+        :param int query_id: query ID
+        """
+        return self._ceph_remove_mds_perf_query(query_id)
+
+    def get_mds_perf_counters(self, query_id):
+        """
+        Get stats collected for an MDS perf query.
+
+        :param int query_id: query ID
+        """
+        return self._ceph_get_mds_perf_counters(query_id)
+
     def is_authorized(self, arguments):
         """
         Verifies that the current session caps permit executing the py service
diff --git a/src/pybind/mgr/stats/__init__.py b/src/pybind/mgr/stats/__init__.py
new file mode 100644 (file)
index 0000000..8f210ac
--- /dev/null
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/stats/fs/__init__.py b/src/pybind/mgr/stats/fs/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/stats/fs/perf_stats.py b/src/pybind/mgr/stats/fs/perf_stats.py
new file mode 100644 (file)
index 0000000..a70e5f7
--- /dev/null
@@ -0,0 +1,444 @@
+import re
+import json
+import time
+import uuid
+import errno
+import traceback
+from collections import OrderedDict
+from typing import List, Dict, Set
+
+from mgr_module import CommandResult
+
+from datetime import datetime, timedelta
+from threading import Lock, Condition, Thread
+
+QUERY_IDS = "query_ids"
+GLOBAL_QUERY_ID = "global_query_id"
+QUERY_LAST_REQUEST = "last_time_stamp"
+QUERY_RAW_COUNTERS = "query_raw_counters"
+QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"
+
+MDS_RANK_ALL = (-1,)
+CLIENT_ID_ALL = "\d*"
+CLIENT_IP_ALL = ".*"
+
+MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
+MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*'
+MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
+                                           'read_latency': 1,
+                                           'write_latency': 2,
+                                           'metadata_latency': 3})
+MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
+MDS_GLOBAL_PERF_QUERY_COUNTERS = ['cap_hit', 'read_latency', 'write_latency', 'metadata_latency'] # type: List[str]
+
+QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+
+CLIENT_METADATA_KEY = "client_metadata"
+CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
+CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
+
+NON_EXISTENT_KEY_STR = "N/A"
+
+class FilterSpec(object):
+    """
+    query filters encapsulated and used as key for query map
+    """
+    def __init__(self, mds_ranks, client_id, client_ip):
+        self.mds_ranks = mds_ranks
+        self.client_id = client_id
+        self.client_ip = client_ip
+
+    def __hash__(self):
+        return hash((self.mds_ranks, self.client_id, self.client_ip))
+
+    def __eq__(self, other):
+        return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, self.client_ip)
+
+    def __ne__(self, other):
+        return not(self == other)
+
+def extract_mds_ranks_from_spec(mds_rank_spec):
+    if not mds_rank_spec:
+        return MDS_RANK_ALL
+    match = re.match(r'^(\d[,\d]*)$', mds_rank_spec)
+    if not match:
+        raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
+    return tuple(int(mds_rank) for mds_rank in match.group(0).split(','))
+
+def extract_client_id_from_spec(client_id_spec):
+    """Return the client-id regex fragment for a query spec; an empty/None
+    spec matches all client ids."""
+    if not client_id_spec:
+        return CLIENT_ID_ALL
+    # the client id is the spec itself since it'll be a part
+    # of client filter regex.
+    return client_id_spec
+
+def extract_client_ip_from_spec(client_ip_spec):
+    """Return the client-ip regex fragment for a query spec; an empty/None
+    spec matches all client ips."""
+    if not client_ip_spec:
+        return CLIENT_IP_ALL
+    # TODO: validate if it is an ip address (or a subset of it).
+    # the client ip is the spec itself since it'll be a part
+    # of client filter regex.
+    return client_ip_spec
+
+def extract_mds_ranks_from_report(mds_ranks_str):
+    if not mds_ranks_str:
+        return []
+    return [int(x) for x in mds_ranks_str.split(',')]
+
+def extract_client_id_and_ip(client):
+    """Split "client.<id> <ip>" into ("client.<id>", "<ip>"); returns
+    (None, None) if the string does not match that shape."""
+    match = re.match(r'^(client\.\d+)\s(.*)', client)
+    if match:
+        return match.group(1), match.group(2)
+    return None, None
+
+class FSPerfStats(object):
+    # NOTE: these are class-level attributes, so the state is shared by all
+    # instances (the mgr module creates exactly one FSPerfStats).
+    lock = Lock()
+    q_cv = Condition(lock)   # signalled when a new query is registered
+    r_cv = Condition(lock)   # signalled when a report pass completes
+
+    # user queries keyed by FilterSpec, guarded by `lock`
+    user_queries = {} # type: Dict[FilterSpec, Dict]
+
+    meta_lock = Lock()
+    # client metadata state, guarded by `meta_lock`:
+    #   'metadata':    client_id -> metadata dict
+    #   'to_purge':    client ids whose metadata should be dropped
+    #   'in_progress': cmdtag -> (rank, CommandResult) of pending 'client ls'
+    client_metadata = {
+        'metadata' : {},
+        'to_purge' : set(),
+        'in_progress' : {},
+    } # type: Dict
+
+    def __init__(self, module):
+        self.module = module
+        self.log = module.log
+        # report processor thread
+        self.report_processor = Thread(target=self.run)
+        self.report_processor.start()
+
+    def set_client_metadata(self, client_id, key, meta):
+        result = self.client_metadata['metadata'].setdefault(client_id, {})
+        if not key in result or not result[key] == meta:
+            result[key] = meta
+
+    def notify(self, cmdtag):
+        self.log.debug("cmdtag={0}".format(cmdtag))
+        with self.meta_lock:
+            result = self.client_metadata['in_progress'].pop(cmdtag)
+            client_meta = result[1].wait()
+            if client_meta[0] != 0:
+                self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
+                    result[0], client_meta[2]))
+                return
+            self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
+            for metadata in json.loads(client_meta[1]):
+                client_id = "client.{0}".format(metadata['id'])
+                result = self.client_metadata['metadata'].setdefault(client_id, {})
+                for subkey in CLIENT_METADATA_SUBKEYS:
+                    self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
+                for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
+                    self.set_client_metadata(client_id, subkey,
+                                             metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
+                metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
+                supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
+                self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
+            # when all async requests are done, purge clients metadata if any.
+            if not self.client_metadata['in_progress']:
+                for client in self.client_metadata['to_purge']:
+                    try:
+                        self.log.info("purge client metadata for {0}".format(client))
+                        self.client_metadata['metadata'].remove(client)
+                    except:
+                        pass
+                self.client_metadata['to_purge'].clear()
+            self.log.debug("client_metadata={0}, to_purge={1}".format(
+                self.client_metadata['metadata'], self.client_metadata['to_purge']))
+
+    def update_client_meta(self, rank_set):
+        new_updates = {}
+        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
+        with self.meta_lock:
+            for rank in rank_set:
+                if rank in pending_updates:
+                    continue
+                tag = str(uuid.uuid4())
+                result = CommandResult(tag)
+                new_updates[tag] = (rank, result)
+            self.client_metadata['in_progress'].update(new_updates)
+
+        self.log.debug("updating client metadata from {0}".format(new_updates))
+
+        cmd_dict = {'prefix': 'client ls'}
+        for tag,val in new_updates.items():
+            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)
+
+    def run(self):
+        try:
+            self.log.info("FSPerfStats::report_processor starting...")
+            while True:
+                with self.lock:
+                    self.scrub_expired_queries()
+                    self.process_mds_reports()
+                    self.r_cv.notify()
+
+                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
+                    self.q_cv.wait(stats_period)
+                self.log.debug("FSPerfStats::tick")
+        except Exception as e:
+            self.log.fatal("fatal error: {}".format(traceback.format_exc()))
+
+    def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+        """
+        Purge tracked ranks absent from the incoming report, adding their
+        clients to missing_clients.
+        """
+        # this is pretty straight forward -- find what MDSs are missing from
+        # what is tracked vs what we received in incoming report and purge
+        # the whole bunch.
+        tracked_ranks = raw_perf_counters.keys()
+        available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
+        for rank in set(tracked_ranks) - set(available_ranks):
+            culled = raw_perf_counters.pop(rank)
+            self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
+                len(culled[1]), rank, "yes" if culled[0] else "no"))
+            missing_clients.update(list(culled[1].keys()))
+
+    def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+        # this is a bit more involed -- for each rank figure out what clients
+        # are missing in incoming report and purge them from our tracked map.
+        # but, if this is invoked _after_ cull_mds_entries(), the rank set
+        # is same, so we can loop based on that assumption.
+        ranks = raw_perf_counters.keys()
+        for rank in ranks:
+            tracked_clients = raw_perf_counters[rank][1].keys()
+            available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
+            for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
+                raw_perf_counters[rank][1].pop(client)
+                self.log.info("culled {0} from rank {1}".format(client, rank))
+                missing_clients.add(client)
+
+    def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
+        """
+        Cull stale mds/client counter entries; purge metadata for missing
+        clients, or defer the purge when metadata fetches are in flight
+        (notify() completes the deferred purge).
+        """
+        missing_clients = set() # type: Set[str]
+        self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
+        self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)
+
+        self.log.debug("missing_clients={0}".format(missing_clients))
+        with self.meta_lock:
+            if self.client_metadata['in_progress']:
+                self.client_metadata['to_purge'].update(missing_clients)
+                self.log.info("deferring client metadata purge (now {0} client(s))".format(
+                    len(self.client_metadata['to_purge'])))
+            else:
+                for client in missing_clients:
+                    try:
+                        self.log.info("purge client metadata for {0}".format(client))
+                        self.client_metadata['metadata'].pop(client)
+                    except KeyError:
+                        pass
+                self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
+
+    def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
+        tracked_clients = raw_perf_counters.keys()
+        available_clients = [counter['k'][0][0] for counter in incoming_metrics]
+        for client in set(tracked_clients) - set(available_clients):
+            raw_perf_counters.pop(client)
+
+    def get_raw_perf_counters(self, query):
+        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})
+
+        for query_id in query[QUERY_IDS]:
+            result = self.module.get_mds_perf_counters(query_id)
+            self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
+            self.log.debug("get_raw_perf_counters={}".format(result))
+
+            # extract passed in delayed ranks. metrics for delayed ranks are tagged
+            # as stale.
+            delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])
+
+            # what's received from MDS
+            incoming_metrics = result['metrics'][1]
+
+            # cull missing MDSs and clients
+            self.cull_missing_entries(raw_perf_counters, incoming_metrics)
+
+            # iterate over metrics list and update our copy (note that we have
+            # already culled the differences).
+            meta_refresh_ranks = set()
+            for counter in incoming_metrics:
+                mds_rank = int(counter['k'][0][0])
+                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
+                if client_id is not None or not client_ip: # client_id _could_ be 0
+                    with self.meta_lock:
+                        if not client_id in self.client_metadata['metadata']:
+                            meta_refresh_ranks.add(mds_rank)
+                        self.set_client_metadata(client_id, "IP", client_ip)
+                else:
+                    self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
+
+                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
+                raw_counters[0] = True if mds_rank in delayed_ranks else False
+                raw_client_counters = raw_counters[1].setdefault(client_id, [])
+
+                del raw_client_counters[:]
+                raw_client_counters.extend(counter['c'])
+        # send an asynchronous client metadata refresh
+        self.update_client_meta(meta_refresh_ranks)
+
+    def get_raw_perf_counters_global(self, query):
+        """
+        Pull raw per-client (global, rank independent) counters for the
+        query's global query id, culling clients that disappeared.
+        """
+        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+        result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
+
+        self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
+        self.log.debug("get_raw_perf_counters_global={}".format(result))
+
+        global_metrics = result['metrics'][1]
+        self.cull_global_metrics(raw_perf_counters, global_metrics)
+        for counter in global_metrics:
+            client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
+            raw_client_counters = raw_perf_counters.setdefault(client_id, [])
+            # replace in place so existing references stay valid
+            del raw_client_counters[:]
+            raw_client_counters.extend(counter['c'])
+
+    def process_mds_reports(self):
+        for query in self.user_queries.values():
+            self.get_raw_perf_counters(query)
+            self.get_raw_perf_counters_global(query)
+
+    def scrub_expired_queries(self):
+        """
+        Unregister and drop user queries not requested within
+        QUERY_EXPIRE_INTERVAL.  Called from run() with self.lock held.
+        """
+        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
+        # iterate over a snapshot of the keys since entries get deleted
+        for filter_spec in list(self.user_queries.keys()):
+            user_query = self.user_queries[filter_spec]
+            self.log.debug("scrubbing query={}".format(user_query))
+            if user_query[QUERY_LAST_REQUEST] < expire_time:
+                expired_query_ids = user_query[QUERY_IDS].copy()
+                expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
+                self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
+                self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
+                del self.user_queries[filter_spec]
+
+    def prepare_mds_perf_query(self, rank, client_id, client_ip):
+        mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
+        if not rank == -1:
+            mds_rank_regex = '^({})$'.format(rank)
+        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+        return {
+            'key_descriptor' : [
+                {'type' : 'mds_rank', 'regex' : mds_rank_regex},
+                {'type' : 'client_id', 'regex' : client_regex},
+                ],
+            'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
+            }
+
+    def prepare_global_perf_query(self, client_id, client_ip):
+        """Build the global (client keyed only) perf query payload for
+        add_mds_perf_query()."""
+        client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+        return {
+            'key_descriptor' : [
+                {'type' : 'client_id', 'regex' : client_regex},
+                ],
+            'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
+            }
+
+    def unregister_mds_perf_queries(self, filter_spec, query_ids):
+        self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
+            filter_spec, query_ids))
+        for query_id in query_ids:
+            self.module.remove_mds_perf_query(query_id)
+
+    def register_mds_perf_query(self, filter_spec):
+        """
+        Register one per-mds perf query for each rank in the filter spec.
+        On failure, queries registered so far are removed before re-raising.
+
+        :return: list of query ids
+        """
+        mds_ranks = filter_spec.mds_ranks
+        client_id = filter_spec.client_id
+        client_ip = filter_spec.client_ip
+
+        query_ids = []
+        try:
+            # register per-mds perf query
+            for rank in mds_ranks:
+                query = self.prepare_mds_perf_query(rank, client_id, client_ip)
+                self.log.info("register_mds_perf_query: {}".format(query))
+
+                query_id = self.module.add_mds_perf_query(query)
+                if query_id is None: # query id can be 0
+                    raise RuntimeError("failed to add MDS perf query: {}".format(query))
+                query_ids.append(query_id)
+        except Exception:
+            # roll back partially registered queries before propagating
+            for query_id in query_ids:
+                self.module.remove_mds_perf_query(query_id)
+            raise
+        return query_ids
+
+    def register_global_perf_query(self, filter_spec):
+        """
+        Register the global (per-client, rank independent) perf query.
+
+        :return: query id
+        :raises RuntimeError: if the query could not be registered
+        """
+        client_id = filter_spec.client_id
+        client_ip = filter_spec.client_ip
+
+        # register a global perf query for metrics
+        query = self.prepare_global_perf_query(client_id, client_ip)
+        self.log.info("register_global_perf_query: {}".format(query))
+
+        query_id = self.module.add_mds_perf_query(query)
+        if query_id is None: # query id can be 0
+            raise RuntimeError("failed to add global perf query: {}".format(query))
+        return query_id
+
+    def register_query(self, filter_spec):
+        """
+        Fetch (refreshing its timestamp) or create the user query for
+        filter_spec.  Must be called with self.lock held -- q_cv/r_cv are
+        bound to that lock.
+        """
+        user_query = self.user_queries.get(filter_spec, None)
+        if not user_query:
+            user_query = {
+                QUERY_IDS : self.register_mds_perf_query(filter_spec),
+                GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
+                QUERY_LAST_REQUEST : datetime.now(),
+                }
+            self.user_queries[filter_spec] = user_query
+
+            # wake the report processor and wait (bounded, 5s) for the
+            # first batch of counters for this fresh query
+            self.q_cv.notify()
+            self.r_cv.wait(5)
+        else:
+            user_query[QUERY_LAST_REQUEST] = datetime.now()
+        return user_query
+
+    def generate_report(self, user_query):
+        """
+        Build the JSON-serializable report for a user query: counter
+        descriptors, client metadata, global per-client metrics, and
+        per-mds (rank keyed) metrics along with delayed ranks.
+        """
+        result = {} # type: Dict
+        # start with counter info -- metrics that are global and per mds
+        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
+        result["counters"] = MDS_PERF_QUERY_COUNTERS
+
+        # fill in client metadata
+        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+        with self.meta_lock:
+            result_meta = result.setdefault("client_metadata", {})
+            for client_id in raw_perfs.keys():
+                if client_id in self.client_metadata["metadata"]:
+                    client_meta = result_meta.setdefault(client_id, {})
+                    client_meta.update(self.client_metadata["metadata"][client_id])
+
+            # start populating global perf metrics w/ client metadata
+            metrics = result.setdefault("global_metrics", {})
+            for client_id, counters in raw_perfs.items():
+                global_client_metrics = metrics.setdefault(client_id, [])
+                del global_client_metrics[:]
+                global_client_metrics.extend(counters)
+
+            # and, now per-mds metrics keyed by mds rank along with delayed ranks
+            raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
+            metrics = result.setdefault("metrics", {})
+
+            # counters[0] is the per-rank delayed/stale flag
+            metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
+            for rank, counters in raw_perfs.items():
+                mds_key = "mds.{}".format(rank)
+                mds_metrics = metrics.setdefault(mds_key, {})
+                mds_metrics.update(counters[1])
+        return result
+
+    def extract_query_filters(self, cmd):
+        """Build a FilterSpec from the optional mds_rank/client_id/client_ip
+        arguments of a 'fs perf stats' command dict."""
+        mds_rank_spec = cmd.get('mds_rank', None)
+        client_id_spec = cmd.get('client_id', None)
+        client_ip_spec = cmd.get('client_ip', None)
+
+        self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
+            mds_rank_spec, client_id_spec, client_ip_spec))
+
+        mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
+        client_id = extract_client_id_from_spec(client_id_spec)
+        client_ip = extract_client_ip_from_spec(client_ip_spec)
+
+        return FilterSpec(mds_ranks, client_id, client_ip)
+
+    def get_perf_data(self, cmd):
+        filter_spec = self.extract_query_filters(cmd)
+
+        counters = {}
+        with self.lock:
+            user_query = self.register_query(filter_spec)
+            result = self.generate_report(user_query)
+        return 0, json.dumps(result), ""
diff --git a/src/pybind/mgr/stats/module.py b/src/pybind/mgr/stats/module.py
new file mode 100644 (file)
index 0000000..ecacd18
--- /dev/null
@@ -0,0 +1,38 @@
+"""
+performance stats for ceph filesystem (for now...)
+"""
+
+import json
+from typing import List, Dict
+
+from mgr_module import MgrModule
+
+from .fs.perf_stats import FSPerfStats
+
+class Module(MgrModule):
+    """mgr module entry point exposing the 'fs perf stats' command."""
+    COMMANDS = [
+        {
+            "cmd": "fs perf stats "
+                   "name=mds_rank,type=CephString,req=false "
+                   "name=client_id,type=CephString,req=false "
+                   "name=client_ip,type=CephString,req=false ",
+            "desc": "retrieve ceph fs performance stats",
+            "perm": "r"
+        },
+    ]
+    MODULE_OPTIONS = [] # type: List[Dict]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        # starts the report processor thread (see FSPerfStats.__init__)
+        self.fs_perf_stats = FSPerfStats(self)
+
+    def notify(self, notify_type, notify_id):
+        # route command-completion notifications to the stats processor
+        # (it issues asynchronous 'client ls' commands to the MDSs)
+        if notify_type == "command":
+            self.fs_perf_stats.notify(notify_id)
+
+    def handle_command(self, inbuf, cmd):
+        prefix = cmd['prefix']
+        # only supported command is `fs perf stats` right now
+        if prefix.startswith('fs perf stats'):
+            return self.fs_perf_stats.get_perf_data(cmd)
+        raise NotImplementedError(cmd['prefix'])
index 7ca38365bbe6eeba10206462debea7cde0ce2315..77c7559a75cf6e103d4747dd306cd150dd9cd96f 100644 (file)
@@ -59,6 +59,7 @@ commands =
            progress/module.py \
            rook/module.py \
            snap_schedule/module.py \
+           stats/module.py \
            test_orchestrator/module.py \
            mds_autoscaler/module.py \
            volumes/__init__.py