from os.path import join as os_path_join
from teuthology.exceptions import CommandFailedError
+from teuthology.contextutil import safe_while
from tasks.cephfs.cephfs_test_case import CephFSTestCase, classhook
from tasks.cephfs.filesystem import FileLayout, FSMissing
log = logging.getLogger(__name__)
+class TestLabeledPerfCounters(CephFSTestCase):
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 1
+
+ def test_per_client_labeled_perf_counters(self):
+ """
+ That the per-client labelled perf counters depict the clients
+ performaing IO.
+ """
+ def get_counters_for(filesystem, client_id):
+ dump = self.fs.rank_tell(["counter", "dump"])
+ per_client_metrics_key = f'mds_client_metrics-{filesystem}'
+ counters = [c["counters"] for \
+ c in dump[per_client_metrics_key] if c["labels"]["client"] == client_id]
+ return counters[0]
+
+ # sleep a bit so that we get updated clients...
+ time.sleep(10)
+
+ # lookout for clients...
+ dump = self.fs.rank_tell(["counter", "dump"])
+
+ fs_suffix = dump["mds_client_metrics"][0]["labels"]["fs_name"]
+ self.assertGreaterEqual(dump["mds_client_metrics"][0]["counters"]["num_clients"], 2)
+
+ per_client_metrics_key = f'mds_client_metrics-{fs_suffix}'
+ mount_a_id = f'client.{self.mount_a.get_global_id()}'
+ mount_b_id = f'client.{self.mount_b.get_global_id()}'
+
+ clients = [c["labels"]["client"] for c in dump[per_client_metrics_key]]
+ self.assertIn(mount_a_id, clients)
+ self.assertIn(mount_b_id, clients)
+
+ # write workload
+ self.mount_a.create_n_files("test_dir/test_file", 1000, sync=True)
+ with safe_while(sleep=1, tries=30, action=f'wait for counters - {mount_a_id}') as proceed:
+ counters_dump_a = get_counters_for(fs_suffix, mount_a_id)
+ while proceed():
+ if counters_dump_a["total_write_ops"] > 0 and counters_dump_a["total_write_size"] > 0:
+ return True
+
+ # read from the other client
+ for i in range(100):
+ self.mount_b.open_background(basename=f'test_dir/test_file_{i}', write=False)
+ with safe_while(sleep=1, tries=30, action=f'wait for counters - {mount_b_id}') as proceed:
+ counters_dump_b = get_counters_for(fs_suffix, mount_b_id)
+ while proceed():
+ if counters_dump_b["total_read_ops"] > 0 and counters_dump_b["total_read_size"] > 0:
+ return True
+
+ self.fs.teardown()
+
class TestAdminCommands(CephFSTestCase):
"""
Tests for administration command.