#self.run_mon_cap_tests()
self.run_mds_cap_tests(perm, mntpt=mntpt)
- def run_mon_cap_tests(self, moncap, keyring):
+ def _get_fsnames_from_moncap(self, moncap):
+ fsnames = []
+ while moncap.find('fsname=') != -1:
+ fsname_first_char = moncap.index('fsname=') + len('fsname=')
+
+ if ',' in moncap:
+ last = moncap.index(',')
+ fsname = moncap[fsname_first_char : last]
+ moncap = moncap.replace(moncap[0 : last+1], '')
+ else:
+ fsname = moncap[fsname_first_char : ]
+ moncap = moncap.replace(moncap[0 : ], '')
+
+ fsnames.append(fsname)
+
+ return fsnames
+
+ def run_mon_cap_tests(self, def_fs, client_id):
"""
Check that MON cap is enforced for a client by searching for a Ceph
FS name in output of cmd "fs ls" executed with that client's caps.
+
+ def_fs stands for default FS on Ceph cluster.
"""
- keyring_path = self.fs.admin_remote.mktemp(data=keyring)
+ get_cluster_cmd_op = def_fs.mon_manager.raw_cluster_cmd
+
+ keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
+
+ moncap = None
+ for line in keyring.split('\n'):
+ if 'caps mon' in line:
+ moncap = line[line.find(' = "') + 4 : -1]
+ break
+ else:
+ raise RuntimeError('run_mon_cap_tests(): mon cap not found in '
+ 'keyring. keyring -\n' + keyring)
- fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
- f'{keyring_path}')
+ keyring_path = def_fs.admin_remote.mktemp(data=keyring)
+
+ fsls = get_cluster_cmd_op(
+ args=f'fs ls --id {client_id} -k {keyring_path}')
if 'fsname=' not in moncap:
- fsls_admin = self.run_cluster_cmd('fs ls')
+ fsls_admin = get_cluster_cmd_op(args='fs ls')
self.assertEqual(fsls, fsls_admin)
return
- fss = (self.fs1.name, self.fs2.name) if hasattr(self, 'fs1') else \
- (self.fs.name,)
- for fsname in fss:
- if fsname in moncap:
- self.assertIn('name: ' + fsname, fsls)
- else:
- self.assertNotIn('name: ' + fsname, fsls)
+ for fsname in self._get_fsnames_from_moncap(moncap):
+ self.assertIn('name: ' + fsname, fsls)
def run_mds_cap_tests(self, perm, mntpt=None):
"""
cmdargs.append(path)
mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs)
cmdargs.pop(-1)
-
- def get_mon_cap_from_keyring(self, client_name):
- keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')
- for line in keyring.split('\n'):
- if 'caps mon' in line:
- return line[line.find(' = "') + 4 : -1]
-
- raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. '
- 'keyring -\n' + keyring)
+ log.info('absence of write perm was tested successfully: '
+ f'failed to be write data to file {path}.')