]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr,stats: integrate subvolume metrics
authorIgor Golikov <igolikov@ibm.com>
Thu, 10 Jul 2025 10:21:56 +0000 (10:21 +0000)
committerVenky Shankar <vshankar@redhat.com>
Wed, 10 Sep 2025 16:42:45 +0000 (16:42 +0000)
mgr and stats support for the new subvolume metrics via existing perf
queries mechanism

Fixes: https://tracker.ceph.com/issues/68932
Signed-off-by: Igor Golikov <igolikov@ibm.com>
src/mds/MetricAggregator.cc
src/mgr/BaseMgrModule.cc
src/pybind/mgr/stats/fs/perf_stats.py

index fd70ab5adb21a571db38df4ba87d6b64ca4374eb..e8e69e29edd54a245382e5923a6f7dd4ef542ce9 100644 (file)
@@ -291,8 +291,6 @@ void MetricAggregator::refresh_subvolume_metrics_for_rank(
       counter->set(l_subvolume_metrics_write_iops, aggr_metric.write_iops);
       counter->set(l_subvolume_metrics_write_tp_Bps, aggr_metric.write_tBps);
       counter->set(l_subvolume_metrics_avg_write_latency, aggr_metric.avg_write_latency);
-      counter->set(l_subvolume_metrics_last_window_end, aggr_metric.time_window_last_end_sec);
-      counter->set(l_subvolume_metrics_last_window, aggr_metric.time_window_last_dur_sec);
 
       // Update query_metrics_map
       auto sub_key_func_subvolume = [this, &path](const MDSPerfMetricSubKeyDescriptor &desc,
index 7a57246db30f71ad2ca8f2369348b081f33ccaba..8ac3ef021048b394bdada17181861c484b08a760 100644 (file)
@@ -1150,6 +1150,7 @@ ceph_add_mds_perf_query(BaseMgrModule *self, PyObject *args)
   static const std::map<std::string, MDSPerfMetricSubKeyType> sub_key_types = {
     {"mds_rank", MDSPerfMetricSubKeyType::MDS_RANK},
     {"client_id", MDSPerfMetricSubKeyType::CLIENT_ID},
+    {"subvolume_path", MDSPerfMetricSubKeyType::SUBVOLUME_PATH},
   };
   static const std::map<std::string, MDSPerformanceCounterType> counter_types = {
     {"cap_hit", MDSPerformanceCounterType::CAP_HIT_METRIC},
@@ -1168,6 +1169,12 @@ ceph_add_mds_perf_query(BaseMgrModule *self, PyObject *args)
     {"stdev_write_latency", MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC},
     {"avg_metadata_latency", MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC},
     {"stdev_metadata_latency", MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC},
+    {"subv_read_iops", MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC},
+    {"subv_write_iops", MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC},
+    {"subv_read_throughput", MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC},
+    {"subv_write_throughput", MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC},
+    {"subv_avg_read_latency", MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC},
+    {"subv_avg_write_latency", MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC},
   };
 
   PyObject *py_query = nullptr;
index 7d5dacfd31d3459dc986af080938307fbb64cb3c..7b97e42857a0f1104cdb018ef99296926f55e457 100644 (file)
@@ -58,6 +58,16 @@ CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
 
 NON_EXISTENT_KEY_STR = "N/A"
 
+MDS_SUBVOLUME_QUERY_COUNTERS_MAP = OrderedDict({'subv_read_iops': 16,
+                                                'subv_write_iops': 17,
+                                                'subv_read_throughput': 18,
+                                                'subv_write_throughput': 19,
+                                                'subv_avg_read_latency': 20,
+                                                'subv_avg_write_latency': 21})
+MDS_SUBVOLUME_QUERY_COUNTERS = list(MDS_SUBVOLUME_QUERY_COUNTERS_MAP.keys())
+SUBVOLUME_QUERY_IDS = "subvolume_query_ids"
+QUERY_SUBVOLUME_COUNTERS = "query_raw_counters"
+
 logger = logging.getLogger(__name__)
 
 class FilterSpec(object):
@@ -386,6 +396,24 @@ class FSPerfStats(object):
         # send an asynchronous client metadata refresh
         self.update_client_meta()
 
+    def get_raw_perf_counters_subvolumes(self, query):
+        self.log.debug("get_raw_perf_counters_subvolumes={}".format(query))
+        raw_subvolume_counters = query.setdefault("subvolume_raw_counters", {})
+
+        for query_id in query[SUBVOLUME_QUERY_IDS]:
+            result = self.module.get_mds_perf_counters(query_id)
+            incoming_metrics = result['metrics'][1]
+
+            self.log.debug("get_raw_perf_counters_subvolumes queryid{}".format(query_id))
+
+            for counter in incoming_metrics:
+                self.log.debug("get_raw_perf_counters_subvolumes counter{}".format(counter))
+                try:
+                    subvolume_path = counter['k'][1][0]  # [mds_rank, subvol_path]
+                    raw_subvolume_counters[subvolume_path] = counter['c']
+                except (IndexError, KeyError) as e:
+                    self.log.error(f"Failed to parse subvolume counter {counter}: {e}")
+
     def get_raw_perf_counters_global(self, query):
         raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
         result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
@@ -404,6 +432,7 @@ class FSPerfStats(object):
     def process_mds_reports(self):
         for query in self.user_queries.values():
             self.get_raw_perf_counters(query)
+            self.get_raw_perf_counters_subvolumes(query)
             self.get_raw_perf_counters_global(query)
 
     def scrub_expired_queries(self):
@@ -412,7 +441,7 @@ class FSPerfStats(object):
             user_query = self.user_queries[filter_spec]
             self.log.debug("scrubbing query={}".format(user_query))
             if user_query[QUERY_LAST_REQUEST] < expire_time:
-                expired_query_ids = user_query[QUERY_IDS].copy()
+                expired_query_ids = user_query[QUERY_IDS].copy() + user_query[SUBVOLUME_QUERY_IDS].copy()
                 expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
                 self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
                 self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
@@ -431,6 +460,19 @@ class FSPerfStats(object):
             'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
             }
 
+    def prepare_subvolume_perf_query(self, rank):
+        mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
+        if rank != -1:
+            mds_rank_regex = '^({})$'.format(rank)
+
+        return {
+            'key_descriptor': [
+                {'type': 'mds_rank', 'regex': mds_rank_regex},
+                {'type': 'subvolume_path', 'regex': '^(.*)$'},
+            ],
+            'performance_counter_descriptors': MDS_SUBVOLUME_QUERY_COUNTERS,
+    }
+
     def prepare_global_perf_query(self, client_id, client_ip):
         client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
         return {
@@ -468,6 +510,32 @@ class FSPerfStats(object):
             raise
         return query_ids
 
+    def register_subvolume_perf_query(self, filter_spec):
+        """
+        Register subvolume perf queries for each MDS rank in the filter_spec.
+        """
+        mds_ranks = filter_spec.mds_ranks
+
+        query_ids = []
+        try:
+            # Register a perf query for each MDS rank
+            for rank in mds_ranks:
+                query = self.prepare_subvolume_perf_query(rank)
+                self.log.info("register_subvolume_perf_query: {}".format(query))
+
+                query_id = self.module.add_mds_perf_query(query)
+                if query_id is None:  # query ID can be 0
+                    raise RuntimeError("failed to add subvolume perf query: {}".format(query))
+                query_ids.append(query_id)
+
+        except Exception:
+            # Roll back all successful registrations
+            for query_id in query_ids:
+                self.module.remove_mds_perf_query(query_id)
+            raise
+
+        return query_ids
+
     def register_global_perf_query(self, filter_spec):
         client_id = filter_spec.client_id
         client_ip = filter_spec.client_ip
@@ -487,6 +555,7 @@ class FSPerfStats(object):
             user_query = {
                 QUERY_IDS : self.register_mds_perf_query(filter_spec),
                 GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
+                SUBVOLUME_QUERY_IDS: self.register_subvolume_perf_query(filter_spec),
                 QUERY_LAST_REQUEST : datetime.now(),
                 }
             self.user_queries[filter_spec] = user_query
@@ -498,16 +567,19 @@ class FSPerfStats(object):
         return user_query
 
     def generate_report(self, user_query):
-        result = {} # type: Dict
+        result = {}  # type: Dict
         global fs_list
-        # start with counter info -- metrics that are global and per mds
         result["version"] = PERF_STATS_VERSION
         result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
         result["counters"] = MDS_PERF_QUERY_COUNTERS
 
-        # fill in client metadata
         raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
         raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
+        raw_subvolumes = user_query.setdefault("subvolume_raw_counters", {})
+
+        logger.debug(f"raw_perfs={raw_perfs}, raw_subvolumes={raw_subvolumes}, user_query={user_query}")
+
+        # -- Populate client metadata
         with self.meta_lock:
             raw_counters_clients = []
             for val in raw_perfs.values():
@@ -521,7 +593,7 @@ class FSPerfStats(object):
                             client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {})
                             client_meta.update(meta[fs_name][client_id])
 
-            # start populating global perf metrics w/ client metadata
+            # -- Global perf metrics (by client)
             metrics = result.setdefault("global_metrics", {})
             for fs_name in fs_list:
                 if fs_name in meta and len(meta[fs_name]):
@@ -531,16 +603,27 @@ class FSPerfStats(object):
                             del global_client_metrics[:]
                             global_client_metrics.extend(counters)
 
-            # and, now per-mds metrics keyed by mds rank along with delayed ranks
-            metrics = result.setdefault("metrics", {})
+            # -- Per-MDS metrics
+            mds_metrics = result.setdefault("metrics", {})
+            mds_metrics["delayed_ranks"] = []
+            if raw_perfs:
+                mds_metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
+                for rank, counters in raw_perfs.items():
+                    mds_key = f"mds.{rank}"
+                    mds_metrics[mds_key] = counters[1]
+            else:
+                logger.debug("No per-MDS raw_perfs available; skipping MDS metrics population")
+
+            # -- Subvolume metrics
+            if isinstance(next(iter(raw_subvolumes.keys()), None), str):
+                # Flat subvolumes keyed directly by subvolume_path
+                logger.debug("Detected flat subvolume counters")
+                result["subvolume_metrics"] = raw_subvolumes
+                result["subvolume_metrics"]["valid_metrics"] = MDS_SUBVOLUME_QUERY_COUNTERS
 
-            metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
-            for rank, counters in raw_perfs.items():
-                mds_key = "mds.{}".format(rank)
-                mds_metrics = metrics.setdefault(mds_key, {})
-                mds_metrics.update(counters[1])
         return result
 
+
     def extract_query_filters(self, cmd):
         mds_rank_spec = cmd.get('mds_rank', None)
         client_id_spec = cmd.get('client_id', None)