]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test: add tests for validating MDS metrics via `perf stats` module 29951/head
authorVenky Shankar <vshankar@redhat.com>
Tue, 10 Sep 2019 12:57:07 +0000 (08:57 -0400)
committerVenky Shankar <vshankar@redhat.com>
Mon, 12 Oct 2020 11:34:51 +0000 (07:34 -0400)
Fixes: http://tracker.ceph.com/issues/24285
Signed-off-by: Venky Shankar <vshankar@redhat.com>
qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml [new file with mode: 0644]
qa/tasks/cephfs/filesystem.py
qa/tasks/cephfs/test_mds_metrics.py [new file with mode: 0644]

diff --git a/qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml b/qa/suites/multimds/basic/tasks/cephfs_test_mds_metrics.yaml
new file mode 100644 (file)
index 0000000..7e5ac41
--- /dev/null
@@ -0,0 +1,5 @@
+tasks:
+- cephfs_test_runner:
+    fail_on_skip: false
+    modules:
+      - tasks.cephfs.test_mds_metrics
index 3c9d85775b46cd258a137802317b20bfcd6869d2..bb91c75a9cbe66e85f2409061c55594c5949dc19 100644 (file)
@@ -167,6 +167,17 @@ class FSStatus(object):
             log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
             raise RuntimeError("MDS id '{0}' not found in map".format(name))
 
+    def get_mds_addrs(self, name):
+        """
+        Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
+        """
+        info = self.get_mds(name)
+        if info:
+            return [e['addr'] for e in info['addrs']['addrvec']]
+        else:
+            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
+            raise RuntimeError("MDS id '{0}' not found in map".format(name))
+
     def get_mds_gid(self, gid):
         """
         Get the info for the given MDS gid.
@@ -375,6 +386,45 @@ class MDSCluster(CephCluster):
 
         self._one_or_all(mds_id, set_block, in_parallel=False)
 
+    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
+        """
+        Block (using iptables) communications from a provided MDS to other MDSs.
+        Block all ports that an MDS uses for communication.
+
+        :param blocked: True to block the MDS, False otherwise
+        :param mds_rank_1: MDS rank
+        :param mds_rank_2: MDS rank
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(mds_ids):
+            status = self.status()
+
+            mds = mds_ids[0]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+
+            mds = mds_ids[1]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
+
     def clear_firewall(self):
         clear_firewall(self._ctx)
 
diff --git a/qa/tasks/cephfs/test_mds_metrics.py b/qa/tasks/cephfs/test_mds_metrics.py
new file mode 100644 (file)
index 0000000..e8fa3c4
--- /dev/null
@@ -0,0 +1,339 @@
+import os
+import json
+import time
+import random
+import logging
+
+from teuthology.contextutil import safe_while
+from tasks.cephfs.cephfs_test_case import CephFSTestCase
+
+log = logging.getLogger(__name__)
+
+class TestMDSMetrics(CephFSTestCase):
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 3
+
+    TEST_DIR_PERFIX = "test_mds_metrics"
+
+    def setUp(self):
+        super(TestMDSMetrics, self).setUp()
+        self._start_with_single_active_mds()
+        self._enable_mgr_stats_plugin()
+
+    def tearDown(self):
+        self._disable_mgr_stats_plugin()
+        super(TestMDSMetrics, self).tearDown()
+
+    def _start_with_single_active_mds(self):
+        curr_max_mds = self.fs.get_var('max_mds')
+        if curr_max_mds > 1:
+            self.fs.shrink(1)
+
+    def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[]):
+        def verify_metrics_cbk(metrics):
+            mds_metrics = metrics['metrics']
+            if not len(mds_metrics) == active_mds_count + 1: # n active mdss + delayed set
+                return False
+            fs_status = self.fs.status()
+            nonlocal ranks
+            if not ranks:
+                ranks = set([info['rank'] for info in fs_status.get_ranks(self.fs.id)])
+            for rank in ranks:
+                r = mds_metrics.get("mds.{}".format(rank), None)
+                if not r or not len(mds_metrics['delayed_ranks']) == 0:
+                    return False
+            global_metrics = metrics['global_metrics']
+            client_metadata = metrics['client_metadata']
+            if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
+                return False
+            return True
+        return verify_metrics_cbk
+
+    def _fs_perf_stats(self, *args):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", *args)
+
+    def _enable_mgr_stats_plugin(self):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", "stats")
+
+    def _disable_mgr_stats_plugin(self):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", "stats")
+
+    def _spread_directory_on_all_ranks(self, fscid):
+        fs_status = self.fs.status()
+        ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
+        # create a per-rank pinned directory
+        for rank in ranks:
+            dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
+            self.mount_a.run_shell(["mkdir", dirname])
+            self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
+            log.info("pinning directory {0} to rank {1}".format(dirname, rank))
+            for i in range(16):
+                filename = "{0}.{1}".format("test", i)
+                self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
+
+    def _do_spread_io(self, fscid):
+        # spread readdir I/O
+        self.mount_b.run_shell(["find", "."])
+
+    def _do_spread_io_all_clients(self, fscid):
+        # spread readdir I/O
+        self.mount_a.run_shell(["find", "."])
+        self.mount_b.run_shell(["find", "."])
+
+    def _cleanup_test_dirs(self):
+        dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
+        for dirname in dirnames.split("\n"):
+            if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
+                log.info("cleaning directory {}".format(dirname))
+                self.mount_a.run_shell(["rm", "-rf", dirname])
+
+    def _get_metrics(self, verifier_callback, trials, *args):
+        metrics = None
+        done = False
+        with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
+            while proceed():
+                metrics = json.loads(self._fs_perf_stats(*args))
+                done = verifier_callback(metrics)
+                if done:
+                    break
+        return done, metrics
+
+    # basic check to verify if we get back metrics from each active mds rank
+
+    def test_metrics_from_rank(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_metrics_post_client_disconnection(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        self.mount_a.umount_wait()
+
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED - 1), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_metrics_mds_grow(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(self.verify_mds_metrics(
+            active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED) , 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_metrics_mds_grow_and_shrink(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # shrink mds cluster
+        self.fs.shrink(1)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_delayed_metrics(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # do not give this mds any chance
+        delayed_rank = 1
+        mds_id_rank0 = self.fs.get_rank(rank=0)['name']
+        mds_id_rank1 = self.fs.get_rank(rank=1)['name']
+
+        self.fs.set_inter_mds_block(True, mds_id_rank0, mds_id_rank1)
+
+        def verify_delayed_metrics(metrics):
+            mds_metrics = metrics['metrics']
+            r = mds_metrics.get("mds.{}".format(delayed_rank), None)
+            if not r or not delayed_rank in mds_metrics['delayed_ranks']:
+                return False
+            return True
+        # validate
+        valid, metrics = self._get_metrics(verify_delayed_metrics, 30)
+        log.debug("metrics={0}".format(metrics))
+
+        self.assertTrue(valid)
+        self.fs.set_inter_mds_block(False, mds_id_rank0, mds_id_rank1)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # cleanup test directories
+        self._cleanup_test_dirs()
+
+    def test_query_mds_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # initiate a new query with `--mds_rank` filter and validate if
+        # we get metrics *only* from that mds.
+        filtered_mds = 1
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED,
+                                    ranks=[filtered_mds]), 30, '--mds_rank={}'.format(filtered_mds))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_query_client_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        mds_metrics = metrics['metrics']
+        # pick an random client
+        client = random.choice(list(mds_metrics['mds.0'].keys()))
+        # could have used regex to extract client id
+        client_id = (client.split(' ')[0]).split('.')[-1]
+
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=1), 30, '--client_id={}'.format(client_id))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+    def test_query_mds_and_client_filter(self):
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        # grow the mds cluster
+        self.fs.grow(2)
+
+        fscid = self.fs.id
+        # spread directory per rank
+        self._spread_directory_on_all_ranks(fscid)
+
+        # spread some I/O
+        self._do_spread_io_all_clients(fscid)
+
+        # wait a bit for mgr to get updated metrics
+        time.sleep(5)
+
+        # validate
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)
+
+        mds_metrics = metrics['metrics']
+
+        # pick an random client
+        client = random.choice(list(mds_metrics['mds.1'].keys()))
+        # could have used regex to extract client id
+        client_id = (client.split(' ')[0]).split('.')[-1]
+        filtered_mds = 1
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=1, ranks=[filtered_mds]),
+            30, '--mds_rank={}'.format(filtered_mds), '--client_id={}'.format(client_id))
+        log.debug("metrics={0}".format(metrics))
+        self.assertTrue(valid)