log.info("cleaning directory {}".format(dirname))
self.mount_a.run_shell(["rm", "-rf", dirname])
+
+ def _extract_mds_cpu_entries(self, metrics):
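+ """
+ Return sorted (rank, percent) tuples extracted from a global
+ metrics dump. Assumes the dump is laid out as
+ metrics['global_metrics'][<fs name>]['mds_cpu_usage'], with each
+ entry carrying 'labels' and 'counters' maps; falls back to the
+ 'percent' counter when 'percent_core_sum' is absent.
+ """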
+ global_metrics = metrics['global_metrics'].get(self.fs.name, {})
+ cpu_metrics = global_metrics.get('mds_cpu_usage', [])
+ entries = []
+ for entry in cpu_metrics:
+ percent = entry['counters'].get('percent_core_sum',
+ entry['counters'].get('percent'))
+ entries.append((entry['labels']['rank'], percent))
+ return sorted(entries)
+
+ def _extract_rank_perf_entries(self, perf_dump):
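+ """
+ Split a `counter dump` result into sorted (rank, cpu) and
+ (rank, open_requests) tuple lists. Several counter names are
+ probed ('cpu_usage', 'percent_core_sum', 'percent' for CPU;
+ 'open_requests', 'count' for opens) since the exact names may
+ vary; missing rank labels fall back to 'unknown' and counters to 0.
+ """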
+ cpu_entries = []
+ open_entries = []
+ for entry in perf_dump.get('mds_rank_perf', []):
+ labels = entry.get('labels', {})
+ counters = entry.get('counters', {})
+ rank = labels.get('rank', 'unknown')
+ percent = counters.get('cpu_usage',
+ counters.get('percent_core_sum',
+ counters.get('percent', 0)))
+ open_requests = counters.get('open_requests', counters.get('count', 0))
+ cpu_entries.append((rank, percent))
+ open_entries.append((rank, open_requests))
+ return sorted(cpu_entries), sorted(open_entries)
+
+ def _get_rank_perf_entries_via_asok(self, rank=0):
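+ """
+ Run `counter dump` on the MDS daemon holding the given rank via
+ its admin socket and parse the per-rank perf entries from it.
+ """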
+ mds_info = self.fs.get_rank(rank=rank)
+ mds_name = mds_info['name']
+ counter_dump = self.fs.mds_asok(['counter', 'dump'], mds_id=mds_name)
+ return self._extract_rank_perf_entries(counter_dump)
+
+ def _wait_for_mds_rank_metrics(self, expected_count, rank=0, tries=30):
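+ """
+ Poll the rank's admin socket until at least `expected_count`
+ CPU and open-request entries are reported, sleeping one second
+ between attempts; raises RuntimeError after `tries` attempts.
+ """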
+ cpu_entries = []
+ open_entries = []
+ action = f"wait for {expected_count} rank perf metrics entries (rank {rank})"
+ # _raise=False lets the loop fall through on timeout so we can
+ # raise a more descriptive error below with the last entries seen;
+ # with the default _raise=True, safe_while raises MaxWhileTries
+ # itself and the RuntimeError below would be unreachable
+ with safe_while(sleep=1, tries=tries, action=action, _raise=False) as proceed:
+ while proceed():
+ cpu_entries, open_entries = self._get_rank_perf_entries_via_asok(rank=rank)
+ if len(cpu_entries) >= expected_count and len(open_entries) >= expected_count:
+ return cpu_entries, open_entries
+ raise RuntimeError(f"Failed to fetch {expected_count} rank perf metrics entries "
+ f"(rank {rank}); last entries: {cpu_entries!r}, {open_entries!r}")
+
# basic check to verify if we get back metrics from each active mds rank
def test_metrics_from_rank(self):
# cleanup test directories
self._cleanup_test_dirs()
+
+ def test_mds_perf_metrics(self):
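+ """
+ Verify that per-rank perf metrics (CPU usage and open requests)
+ are reported for each active MDS rank, both with a single rank
+ and after growing the cluster to two active ranks.
+ """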
+ # Trigger some I/O so that CPU metrics are sampled
+ self._do_spread_io_all_clients()
+ time.sleep(5)
+
+ cpu_entries, open_entries = self._wait_for_mds_rank_metrics(expected_count=1, rank=0, tries=30)
+ self.assertEqual(len(cpu_entries), 1, cpu_entries)
+ self.assertEqual(cpu_entries[0][0], '0')
+ self.assertEqual(len(open_entries), 1, open_entries)
+ self.assertEqual(open_entries[0][0], '0')
+
+ # Grow the MDS cluster to include a second active rank
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ self._spread_directory_on_all_ranks(fscid)
+ self._do_spread_io_all_clients()
+ time.sleep(5)
+
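+ # the counter dump from rank 0 is expected to include entries for
+ # every active rank, so ranks '0' and '1' should both appear now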
+ cpu_entries, open_entries = self._wait_for_mds_rank_metrics(expected_count=2, rank=0, tries=30)
+ self.assertEqual(len(cpu_entries), 2, cpu_entries)
+ self.assertEqual(sorted(rank for rank, _ in cpu_entries), ['0', '1'])
+ self.assertEqual(len(open_entries), 2, open_entries)
+ self.assertEqual(sorted(rank for rank, _ in open_entries), ['0', '1'])
+
+ # Cleanup: remove directories and shrink back to a single MDS
+ self._cleanup_test_dirs()
+ self.fs.shrink(1)
+ time.sleep(5)
+
def test_query_mds_filter(self):
# validate
valid, metrics = self._get_metrics(