log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
raise RuntimeError("MDS id '{0}' not found in map".format(name))
+ def get_mds_addrs(self, name):
+ """
+ Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
+ """
+ info = self.get_mds(name)
+ if info:
+ return [e['addr'] for e in info['addrs']['addrvec']]
+ else:
+ log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
+ raise RuntimeError("MDS id '{0}' not found in map".format(name))
+
def get_mds_gid(self, gid):
"""
Get the info for the given MDS gid.
self._one_or_all(mds_id, set_block, in_parallel=False)
+ def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
+ """
+ Block (using iptables) communications from a provided MDS to other MDSs.
+ Block all ports that an MDS uses for communication.
+
+ :param blocked: True to block the MDS, False otherwise
+ :param mds_rank_1: MDS rank
+ :param mds_rank_2: MDS rank
+ :return:
+ """
+ da_flag = "-A" if blocked else "-D"
+
+ def set_block(mds_ids):
+ status = self.status()
+
+ mds = mds_ids[0]
+ remote = self.mon_manager.find_remote('mds', mds)
+ addrs = status.get_mds_addrs(mds)
+ for addr in addrs:
+ ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+ remote.run(
+ args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+
+
+ mds = mds_ids[1]
+ remote = self.mon_manager.find_remote('mds', mds)
+ addrs = status.get_mds_addrs(mds)
+ for addr in addrs:
+ ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+ remote.run(
+ args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+ remote.run(
+ args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+
+ self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
+
def clear_firewall(self):
clear_firewall(self._ctx)
--- /dev/null
+import os
+import json
+import time
+import random
+import logging
+
+from teuthology.contextutil import safe_while
+from tasks.cephfs.cephfs_test_case import CephFSTestCase
+
+log = logging.getLogger(__name__)
+
+class TestMDSMetrics(CephFSTestCase):
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 3
+
+ TEST_DIR_PERFIX = "test_mds_metrics"
+
+ def setUp(self):
+ super(TestMDSMetrics, self).setUp()
+ self._start_with_single_active_mds()
+ self._enable_mgr_stats_plugin()
+
+ def tearDown(self):
+ self._disable_mgr_stats_plugin()
+ super(TestMDSMetrics, self).tearDown()
+
+ def _start_with_single_active_mds(self):
+ curr_max_mds = self.fs.get_var('max_mds')
+ if curr_max_mds > 1:
+ self.fs.shrink(1)
+
+ def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[]):
+ def verify_metrics_cbk(metrics):
+ mds_metrics = metrics['metrics']
+ if not len(mds_metrics) == active_mds_count + 1: # n active mdss + delayed set
+ return False
+ fs_status = self.fs.status()
+ nonlocal ranks
+ if not ranks:
+ ranks = set([info['rank'] for info in fs_status.get_ranks(self.fs.id)])
+ for rank in ranks:
+ r = mds_metrics.get("mds.{}".format(rank), None)
+ if not r or not len(mds_metrics['delayed_ranks']) == 0:
+ return False
+ global_metrics = metrics['global_metrics']
+ client_metadata = metrics['client_metadata']
+ if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
+ return False
+ return True
+ return verify_metrics_cbk
+
+ def _fs_perf_stats(self, *args):
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", *args)
+
+ def _enable_mgr_stats_plugin(self):
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", "stats")
+
+ def _disable_mgr_stats_plugin(self):
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", "stats")
+
+ def _spread_directory_on_all_ranks(self, fscid):
+ fs_status = self.fs.status()
+ ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
+ # create a per-rank pinned directory
+ for rank in ranks:
+ dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
+ self.mount_a.run_shell(["mkdir", dirname])
+ self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
+ log.info("pinning directory {0} to rank {1}".format(dirname, rank))
+ for i in range(16):
+ filename = "{0}.{1}".format("test", i)
+ self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
+
+ def _do_spread_io(self, fscid):
+ # spread readdir I/O
+ self.mount_b.run_shell(["find", "."])
+
+ def _do_spread_io_all_clients(self, fscid):
+ # spread readdir I/O
+ self.mount_a.run_shell(["find", "."])
+ self.mount_b.run_shell(["find", "."])
+
+ def _cleanup_test_dirs(self):
+ dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
+ for dirname in dirnames.split("\n"):
+ if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
+ log.info("cleaning directory {}".format(dirname))
+ self.mount_a.run_shell(["rm", "-rf", dirname])
+
+ def _get_metrics(self, verifier_callback, trials, *args):
+ metrics = None
+ done = False
+ with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
+ while proceed():
+ metrics = json.loads(self._fs_perf_stats(*args))
+ done = verifier_callback(metrics)
+ if done:
+ break
+ return done, metrics
+
+ # basic check to verify if we get back metrics from each active mds rank
+
+ def test_metrics_from_rank(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ def test_metrics_post_client_disconnection(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ self.mount_a.umount_wait()
+
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED - 1), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ def test_metrics_mds_grow(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # grow the mds cluster
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ # spread directory per rank
+ self._spread_directory_on_all_ranks(fscid)
+
+ # spread some I/O
+ self._do_spread_io(fscid)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(self.verify_mds_metrics(
+ active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED) , 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # cleanup test directories
+ self._cleanup_test_dirs()
+
+ def test_metrics_mds_grow_and_shrink(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # grow the mds cluster
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ # spread directory per rank
+ self._spread_directory_on_all_ranks(fscid)
+
+ # spread some I/O
+ self._do_spread_io(fscid)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # shrink mds cluster
+ self.fs.shrink(1)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # cleanup test directories
+ self._cleanup_test_dirs()
+
+ def test_delayed_metrics(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # grow the mds cluster
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ # spread directory per rank
+ self._spread_directory_on_all_ranks(fscid)
+
+ # spread some I/O
+ self._do_spread_io(fscid)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # do not give this mds any chance
+ delayed_rank = 1
+ mds_id_rank0 = self.fs.get_rank(rank=0)['name']
+ mds_id_rank1 = self.fs.get_rank(rank=1)['name']
+
+ self.fs.set_inter_mds_block(True, mds_id_rank0, mds_id_rank1)
+
+ def verify_delayed_metrics(metrics):
+ mds_metrics = metrics['metrics']
+ r = mds_metrics.get("mds.{}".format(delayed_rank), None)
+ if not r or not delayed_rank in mds_metrics['delayed_ranks']:
+ return False
+ return True
+ # validate
+ valid, metrics = self._get_metrics(verify_delayed_metrics, 30)
+ log.debug("metrics={0}".format(metrics))
+
+ self.assertTrue(valid)
+ self.fs.set_inter_mds_block(False, mds_id_rank0, mds_id_rank1)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # cleanup test directories
+ self._cleanup_test_dirs()
+
+ def test_query_mds_filter(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # grow the mds cluster
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ # spread directory per rank
+ self._spread_directory_on_all_ranks(fscid)
+
+ # spread some I/O
+ self._do_spread_io(fscid)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # initiate a new query with `--mds_rank` filter and validate if
+ # we get metrics *only* from that mds.
+ filtered_mds = 1
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED,
+ ranks=[filtered_mds]), 30, '--mds_rank={}'.format(filtered_mds))
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ def test_query_client_filter(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ mds_metrics = metrics['metrics']
+ # pick an random client
+ client = random.choice(list(mds_metrics['mds.0'].keys()))
+ # could have used regex to extract client id
+ client_id = (client.split(' ')[0]).split('.')[-1]
+
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=1), 30, '--client_id={}'.format(client_id))
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ def test_query_mds_and_client_filter(self):
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ # grow the mds cluster
+ self.fs.grow(2)
+
+ fscid = self.fs.id
+ # spread directory per rank
+ self._spread_directory_on_all_ranks(fscid)
+
+ # spread some I/O
+ self._do_spread_io_all_clients(fscid)
+
+ # wait a bit for mgr to get updated metrics
+ time.sleep(5)
+
+ # validate
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)
+
+ mds_metrics = metrics['metrics']
+
+ # pick an random client
+ client = random.choice(list(mds_metrics['mds.1'].keys()))
+ # could have used regex to extract client id
+ client_id = (client.split(' ')[0]).split('.')[-1]
+ filtered_mds = 1
+ valid, metrics = self._get_metrics(
+ self.verify_mds_metrics(client_count=1, ranks=[filtered_mds]),
+ 30, '--mds_rank={}'.format(filtered_mds), '--client_id={}'.format(client_id))
+ log.debug("metrics={0}".format(metrics))
+ self.assertTrue(valid)