log = logging.getLogger(__name__)
-class TestMDSMetrics(CephFSTestCase):
- CLIENTS_REQUIRED = 2
- MDSS_REQUIRED = 3
+class TestMetrics(CephFSTestCase):
- TEST_DIR_PERFIX = "test_mds_metrics"
+ def _fs_perf_stats(self, *args):
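+ # run "ceph fs perf stats" with the given filter args and return its JSON output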
+ return self.get_ceph_cmd_stdout("fs", "perf", "stats", *args)
- def setUp(self):
- super(TestMDSMetrics, self).setUp()
- self._start_with_single_active_mds()
- self._enable_mgr_stats_plugin()
+ def _enable_mgr_stats_plugin(self):
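+ # enable the mgr "stats" module, which serves "fs perf stats" queries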
+ return self.get_ceph_cmd_stdout("mgr", "module", "enable", "stats")
- def tearDown(self):
- self._disable_mgr_stats_plugin()
- super(TestMDSMetrics, self).tearDown()
+ def _disable_mgr_stats_plugin(self):
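+ # disable the mgr "stats" module once the test is done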
+ return self.get_ceph_cmd_stdout("mgr", "module", "disable", "stats")
- def _start_with_single_active_mds(self):
- curr_max_mds = self.fs.get_var('max_mds')
- if curr_max_mds > 1:
- self.fs.shrink(1)
+ def _do_spread_io_all_clients(self):
+ # spread readdir I/O
+ self.mount_a.run_shell(["find", "."])
+ self.mount_b.run_shell(["find", "."])
+
+ def _get_metrics(self, verifier_callback, trials, *args):
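+ # poll `fs perf stats` once per second, up to `trials` times, until the
+ # verifier callback accepts the metrics; safe_while raises MaxWhileTries
+ # when the tries are exhausted, which callers handle as pass or fail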
+ metrics = None
+ done = False
+ with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
+ while proceed():
+ metrics = json.loads(self._fs_perf_stats(*args))
+ done = verifier_callback(metrics)
+ if done:
+ break
+ return done, metrics
def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[], mul_fs=[]):
def verify_metrics_cbk(metrics):
return True
return verify_metrics_cbk
- def _fs_perf_stats(self, *args):
- return self.get_ceph_cmd_stdout("fs", "perf", "stats", *args)
+ def tearDown(self):
+ self._disable_mgr_stats_plugin()
+ super(TestMetrics, self).tearDown()
- def _enable_mgr_stats_plugin(self):
- return self.get_ceph_cmd_stdout("mgr", "module", "enable", "stats")
+class TestMDSMetrics(TestMetrics):
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 3
+ TEST_DIR_PREFIX = "test_mds_metrics"
- def _disable_mgr_stats_plugin(self):
- return self.get_ceph_cmd_stdout("mgr", "module", "disable", "stats")
+ def setUp(self):
+ super(TestMDSMetrics, self).setUp()
+ self._start_with_single_active_mds()
+ self._enable_mgr_stats_plugin()
+
+ def _start_with_single_active_mds(self):
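+ # shrink the filesystem to a single active MDS so tests start from a known layout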
+ curr_max_mds = self.fs.get_var('max_mds')
+ if curr_max_mds > 1:
+ self.fs.shrink(1)
def _spread_directory_on_all_ranks(self, fscid):
fs_status = self.fs.status()
ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
# create a per-rank pinned directory
for rank in ranks:
- dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
+ dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PREFIX, rank)
self.mount_a.run_shell(["mkdir", dirname])
self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
log.info("pinning directory {0} to rank {1}".format(dirname, rank))
filename = "{0}.{1}".format("test", i)
self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
- def _do_spread_io(self, fscid):
+ def _do_spread_io(self):
# spread readdir I/O
self.mount_b.run_shell(["find", "."])
- def _do_spread_io_all_clients(self, fscid):
- # spread readdir I/O
- self.mount_a.run_shell(["find", "."])
- self.mount_b.run_shell(["find", "."])
-
def _cleanup_test_dirs(self):
dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
for dirname in dirnames.split("\n"):
- if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
+ if dirname.startswith(TestMDSMetrics.TEST_DIR_PREFIX):
log.info("cleaning directory {}".format(dirname))
self.mount_a.run_shell(["rm", "-rf", dirname])
- def _get_metrics(self, verifier_callback, trials, *args):
- metrics = None
- done = False
- with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
- while proceed():
- metrics = json.loads(self._fs_perf_stats(*args))
- done = verifier_callback(metrics)
- if done:
- break
- return done, metrics
-
- def _setup_fs(self, fs_name, client_id):
- fs_a = self.mds_cluster.newfs(name=fs_name)
-
- self.mds_cluster.mds_restart()
-
- # Wait for filesystem to go healthy
- fs_a.wait_for_daemons()
-
- # Reconfigure client auth caps
- self.get_ceph_cmd_result(
- 'auth', 'caps', f"client.{client_id}",
- 'mds', 'allow',
- 'mon', 'allow r',
- 'osd', f'allow rw pool={fs_a.get_data_pool_name()}')
-
- return fs_a
-
# basic check to verify if we get back metrics from each active mds rank
def test_metrics_from_rank(self):
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io(fscid)
+ self._do_spread_io()
# wait a bit for mgr to get updated metrics
time.sleep(5)
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io(fscid)
+ self._do_spread_io()
# wait a bit for mgr to get updated metrics
time.sleep(5)
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io(fscid)
+ self._do_spread_io()
# wait a bit for mgr to get updated metrics
time.sleep(5)
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io(fscid)
+ self._do_spread_io()
# wait a bit for mgr to get updated metrics
time.sleep(5)
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io_all_clients(fscid)
+ self._do_spread_io_all_clients()
# wait a bit for mgr to get updated metrics
time.sleep(5)
self._spread_directory_on_all_ranks(fscid)
# spread some I/O
- self._do_spread_io_all_clients(fscid)
+ self._do_spread_io_all_clients()
# wait a bit for mgr to get updated metrics
time.sleep(5)
# cleanup test directories
self._cleanup_test_dirs()
- def test_client_metrics_and_metadata(self):
- self.mount_a.umount_wait()
- self.mount_b.umount_wait()
- self.fs.delete_all_filesystems()
+ def test_non_existing_mds_rank(self):
+ def verify_filtered_metrics(metrics):
+ # check whether the metrics have non-empty client_metadata and global_metrics for this filesystem
+ if metrics['client_metadata'].get(self.fs.name, {})\
+ or metrics['global_metrics'].get(self.fs.name, {}):
+ return True
+ return False
+
+ try:
+ # validate
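+ # setUp shrinks the fs to a single active MDS, so only rank 0 exists and ranks 1-10 are invalid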
+ filter_rank = random.randint(1, 10)
+ valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
+ '--mds_rank={}'.format(filter_rank))
+ log.info(f'metrics={metrics}')
+ self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
+ except MaxWhileTries:
+ # success
+ pass
- self.run_ceph_cmd("fs", "flag", "set", "enable_multiple",
- "true", "--yes-i-really-mean-it")
+class TestMultiFSMetrics(TestMetrics):
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 2
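+ # have CephFSTestCase create a second filesystem, exposed as self.backup_fs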
+ REQUIRE_BACKUP_FILESYSTEM = True
+ TEST_DIR_PREFIX = "test_multifs_metrics"
- # creating filesystem
- fs_a = self._setup_fs(fs_name="fs1", client_id=self.mount_a.client_id)
+ def setUp(self):
+ super(TestMultiFSMetrics, self).setUp()
+ self.fs1 = self.fs
+ self.fs2 = self.backup_fs
+ self.mount_b.umount_wait() # CephFSTestCase mounts mount_b on the default fs (fs1); remount it on fs2
+ self.mount_b.mount_wait(cephfs_name=self.fs2.name)
+ self._enable_mgr_stats_plugin()
- # Mount a client on fs_a
- self.mount_a.mount_wait(cephfs_name=fs_a.name)
+ def _spread_directory_on_all_mounts(self):
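+ # create a test directory with 16 x 1MB files on each mounted client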
+ for m in range(self.CLIENTS_REQUIRED):
+ dirname = f"{self.TEST_DIR_PREFIX}_{m}"
+ self.mounts[m].run_shell(["mkdir", dirname])
+ for i in range(16):
+ filename = "{0}.{1}".format("test", i)
+ self.mounts[m].write_n_mb(os.path.join(dirname, filename), 1)
+
+ def _cleanup_test_dirs(self):
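+ # remove the per-client test directories created by _spread_directory_on_all_mounts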
+ for m in range(self.CLIENTS_REQUIRED):
+ dirnames = self.mounts[m].run_shell(["ls"]).stdout.getvalue()
+ for dirname in dirnames.split("\n"):
+ if dirname.startswith(self.TEST_DIR_PREFIX):
+ log.info("cleaning directory {}".format(dirname))
+ self.mounts[m].run_shell(["rm", "-rf", dirname])
+
+ def test_client_metrics_and_metadata(self):
+ # do some I/O on fs1
self.mount_a.write_n_mb("pad.bin", 1)
self.mount_a.write_n_mb("test.bin", 2)
self.mount_a.path_to_ino("test.bin")
self.mount_a.create_files()
- # creating another filesystem
- fs_b = self._setup_fs(fs_name="fs2", client_id=self.mount_b.client_id)
-
- # Mount a client on fs_b
- self.mount_b.mount_wait(cephfs_name=fs_b.name)
+ # do some I/O on fs2
self.mount_b.write_n_mb("test.bin", 1)
self.mount_b.path_to_ino("test.bin")
self.mount_b.create_files()
- fscid_list = [fs_a.id, fs_b.id]
+ fscid_list = [self.fs1.id, self.fs2.id]
# validate
valid, metrics = self._get_metrics(
- self.verify_mds_metrics(client_count=1, mul_fs=fscid_list), 30)
+ self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
log.debug(f"metrics={metrics}")
self.assertTrue(valid)
- client_metadata_a = metrics['client_metadata']['fs1']
- client_metadata_b = metrics['client_metadata']['fs2']
+ client_metadata_a = metrics['client_metadata'][self.fs1.name]
+ client_metadata_b = metrics['client_metadata'][self.fs2.name]
for i in client_metadata_a:
if not (client_metadata_a[i]['hostname']):
- raise RuntimeError("hostname of fs1 not found!")
+ raise RuntimeError(f"hostname of {self.fs1.name} not found!")
if not (client_metadata_a[i]['valid_metrics']):
- raise RuntimeError("valid_metrics of fs1 not found!")
+ raise RuntimeError(f"valid_metrics of {self.fs1.name} not found!")
for i in client_metadata_b:
if not (client_metadata_b[i]['hostname']):
- raise RuntimeError("hostname of fs2 not found!")
+ raise RuntimeError(f"hostname of {self.fs2.name} not found!")
if not (client_metadata_b[i]['valid_metrics']):
- raise RuntimeError("valid_metrics of fs2 not found!")
-
- def test_non_existing_mds_rank(self):
- def verify_filtered_metrics(metrics):
- # checks if the metrics has non empty client_metadata and global_metrics
- if metrics['client_metadata'].get(self.fs.name, {})\
- or metrics['global_metrics'].get(self.fs.name, {}):
- return True
- return False
-
- try:
- # validate
- filter_rank = random.randint(1, 10)
- valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
- '--mds_rank={}'.format(filter_rank))
- log.info(f'metrics={metrics}')
- self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
- except MaxWhileTries:
- # success
- pass
+ raise RuntimeError(f"valid_metrics of {self.fs2.name} not found!")
+ self._cleanup_test_dirs()
def test_perf_stats_stale_metrics_with_multiple_filesystem(self):
- self.mount_a.umount_wait()
- self.mount_b.umount_wait()
- self.fs.delete_all_filesystems()
-
- self.run_ceph_cmd("fs", "flag", "set", "enable_multiple",
- "true", "--yes-i-really-mean-it")
-
- # creating filesystem
- fs_b = self._setup_fs(fs_name="fs2", client_id=self.mount_b.client_id)
+ # do some I/O on fs1
+ self.mount_a.write_n_mb("test.bin", 1)
+ self.mount_a.path_to_ino("test.bin")
+ self.mount_a.create_files()
- # Mount a client on fs_b
- self.mount_b.mount_wait(cephfs_name=fs_b.name)
- self.mount_b.write_n_mb("test.bin", 1)
+ # do some I/O on fs2
+ self.mount_b.write_n_mb("pad.bin", 1)
+ self.mount_b.write_n_mb("test.bin", 2)
self.mount_b.path_to_ino("test.bin")
self.mount_b.create_files()
- # creating another filesystem
- fs_a = self._setup_fs(fs_name="fs1", client_id=self.mount_a.client_id)
-
- # Mount a client on fs_a
- self.mount_a.mount_wait(cephfs_name=fs_a.name)
- self.mount_a.write_n_mb("pad.bin", 1)
- self.mount_a.write_n_mb("test.bin", 2)
- self.mount_a.path_to_ino("test.bin")
- self.mount_a.create_files()
+ fscid_list = [self.fs1.id, self.fs2.id]
# validate
valid, metrics = self._get_metrics(
- self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
+ self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
log.debug(f"metrics={metrics}")
self.assertTrue(valid)
# get mounted client's entries from the global_metrics.
- client_a_name = f'client.{self.mount_a.get_global_id()}'
-
+ client_b_name = f'client.{self.mount_b.get_global_id()}'
global_metrics = metrics['global_metrics']
- client_a_metrics = global_metrics.get("fs1", {}).get(client_a_name, {})
+ client_b_metrics = global_metrics.get(self.fs2.name, {}).get(client_b_name, {})
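+ # snapshot the fs2 client's metrics before failover for comparison later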
- # fail active mds of fs_a
- fs_a_mds = fs_a.get_active_names()[0]
- self.mds_cluster.mds_fail(fs_a_mds)
- fs_a.wait_for_state('up:active', rank=0, timeout=30)
+ # fail the active mds of fs2 (the second filesystem).
+ fs2_mds = self.fs2.get_active_names()[0]
+ self.mds_cluster.mds_fail(fs2_mds)
+ self.fs2.wait_for_state('up:active', rank=0, timeout=30)
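+ # rank 0 is taken over by a standby; client metrics should refresh after reconnect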
# spread directory per rank
- self._spread_directory_on_all_ranks(fs_a.id)
+ self._spread_directory_on_all_mounts()
# spread some I/O
- self._do_spread_io_all_clients(fs_a.id)
+ self._do_spread_io_all_clients()
# wait a bit for mgr to get updated metrics
time.sleep(5)
# validate
try:
valid, metrics_new = self._get_metrics(
- self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
+ self.verify_mds_metrics(client_count=2, mul_fs=fscid_list), 30)
log.debug(f'metrics={metrics_new}')
self.assertTrue(valid)
client_metadata = metrics_new['client_metadata']
- client_a_metadata = client_metadata.get("fs1", {}).get(client_a_name, {})
+ client_b_metadata = client_metadata.get(self.fs2.name, {}).get(client_b_name, {})
global_metrics = metrics_new['global_metrics']
- client_a_metrics_new = global_metrics.get("fs1", {}).get(client_a_name, {})
+ client_b_metrics_new = global_metrics.get(self.fs2.name, {}).get(client_b_name, {})
# the metrics should be different for the test to succeed.
- self.assertTrue(client_a_metadata and client_a_metrics_new
- and (client_a_metrics_new != client_a_metrics),
+ self.assertTrue(client_b_metadata and client_b_metrics_new
+ and (client_b_metrics_new != client_b_metrics),
"Invalid 'ceph fs perf stats' metrics after"
- f" rank0 mds of {fs_a.name} failover")
+ f" rank0 mds of {self.fs2.name} failover")
except MaxWhileTries:
raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
finally:
# cleanup test directories
self._cleanup_test_dirs()
-