static const std::map<std::string, MDSPerfMetricSubKeyType> sub_key_types = {
{"mds_rank", MDSPerfMetricSubKeyType::MDS_RANK},
{"client_id", MDSPerfMetricSubKeyType::CLIENT_ID},
+ {"subvolume_path", MDSPerfMetricSubKeyType::SUBVOLUME_PATH},
};
static const std::map<std::string, MDSPerformanceCounterType> counter_types = {
{"cap_hit", MDSPerformanceCounterType::CAP_HIT_METRIC},
{"stdev_write_latency", MDSPerformanceCounterType::STDEV_WRITE_LATENCY_METRIC},
{"avg_metadata_latency", MDSPerformanceCounterType::AVG_METADATA_LATENCY_METRIC},
{"stdev_metadata_latency", MDSPerformanceCounterType::STDEV_METADATA_LATENCY_METRIC},
+ {"subv_read_iops", MDSPerformanceCounterType::SUBV_READ_IOPS_METRIC},
+ {"subv_write_iops", MDSPerformanceCounterType::SUBV_WRITE_IOPS_METRIC},
+ {"subv_read_throughput", MDSPerformanceCounterType::SUBV_READ_THROUGHPUT_METRIC},
+ {"subv_write_throughput", MDSPerformanceCounterType::SUBV_WRITE_THROUGHPUT_METRIC},
+ {"subv_avg_read_latency", MDSPerformanceCounterType::SUBV_AVG_READ_LATENCY_METRIC},
+ {"subv_avg_write_latency", MDSPerformanceCounterType::SUBV_AVG_WRITE_LATENCY_METRIC},
};
PyObject *py_query = nullptr;
NON_EXISTENT_KEY_STR = "N/A"
# Per-subvolume performance counters, in the order they are requested from
# the MDS.  NOTE(review): the integer values presumably mirror the
# MDSPerformanceCounterType enum on the C++ side -- confirm they stay in sync.
MDS_SUBVOLUME_QUERY_COUNTERS_MAP = OrderedDict([
    ('subv_read_iops', 16),
    ('subv_write_iops', 17),
    ('subv_read_throughput', 18),
    ('subv_write_throughput', 19),
    ('subv_avg_read_latency', 20),
    ('subv_avg_write_latency', 21),
])
# Counter names only; used as 'performance_counter_descriptors' of the
# subvolume perf query and reported as "valid_metrics".
MDS_SUBVOLUME_QUERY_COUNTERS = list(MDS_SUBVOLUME_QUERY_COUNTERS_MAP)
# Key in a user query holding the ids of the registered subvolume perf queries.
SUBVOLUME_QUERY_IDS = "subvolume_query_ids"
# Key in a user query under which per-subvolume raw counters are cached.  It
# has to match the key used by get_raw_perf_counters_subvolumes() and
# generate_report(), and must not collide with the keys used for the per-MDS
# and global raw counters.
QUERY_SUBVOLUME_COUNTERS = "subvolume_raw_counters"
+
logger = logging.getLogger(__name__)
class FilterSpec(object):
# send an asynchronous client metadata refresh
self.update_client_meta()
def get_raw_perf_counters_subvolumes(self, query):
    """Refresh the per-subvolume raw counters cached on *query*.

    For every id in ``query[SUBVOLUME_QUERY_IDS]`` the report is fetched
    with ``self.module.get_mds_perf_counters()`` and, for each entry of
    ``report['metrics'][1]``, the entry's ``'c'`` value is stored in
    ``query["subvolume_raw_counters"]`` under the subvolume path taken
    from ``entry['k'][1][0]``.

    Collection is best-effort: a malformed report or entry is logged and
    skipped so that one bad query id cannot abort the rest of the report
    processing done by the caller.

    :param query: user query dict; updated in place.
    """
    self.log.debug("get_raw_perf_counters_subvolumes={}".format(query))
    raw_subvolume_counters = query.setdefault("subvolume_raw_counters", {})

    for query_id in query[SUBVOLUME_QUERY_IDS]:
        result = self.module.get_mds_perf_counters(query_id)
        try:
            incoming_metrics = result['metrics'][1]
        except (IndexError, KeyError, TypeError) as e:
            # e.g. no report (None) or an unexpected report layout
            self.log.error(f"Failed to read subvolume metrics for query {query_id}: {e}")
            continue

        self.log.debug("get_raw_perf_counters_subvolumes queryid{}".format(query_id))

        for counter in incoming_metrics:
            self.log.debug("get_raw_perf_counters_subvolumes counter{}".format(counter))
            try:
                subvolume_path = counter['k'][1][0]  # [mds_rank, subvol_path]
                # NOTE(review): a path reported under several query ids
                # (one query is registered per rank) is overwritten by the
                # last one seen, and entries are never removed once cached
                # -- confirm whether aggregation/pruning is wanted.
                raw_subvolume_counters[subvolume_path] = counter['c']
            except (IndexError, KeyError, TypeError) as e:
                # TypeError covers a non-subscriptable key or an
                # unhashable path, which would otherwise escape.
                self.log.error(f"Failed to parse subvolume counter {counter}: {e}")
+
def get_raw_perf_counters_global(self, query):
raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
def process_mds_reports(self):
for query in self.user_queries.values():
self.get_raw_perf_counters(query)
+ self.get_raw_perf_counters_subvolumes(query)
self.get_raw_perf_counters_global(query)
def scrub_expired_queries(self):
user_query = self.user_queries[filter_spec]
self.log.debug("scrubbing query={}".format(user_query))
if user_query[QUERY_LAST_REQUEST] < expire_time:
- expired_query_ids = user_query[QUERY_IDS].copy()
+ expired_query_ids = user_query[QUERY_IDS].copy() + user_query[SUBVOLUME_QUERY_IDS].copy()
expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
}
def prepare_subvolume_perf_query(self, rank):
    """Build the MDS perf query spec that collects per-subvolume counters.

    The query is keyed by (mds_rank, subvolume_path).  A *rank* of ``-1``
    keeps the match-all-ranks regex; any other value is matched exactly.
    Every subvolume path is matched, and the counters requested are the
    ones listed in MDS_SUBVOLUME_QUERY_COUNTERS.

    :param rank: MDS rank to restrict the query to, or -1 for all ranks.
    :return: dict with 'key_descriptor' and
             'performance_counter_descriptors' entries.
    """
    if rank == -1:
        rank_pattern = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
    else:
        rank_pattern = '^({})$'.format(rank)

    rank_key = {'type': 'mds_rank', 'regex': rank_pattern}
    path_key = {'type': 'subvolume_path', 'regex': '^(.*)$'}

    return {
        'key_descriptor': [rank_key, path_key],
        'performance_counter_descriptors': MDS_SUBVOLUME_QUERY_COUNTERS,
    }
+
def prepare_global_perf_query(self, client_id, client_ip):
client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
return {
raise
return query_ids
def register_subvolume_perf_query(self, filter_spec):
    """
    Register subvolume perf queries for each MDS rank in the filter_spec.

    One query, built by prepare_subvolume_perf_query(), is added through
    ``self.module.add_mds_perf_query()`` for every rank in
    ``filter_spec.mds_ranks``.

    :param filter_spec: object exposing the ``mds_ranks`` iterable.
    :return: list of the registered query ids, in rank order.
    :raises RuntimeError: if a query could not be added (id is None).
        Any other exception raised while registering is propagated too.
        In both cases the queries registered so far are removed first.
    """
    mds_ranks = filter_spec.mds_ranks

    query_ids = []
    try:
        # Register a perf query for each MDS rank
        for rank in mds_ranks:
            query = self.prepare_subvolume_perf_query(rank)
            self.log.info("register_subvolume_perf_query: {}".format(query))

            query_id = self.module.add_mds_perf_query(query)
            if query_id is None:  # query ID can be 0
                raise RuntimeError("failed to add subvolume perf query: {}".format(query))
            query_ids.append(query_id)

    except Exception:
        # Roll back all successful registrations.  Each removal is
        # best-effort: a failing removal must neither stop the remaining
        # ids from being removed nor replace the original error.
        for query_id in query_ids:
            try:
                self.module.remove_mds_perf_query(query_id)
            except Exception as e:
                self.log.error(
                    "failed to remove subvolume perf query {}: {}".format(query_id, e))
        raise

    return query_ids
+
def register_global_perf_query(self, filter_spec):
client_id = filter_spec.client_id
client_ip = filter_spec.client_ip
user_query = {
QUERY_IDS : self.register_mds_perf_query(filter_spec),
GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
+ SUBVOLUME_QUERY_IDS: self.register_subvolume_perf_query(filter_spec),
QUERY_LAST_REQUEST : datetime.now(),
}
self.user_queries[filter_spec] = user_query
return user_query
def generate_report(self, user_query):
- result = {} # type: Dict
+ result = {} # type: Dict
global fs_list
- # start with counter info -- metrics that are global and per mds
result["version"] = PERF_STATS_VERSION
result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
result["counters"] = MDS_PERF_QUERY_COUNTERS
- # fill in client metadata
raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
+ raw_subvolumes = user_query.setdefault("subvolume_raw_counters", {})
+
+ logger.debug(f"raw_perfs={raw_perfs}, raw_subvolumes={raw_subvolumes}, user_query={user_query}")
+
+ # -- Populate client metadata
with self.meta_lock:
raw_counters_clients = []
for val in raw_perfs.values():
client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {})
client_meta.update(meta[fs_name][client_id])
- # start populating global perf metrics w/ client metadata
+ # -- Global perf metrics (by client)
metrics = result.setdefault("global_metrics", {})
for fs_name in fs_list:
if fs_name in meta and len(meta[fs_name]):
del global_client_metrics[:]
global_client_metrics.extend(counters)
- # and, now per-mds metrics keyed by mds rank along with delayed ranks
- metrics = result.setdefault("metrics", {})
+ # -- Per-MDS metrics
+ mds_metrics = result.setdefault("metrics", {})
+ mds_metrics["delayed_ranks"] = []
+ if raw_perfs:
+ mds_metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
+ for rank, counters in raw_perfs.items():
+ mds_key = f"mds.{rank}"
+ mds_metrics[mds_key] = counters[1]
+ else:
+ logger.debug("No per-MDS raw_perfs available; skipping MDS metrics population")
+
+ # -- Subvolume metrics
+ if isinstance(next(iter(raw_subvolumes.keys()), None), str):
+ # Flat subvolumes keyed directly by subvolume_path
+ logger.debug("Detected flat subvolume counters")
+ result["subvolume_metrics"] = raw_subvolumes
+ result["subvolume_metrics"]["valid_metrics"] = MDS_SUBVOLUME_QUERY_COUNTERS
- metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
- for rank, counters in raw_perfs.items():
- mds_key = "mds.{}".format(rank)
- mds_metrics = metrics.setdefault(mds_key, {})
- mds_metrics.update(counters[1])
return result
+
def extract_query_filters(self, cmd):
mds_rank_spec = cmd.get('mds_rank', None)
client_id_spec = cmd.get('client_id', None)