From: Venky Shankar
Date: Tue, 10 Sep 2019 12:57:07 +0000 (-0400)
Subject: test: add tests for validating MDS metrics via `perf stats` module
X-Git-Tag: v16.1.0~867^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a95b364e3e50f4611ca02178a0ba8dee462868a5;p=ceph.git

test: add tests for validating MDS metrics via `perf stats` module

Fixes: http://tracker.ceph.com/issues/24285
Signed-off-by: Venky Shankar
---

diff --git a/qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml b/qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml
new file mode 100644
index 00000000000..7e5ac4150ea
--- /dev/null
+++ b/qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml
@@ -0,0 +1,5 @@
+tasks:
+- cephfs_test_runner:
+    fail_on_skip: false
+    modules:
+      - tasks.cephfs.test_mds_metrics
diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py
index 3c9d85775b4..bb91c75a9cb 100644
--- a/qa/tasks/cephfs/filesystem.py
+++ b/qa/tasks/cephfs/filesystem.py
@@ -167,6 +167,17 @@ class FSStatus(object):
         log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
         raise RuntimeError("MDS id '{0}' not found in map".format(name))
 
+    def get_mds_addrs(self, name):
+        """
+        Return the instance addresses as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
+        """
+        info = self.get_mds(name)
+        if info:
+            return [e['addr'] for e in info['addrs']['addrvec']]
+        else:
+            log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
+            raise RuntimeError("MDS id '{0}' not found in map".format(name))
+
     def get_mds_gid(self, gid):
         """
         Get the info for the given MDS gid.
@@ -375,6 +386,45 @@ class MDSCluster(CephCluster):
 
         self._one_or_all(mds_id, set_block, in_parallel=False)
 
+    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
+        """
+        Block (using iptables) communication between the two given MDSs.
+        All ports that an MDS uses for communication are blocked.
+
+        :param blocked: True to block communication, False to unblock
+        :param mds_rank_1: name of the MDS daemon holding the first rank
+        :param mds_rank_2: name of the MDS daemon holding the second rank
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(mds_ids):
+            status = self.status()
+
+            mds = mds_ids[0]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+
+            mds = mds_ids[1]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
+
     def clear_firewall(self):
         clear_firewall(self._ctx)
 
diff --git a/qa/tasks/cephfs/test_mds_metrics.py b/qa/tasks/cephfs/test_mds_metrics.py
new file mode 100644
index 00000000000..e8fa3c413f9
--- /dev/null
+++ b/qa/tasks/cephfs/test_mds_metrics.py
@@ -0,0 +1,339 @@
+import os
+import json
+import time
+import random
+import logging
+
+from teuthology.contextutil import safe_while
+from tasks.cephfs.cephfs_test_case import CephFSTestCase
+
+log = logging.getLogger(__name__)
+
+class TestMDSMetrics(CephFSTestCase):
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 3
+
+    TEST_DIR_PREFIX = "test_mds_metrics"
+
+    def setUp(self):
+        super(TestMDSMetrics, self).setUp()
+        self._start_with_single_active_mds()
+        self._enable_mgr_stats_plugin()
+
+    def tearDown(self):
+        self._disable_mgr_stats_plugin()
+        super(TestMDSMetrics, self).tearDown()
+
+    def _start_with_single_active_mds(self):
+        curr_max_mds = self.fs.get_var('max_mds')
+        if curr_max_mds > 1:
+            self.fs.shrink(1)
+
+    def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[]):
+        def verify_metrics_cbk(metrics):
+            mds_metrics = metrics['metrics']
+            if len(mds_metrics) != active_mds_count + 1: # one entry per active mds plus the 'delayed_ranks' set
+                return False
+            fs_status = self.fs.status()
+            nonlocal ranks
+            if not ranks:
+                ranks = set([info['rank'] for info in fs_status.get_ranks(self.fs.id)])
+            for rank in ranks:
+                r = mds_metrics.get("mds.{}".format(rank), None)
+                if not r or len(mds_metrics['delayed_ranks']) != 0:
+                    return False
+            global_metrics = metrics['global_metrics']
+            client_metadata = metrics['client_metadata']
+            if len(global_metrics) < client_count or len(client_metadata) < client_count:
+                return False
+            return True
+        return verify_metrics_cbk
+
+    def _fs_perf_stats(self, *args):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", *args)
+
+    def _enable_mgr_stats_plugin(self):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", "stats")
+
+    def _disable_mgr_stats_plugin(self):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", "stats")
+
+    def _spread_directory_on_all_ranks(self, fscid):
+        fs_status = self.fs.status()
+        ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
+        # create a per-rank pinned directory
+        for rank in ranks:
+            dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PREFIX, rank)
+            self.mount_a.run_shell(["mkdir", dirname])
+            self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
+            log.info("pinning directory {0} to rank {1}".format(dirname, rank))
+            for i in range(16):
+                filename = "{0}.{1}".format("test", i)
+                self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
+
+    def _do_spread_io(self, fscid):
+        # spread readdir I/O
+        self.mount_b.run_shell(["find", "."])
+
+    def _do_spread_io_all_clients(self, fscid):
+        # spread readdir I/O
+        self.mount_a.run_shell(["find", "."])
+        self.mount_b.run_shell(["find", "."])
+
+    def _cleanup_test_dirs(self):
+        dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
+        for dirname in dirnames.split("\n"):
+            if dirname.startswith(TestMDSMetrics.TEST_DIR_PREFIX):
+                log.info("cleaning directory {}".format(dirname))
+                self.mount_a.run_shell(["rm", "-rf", dirname])
+
+    def _get_metrics(self, verifier_callback, trials, *args):
+        metrics = None
+        done = False
+        with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
+            while proceed():
+                metrics = json.loads(self._fs_perf_stats(*args))
+                done = verifier_callback(metrics)
+                if done:
+                    break
+        return done, metrics
+
+    # basic check to verify that we get metrics back from each active MDS rank
+
+    def test_metrics_from_rank(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_metrics_post_client_disconnection(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        self.mount_a.umount_wait()
+
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED - 1), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_metrics_mds_grow(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(self.verify_mds_metrics(
+            active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_metrics_mds_grow_and_shrink(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # shrink mds cluster
+        self.fs.shrink(1)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_delayed_metrics(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # block inter-MDS traffic so rank 1's metrics go stale and it gets flagged as delayed
+        delayed_rank = 1
+        mds_id_rank0 = self.fs.get_rank(rank=0)['name']
+        mds_id_rank1 = self.fs.get_rank(rank=1)['name']
+
+        self.fs.set_inter_mds_block(True, mds_id_rank0, mds_id_rank1)
+
+        def verify_delayed_metrics(metrics):
+            mds_metrics = metrics['metrics']
+            r = mds_metrics.get("mds.{}".format(delayed_rank), None)
+            if not r or delayed_rank not in mds_metrics['delayed_ranks']:
+                return False
+            return True
+        # validate
+        valid, metrics = self._get_metrics(verify_delayed_metrics, 30)
+        log.debug("metrics={0}".format(metrics))
+
+        self.assertTrue(valid)
+        self.fs.set_inter_mds_block(False, mds_id_rank0, mds_id_rank1)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_query_mds_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # initiate a new query with `--mds_rank` filter and validate that
+        # we get metrics *only* from that MDS.
+        filtered_mds = 1
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED,
+                                    ranks=[filtered_mds]), 30, '--mds_rank={}'.format(filtered_mds))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_query_client_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        mds_metrics = metrics['metrics']
+        # pick a random client
+        client = random.choice(list(mds_metrics['mds.0'].keys()))
+        # could have used regex to extract client id
+        client_id = (client.split(' ')[0]).split('.')[-1]
+
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=1), 30, '--client_id={}'.format(client_id))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_query_mds_and_client_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io_all_clients(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        mds_metrics = metrics['metrics']
+
+        # pick a random client
+        client = random.choice(list(mds_metrics['mds.1'].keys()))
+        # could have used regex to extract client id
+        client_id = (client.split(' ')[0]).split('.')[-1]
+        filtered_mds = 1
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=1, ranks=[filtered_mds]),
+            30, '--mds_rank={}'.format(filtered_mds), '--client_id={}'.format(client_id))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
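
Note on the expected output shape: the `fs perf stats` JSON that the verifier
callbacks above consume looks roughly like the sketch below. This is a minimal
sketch inferred from the checks in this patch (verify_mds_metrics,
verify_delayed_metrics, and the client-id parsing in test_query_client_filter);
the client key and all values are illustrative assumptions, not actual module
output.

    # Illustrative sketch only: key layout inferred from the tests above;
    # the client entry and every value are made up.
    sample = {
        "metrics": {
            # one "mds.<rank>" entry per active MDS rank...
            "mds.0": {
                # ...keyed by client; the tests recover the client id
                # ("4305" here) by splitting this key on ' ' and '.'
                "client.4305 v1:192.168.1.100:0/1234": {},
            },
            # ...plus the ranks whose metrics are lagging at the mgr
            "delayed_ranks": [],
        },
        # at least one entry per mounted client in each of these
        "global_metrics": {},
        "client_metadata": {},
    }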