return mds_cap
+def get_mon_cap_from_keyring(keyring):
+ '''
+ Extract MON cap for keyring and return it.
+ '''
+ moncap = None
+ for line in keyring.split('\n'):
+ if 'caps mon' in line:
+ moncap = line[line.find(' = "') + 4 : -1]
+ break
+ else:
+ raise RuntimeError('mon cap not found in keyring below -\n{keyring}')
+ return moncap
+
+
+def get_fsnames_from_moncap(moncap):
+ '''
+ Extract FS names from MON cap and return all of them.
+ '''
+ fsnames = []
+ while moncap.find('fsname=') != -1:
+ fsname_first_char = moncap.index('fsname=') + len('fsname=')
+
+ if ',' in moncap:
+ last = moncap.index(',')
+ fsname = moncap[fsname_first_char : last]
+ moncap = moncap.replace(moncap[0 : last+1], '')
+ else:
+ fsname = moncap[fsname_first_char : ]
+ moncap = moncap.replace(moncap[0 : ], '')
+
+ fsnames.append(fsname)
+
+ return fsnames
+
+
def assert_equal(first, second):
msg = f'Variables are not equal.\nfirst = {first}\nsecond = {second}'
assert first == second, msg
#self.run_mon_cap_tests()
self.run_mds_cap_tests(perm, mntpt=mntpt)
- def _get_fsnames_from_moncap(self, moncap):
- fsnames = []
- while moncap.find('fsname=') != -1:
- fsname_first_char = moncap.index('fsname=') + len('fsname=')
-
- if ',' in moncap:
- last = moncap.index(',')
- fsname = moncap[fsname_first_char : last]
- moncap = moncap.replace(moncap[0 : last+1], '')
- else:
- fsname = moncap[fsname_first_char : ]
- moncap = moncap.replace(moncap[0 : ], '')
-
- fsnames.append(fsname)
-
- return fsnames
-
- def _get_mon_cap_from_keyring(self, keyring):
- moncap = None
- for line in keyring.split('\n'):
- if 'caps mon' in line:
- moncap = line[line.find(' = "') + 4 : -1]
- break
- else:
- raise RuntimeError('run_mon_cap_tests(): mon cap not found in '
- 'keyring. keyring -\n' + keyring)
- return moncap
-
def run_mon_cap_tests(self, fs, client_id):
"""
Check that MON cap is enforced for a client by searching for a Ceph
"""
get_cluster_cmd_op = fs.mon_manager.raw_cluster_cmd
keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
- moncap = self._get_mon_cap_from_keyring(keyring)
+ moncap = get_mon_cap_from_keyring(keyring)
keyring_path = fs.admin_remote.mktemp(data=keyring)
fsls = get_cluster_cmd_op(
args=f'fs ls --id {client_id} -k {keyring_path}')
log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
- fsnames = self._get_fsnames_from_moncap(moncap)
+ fsnames = get_fsnames_from_moncap(moncap)
if fsnames == []:
log.info('no FS name is mentioned in moncap, client has '
'permission to list all files. moncap -\n{moncap}')