From: Rishabh Dave Date: Sat, 30 Apr 2022 07:35:53 +0000 (+0530) Subject: qa/cephfs: upgrade caps_helper.CapTester.run_mon_cap_tests() X-Git-Tag: v18.0.0~162^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=463c9e171422f959a481ab3e434c87789b235fe3;p=ceph.git qa/cephfs: upgrade caps_helper.CapTester.run_mon_cap_tests() With this commit, the design of run_mon_cap_tests() is now aligned with rest of CapTester -- it's not meant to be inherited by test class anymore and it is to be called by using CapTester instance. This commit also changes working of this method. Now instead of obtaining FS names from Python objects representing FSs, it obtains those names from MON cap. This removes the need to pass the FS objects around. Signed-off-by: Rishabh Dave --- diff --git a/qa/tasks/cephfs/caps_helper.py b/qa/tasks/cephfs/caps_helper.py index 69e78b2dffed..64cec67fcecf 100644 --- a/qa/tasks/cephfs/caps_helper.py +++ b/qa/tasks/cephfs/caps_helper.py @@ -68,28 +68,55 @@ class CapTester(CephFSTestCase): #self.run_mon_cap_tests() self.run_mds_cap_tests(perm, mntpt=mntpt) - def run_mon_cap_tests(self, moncap, keyring): + def _get_fsnames_from_moncap(self, moncap): + fsnames = [] + while moncap.find('fsname=') != -1: + fsname_first_char = moncap.index('fsname=') + len('fsname=') + + if ',' in moncap: + last = moncap.index(',') + fsname = moncap[fsname_first_char : last] + moncap = moncap.replace(moncap[0 : last+1], '') + else: + fsname = moncap[fsname_first_char : ] + moncap = moncap.replace(moncap[0 : ], '') + + fsnames.append(fsname) + + return fsnames + + def run_mon_cap_tests(self, def_fs, client_id): """ Check that MON cap is enforced for a client by searching for a Ceph FS name in output of cmd "fs ls" executed with that client's caps. + + def_fs stands for default FS on Ceph cluster. """ - keyring_path = self.fs.admin_remote.mktemp(data=keyring) + get_cluster_cmd_op = def_fs.mon_manager.raw_cluster_cmd + + keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}') + + moncap = None + for line in keyring.split('\n'): + if 'caps mon' in line: + moncap = line[line.find(' = "') + 4 : -1] + break + else: + raise RuntimeError('run_mon_cap_tests(): mon cap not found in ' + 'keyring. keyring -\n' + keyring) - fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k ' - f'{keyring_path}') + keyring_path = def_fs.admin_remote.mktemp(data=keyring) + + fsls = get_cluster_cmd_op( + args=f'fs ls --id {client_id} -k {keyring_path}') if 'fsname=' not in moncap: - fsls_admin = self.run_cluster_cmd('fs ls') + fsls_admin = get_cluster_cmd_op(args='fs ls') self.assertEqual(fsls, fsls_admin) return - fss = (self.fs1.name, self.fs2.name) if hasattr(self, 'fs1') else \ - (self.fs.name,) - for fsname in fss: - if fsname in moncap: - self.assertIn('name: ' + fsname, fsls) - else: - self.assertNotIn('name: ' + fsname, fsls) + for fsname in self._get_fsnames_from_moncap(moncap): + self.assertIn('name: ' + fsname, fsls) def run_mds_cap_tests(self, perm, mntpt=None): """ @@ -135,12 +162,5 @@ class CapTester(CephFSTestCase): cmdargs.append(path) mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs) cmdargs.pop(-1) - - def get_mon_cap_from_keyring(self, client_name): - keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}') - for line in keyring.split('\n'): - if 'caps mon' in line: - return line[line.find(' = "') + 4 : -1] - - raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. ' - 'keyring -\n' + keyring) + log.info('absence of write perm was tested successfully: ' + f'failed to be write data to file {path}.')