From eaf2a8360d0d70b20d5ea61022fdde4f6a9b6464 Mon Sep 17 00:00:00 2001
From: Jos Collin
Date: Fri, 11 Apr 2025 11:38:20 +0530
Subject: [PATCH] qa: fix multi-fs tests in test_mds_metrics.py

* Avoids unnecessary setup when writing a multi-fs test: instead of
  creating the default filesystem, deleting it, and then creating and
  mounting the required filesystems, this change uses the filesystems
  created via 'REQUIRE_BACKUP_FILESYSTEM' to conduct the tests.
* This consequently fixes old/deleted filesystems appearing in the
  `perf stats` output and making it stale.
* Drops unused function parameters.

Fixes: https://tracker.ceph.com/issues/68001
Fixes: https://tracker.ceph.com/issues/68446
Signed-off-by: Jos Collin
---
 qa/tasks/cephfs/test_mds_metrics.py | 271 +++++++++++++----------
 1 file changed, 128 insertions(+), 143 deletions(-)

diff --git a/qa/tasks/cephfs/test_mds_metrics.py b/qa/tasks/cephfs/test_mds_metrics.py
index ffece497cb30d..38be3177a9f9f 100644
--- a/qa/tasks/cephfs/test_mds_metrics.py
+++ b/qa/tasks/cephfs/test_mds_metrics.py
@@ -11,25 +11,32 @@ from tasks.cephfs.cephfs_test_case import CephFSTestCase
 
 log = logging.getLogger(__name__)
 
-class TestMDSMetrics(CephFSTestCase):
-    CLIENTS_REQUIRED = 2
-    MDSS_REQUIRED = 3
+class TestMetrics(CephFSTestCase):
 
-    TEST_DIR_PERFIX = "test_mds_metrics"
+    def _fs_perf_stats(self, *args):
+        return self.get_ceph_cmd_stdout("fs", "perf", "stats", *args)
 
-    def setUp(self):
-        super(TestMDSMetrics, self).setUp()
-        self._start_with_single_active_mds()
-        self._enable_mgr_stats_plugin()
+    def _enable_mgr_stats_plugin(self):
+        return self.get_ceph_cmd_stdout("mgr", "module", "enable", "stats")
 
-    def tearDown(self):
-        self._disable_mgr_stats_plugin()
-        super(TestMDSMetrics, self).tearDown()
+    def _disable_mgr_stats_plugin(self):
+        return self.get_ceph_cmd_stdout("mgr", "module", "disable", "stats")
 
-    def _start_with_single_active_mds(self):
-        curr_max_mds = self.fs.get_var('max_mds')
-        if curr_max_mds > 1:
-            self.fs.shrink(1)
+    def _do_spread_io_all_clients(self):
+        # spread readdir I/O
+        self.mount_a.run_shell(["find", "."])
+        self.mount_b.run_shell(["find", "."])
+
+    def _get_metrics(self, verifier_callback, trials, *args):
+        metrics = None
+        done = False
+        with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
+            while proceed():
+                metrics = json.loads(self._fs_perf_stats(*args))
+                done = verifier_callback(metrics)
+                if done:
+                    break
+        return done, metrics
 
     def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[], mul_fs=[]):
         def verify_metrics_cbk(metrics):
@@ -56,21 +63,31 @@ class TestMDSMetrics(CephFSTestCase):
             return True
         return verify_metrics_cbk
 
-    def _fs_perf_stats(self, *args):
-        return self.get_ceph_cmd_stdout("fs", "perf", "stats", *args)
+    def tearDown(self):
+        self._disable_mgr_stats_plugin()
+        super(TestMetrics, self).tearDown()
 
-    def _enable_mgr_stats_plugin(self):
-        return self.get_ceph_cmd_stdout("mgr", "module", "enable", "stats")
+class TestMDSMetrics(TestMetrics):
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 3
+    TEST_DIR_PREFIX = "test_mds_metrics"
 
-    def _disable_mgr_stats_plugin(self):
-        return self.get_ceph_cmd_stdout("mgr", "module", "disable", "stats")
+    def setUp(self):
+        super(TestMDSMetrics, self).setUp()
+        self._start_with_single_active_mds()
+        self._enable_mgr_stats_plugin()
+
+    def _start_with_single_active_mds(self):
+        curr_max_mds = self.fs.get_var('max_mds')
+        if curr_max_mds > 1:
+            self.fs.shrink(1)
 
     def _spread_directory_on_all_ranks(self, fscid):
         fs_status = self.fs.status()
         ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
         # create a per-rank pinned directory
         for rank in ranks:
-            dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
+            dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PREFIX, rank)
             self.mount_a.run_shell(["mkdir", dirname])
             self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
             log.info("pinning directory {0} to rank {1}".format(dirname, rank))
@@ -78,50 +95,17 @@ class TestMDSMetrics(CephFSTestCase):
             filename = "{0}.{1}".format("test", i)
             self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
 
-    def _do_spread_io(self, fscid):
+    def _do_spread_io(self):
         # spread readdir I/O
         self.mount_b.run_shell(["find", "."])
 
-    def _do_spread_io_all_clients(self, fscid):
-        # spread readdir I/O
-        self.mount_a.run_shell(["find", "."])
-        self.mount_b.run_shell(["find", "."])
-
     def _cleanup_test_dirs(self):
         dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
         for dirname in dirnames.split("\n"):
-            if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
+            if dirname.startswith(TestMDSMetrics.TEST_DIR_PREFIX):
                 log.info("cleaning directory {}".format(dirname))
                 self.mount_a.run_shell(["rm", "-rf", dirname])
 
-    def _get_metrics(self, verifier_callback, trials, *args):
-        metrics = None
-        done = False
-        with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
-            while proceed():
-                metrics = json.loads(self._fs_perf_stats(*args))
-                done = verifier_callback(metrics)
-                if done:
-                    break
-        return done, metrics
-
-    def _setup_fs(self, fs_name, client_id):
-        fs_a = self.mds_cluster.newfs(name=fs_name)
-
-        self.mds_cluster.mds_restart()
-
-        # Wait for filesystem to go healthy
-        fs_a.wait_for_daemons()
-
-        # Reconfigure client auth caps
-        self.get_ceph_cmd_result(
-            'auth', 'caps', f"client.{client_id}",
-            'mds', 'allow',
-            'mon', 'allow r',
-            'osd', f'allow rw pool={fs_a.get_data_pool_name()}')
-
-        return fs_a
-
     # basic check to verify if we get back metrics from each active mds rank
     def test_metrics_from_rank(self):
@@ -160,7 +144,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io(fscid)
+        self._do_spread_io()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -189,7 +173,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io(fscid)
+        self._do_spread_io()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -230,7 +214,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io(fscid)
+        self._do_spread_io()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -285,7 +269,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io(fscid)
+        self._do_spread_io()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -375,7 +359,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io_all_clients(fscid)
+        self._do_spread_io_all_clients()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -463,7 +447,7 @@ class TestMDSMetrics(CephFSTestCase):
         self._spread_directory_on_all_ranks(fscid)
 
         # spread some I/O
-        self._do_spread_io_all_clients(fscid)
+        self._do_spread_io_all_clients()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -495,124 +479,126 @@ class TestMDSMetrics(CephFSTestCase):
         # cleanup test directories
         self._cleanup_test_dirs()
 
-    def test_client_metrics_and_metadata(self):
-        self.mount_a.umount_wait()
-        self.mount_b.umount_wait()
-        self.fs.delete_all_filesystems()
+    def test_non_existing_mds_rank(self):
+        def verify_filtered_metrics(metrics):
+            # checks if the metrics has non empty client_metadata and global_metrics
+            if metrics['client_metadata'].get(self.fs.name, {})\
+               or metrics['global_metrics'].get(self.fs.name, {}):
+                return True
+            return False
+
+        try:
+            # validate
+            filter_rank = random.randint(1, 10)
+            valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
+                                               '--mds_rank={}'.format(filter_rank))
+            log.info(f'metrics={metrics}')
+            self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
+        except MaxWhileTries:
+            # success
+            pass
 
-        self.run_ceph_cmd("fs", "flag", "set", "enable_multiple",
-                          "true", "--yes-i-really-mean-it")
+class TestMultiFSMetrics(TestMetrics):
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 2
+    REQUIRE_BACKUP_FILESYSTEM = True
+    TEST_DIR_PREFIX = "test_multifs_metrics"
 
-        # creating filesystem
-        fs_a = self._setup_fs(fs_name="fs1", client_id=self.mount_a.client_id)
+    def setUp(self):
+        super(TestMultiFSMetrics, self).setUp()
+        self.fs1 = self.fs
+        self.fs2 = self.backup_fs
+        self.mount_b.umount_wait() # fs1 was on mount_b as per CephFSTestCase
+        self.mount_b.mount_wait(cephfs_name=self.fs2.name)
+        self._enable_mgr_stats_plugin()
 
-        # Mount a client on fs_a
-        self.mount_a.mount_wait(cephfs_name=fs_a.name)
+    def _spread_directory_on_all_mounts(self):
+        for m in range(0, self.CLIENTS_REQUIRED):
+            dirname = f"{self.TEST_DIR_PREFIX}_{m}"
+            self.mounts[m].run_shell(["mkdir", dirname])
+            for i in range(16):
+                filename = "{0}.{1}".format("test", i)
+                self.mounts[m].write_n_mb(os.path.join(dirname, filename), 1)
+
+    def _cleanup_test_dirs(self):
+        for m in range(0, self.CLIENTS_REQUIRED):
+            dirnames = self.mounts[m].run_shell(["ls"]).stdout.getvalue()
+            for dirname in dirnames.split("\n"):
+                if dirname.startswith(self.TEST_DIR_PREFIX):
+                    log.info("cleaning directory {}".format(dirname))
+                    self.mounts[m].run_shell(["rm", "-rf", dirname])
+
+    def test_client_metrics_and_metadata(self):
+        # do some I/O on fs1
         self.mount_a.write_n_mb("pad.bin", 1)
         self.mount_a.write_n_mb("test.bin", 2)
         self.mount_a.path_to_ino("test.bin")
         self.mount_a.create_files()
 
-        # creating another filesystem
-        fs_b = self._setup_fs(fs_name="fs2", client_id=self.mount_b.client_id)
-
-        # Mount a client on fs_b
-        self.mount_b.mount_wait(cephfs_name=fs_b.name)
+        # do some I/O on fs2
         self.mount_b.write_n_mb("test.bin", 1)
         self.mount_b.path_to_ino("test.bin")
         self.mount_b.create_files()
 
-        fscid_list = [fs_a.id, fs_b.id]
+        fscid_list = [self.fs1.id, self.fs2.id]
 
         # validate
         valid, metrics = self._get_metrics(
-            self.verify_mds_metrics(client_count=1, mul_fs=fscid_list), 30)
+            self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
         log.debug(f"metrics={metrics}")
         self.assertTrue(valid)
 
-        client_metadata_a = metrics['client_metadata']['fs1']
-        client_metadata_b = metrics['client_metadata']['fs2']
+        client_metadata_a = metrics['client_metadata'][f'{self.fs1.name}']
+        client_metadata_b = metrics['client_metadata'][f'{self.fs2.name}']
 
         for i in client_metadata_a:
             if not (client_metadata_a[i]['hostname']):
-                raise RuntimeError("hostname of fs1 not found!")
+                raise RuntimeError(f"hostname of {self.fs1.name} not found!")
             if not (client_metadata_a[i]['valid_metrics']):
-                raise RuntimeError("valid_metrics of fs1 not found!")
+                raise RuntimeError(f"valid_metrics of {self.fs1.name} not found!")
 
         for i in client_metadata_b:
             if not (client_metadata_b[i]['hostname']):
-                raise RuntimeError("hostname of fs2 not found!")
+                raise RuntimeError(f"hostname of {self.fs2.name} not found!")
             if not (client_metadata_b[i]['valid_metrics']):
-                raise RuntimeError("valid_metrics of fs2 not found!")
-
-    def test_non_existing_mds_rank(self):
-        def verify_filtered_metrics(metrics):
-            # checks if the metrics has non empty client_metadata and global_metrics
-            if metrics['client_metadata'].get(self.fs.name, {})\
-               or metrics['global_metrics'].get(self.fs.name, {}):
-                return True
-            return False
-
-        try:
-            # validate
-            filter_rank = random.randint(1, 10)
-            valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
-                                               '--mds_rank={}'.format(filter_rank))
-            log.info(f'metrics={metrics}')
-            self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
-        except MaxWhileTries:
-            # success
-            pass
+                raise RuntimeError(f"valid_metrics of {self.fs2.name} not found!")
+        self._cleanup_test_dirs()
 
     def test_perf_stats_stale_metrics_with_multiple_filesystem(self):
-        self.mount_a.umount_wait()
-        self.mount_b.umount_wait()
-        self.fs.delete_all_filesystems()
-
-        self.run_ceph_cmd("fs", "flag", "set", "enable_multiple",
-                          "true", "--yes-i-really-mean-it")
-
-        # creating filesystem
-        fs_b = self._setup_fs(fs_name="fs2", client_id=self.mount_b.client_id)
+        # do some I/O on fs1
+        self.mount_a.write_n_mb("test.bin", 1)
+        self.mount_a.path_to_ino("test.bin")
+        self.mount_a.create_files()
 
-        # Mount a client on fs_b
-        self.mount_b.mount_wait(cephfs_name=fs_b.name)
-        self.mount_b.write_n_mb("test.bin", 1)
+        # do some I/O on fs2
+        self.mount_b.write_n_mb("pad.bin", 1)
+        self.mount_b.write_n_mb("test.bin", 2)
         self.mount_b.path_to_ino("test.bin")
         self.mount_b.create_files()
 
-        # creating another filesystem
-        fs_a = self._setup_fs(fs_name="fs1", client_id=self.mount_a.client_id)
-
-        # Mount a client on fs_a
-        self.mount_a.mount_wait(cephfs_name=fs_a.name)
-        self.mount_a.write_n_mb("pad.bin", 1)
-        self.mount_a.write_n_mb("test.bin", 2)
-        self.mount_a.path_to_ino("test.bin")
-        self.mount_a.create_files()
+        fscid_list = [self.fs1.id, self.fs2.id]
 
         # validate
         valid, metrics = self._get_metrics(
-            self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
+            self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
         log.debug(f"metrics={metrics}")
         self.assertTrue(valid)
 
         # get mounted client's entries from the global_metrics.
-        client_a_name = f'client.{self.mount_a.get_global_id()}'
-
+        client_b_name = f'client.{self.mount_b.get_global_id()}'
         global_metrics = metrics['global_metrics']
-        client_a_metrics = global_metrics.get("fs1", {}).get(client_a_name, {})
+        client_b_metrics = global_metrics.get(self.fs2.name, {}).get(client_b_name, {})
 
-        # fail active mds of fs_a
-        fs_a_mds = fs_a.get_active_names()[0]
-        self.mds_cluster.mds_fail(fs_a_mds)
-        fs_a.wait_for_state('up:active', rank=0, timeout=30)
+        # fail active mds of fs2 (the second filesystem created).
+        fs2_mds = self.fs2.get_active_names()[0]
+        self.mds_cluster.mds_fail(fs2_mds)
+        self.fs2.wait_for_state('up:active', rank=0, timeout=30)
 
         # spread directory per rank
-        self._spread_directory_on_all_ranks(fs_a.id)
+        self._spread_directory_on_all_mounts()
 
         # spread some I/O
-        self._do_spread_io_all_clients(fs_a.id)
+        self._do_spread_io_all_clients()
 
         # wait a bit for mgr to get updated metrics
         time.sleep(5)
@@ -620,24 +606,23 @@ class TestMDSMetrics(CephFSTestCase):
         # validate
         try:
             valid, metrics_new = self._get_metrics(
-                self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
+                self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
             log.debug(f'metrics={metrics_new}')
             self.assertTrue(valid)
 
             client_metadata = metrics_new['client_metadata']
-            client_a_metadata = client_metadata.get("fs1", {}).get(client_a_name, {})
+            client_b_metadata = client_metadata.get(self.fs2.name, {}).get(client_b_name, {})
 
             global_metrics = metrics_new['global_metrics']
-            client_a_metrics_new = global_metrics.get("fs1", {}).get(client_a_name, {})
+            client_b_metrics_new = global_metrics.get(self.fs2.name, {}).get(client_b_name, {})
 
             # the metrics should be different for the test to succeed.
-            self.assertTrue(client_a_metadata and client_a_metrics_new
-                            and (client_a_metrics_new != client_a_metrics),
+            self.assertTrue(client_b_metadata and client_b_metrics_new
+                            and (client_b_metrics_new != client_b_metrics),
                             "Invalid 'ceph fs perf stats' metrics after"
-                            f" rank0 mds of {fs_a.name} failover")
+                            f" rank0 mds of {self.fs2.name} failover")
         except MaxWhileTries:
             raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
         finally:
             # cleanup test directories
             self._cleanup_test_dirs()
-
-- 
2.39.5
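
Note for reviewers: a minimal sketch (illustrative only, not part of the
patch) of how a further multi-fs test could build on the refactored
classes. It relies only on pieces introduced above (TestMultiFSMetrics,
_get_metrics, verify_mds_metrics, and the fs1/fs2 attributes set in
setUp); the class and test names here are hypothetical:

    class TestMultiFSMetricsExample(TestMultiFSMetrics):
        def test_metrics_on_both_filesystems(self):
            # generate a little I/O on each filesystem; both mounts are
            # already prepared by TestMultiFSMetrics.setUp()
            self.mount_a.write_n_mb("example.bin", 1)
            self.mount_b.write_n_mb("example.bin", 1)

            # wait until `ceph fs perf stats` reports a client on both
            # filesystems created via REQUIRE_BACKUP_FILESYSTEM
            valid, metrics = self._get_metrics(
                self.verify_mds_metrics(client_count=2,
                                        mul_fs=[self.fs1.id, self.fs2.id]), 30)
            self.assertTrue(valid)

            # only the live filesystems should appear in the output; no
            # stale entries from old/deleted filesystems
            self.assertIn(self.fs1.name, metrics['client_metadata'])
            self.assertIn(self.fs2.name, metrics['client_metadata'])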