Helper methods to test that MON and MDS caps are enforced properly.
"""
from os.path import join as os_path_join
+from logging import getLogger
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.orchestra.run import Raw
+log = getLogger(__name__)
+
+
class CapTester(CephFSTestCase):
"""
Test that MON and MDS caps are enforced.
testpath = ''
for mount_x in mounts:
+ log.info(f'creating test file on FS {mount_x.cephfs_name} '
+ f'mounted at {mount_x.mountpoint}...')
dirpath = os_path_join(mount_x.hostfs_mntpt, testpath, dirname)
mount_x.run_shell(f'mkdir {dirpath}')
filepath = os_path_join(dirpath, filename)
f'hostfs_mntpt = {mount_x.hostfs_mntpt}')
mount_x.write_file(filepath, filedata)
self.test_set.append((mount_x, filepath, filedata))
+ log.info('test file created at {path} with data "{data}.')
def run_cap_tests(self, perm, mntpt=None):
# TODO
fsls = get_cluster_cmd_op(
args=f'fs ls --id {client_id} -k {keyring_path}')
+ log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
if 'fsname=' not in moncap:
+ log.info('no FS name is mentioned in moncap, client has '
+ 'permission to list all files. moncap -\n{moncap}')
+ log.info('testing for presence of all FS names in output of '
+ '"fs ls" command run by client.')
+
fsls_admin = get_cluster_cmd_op(args='fs ls')
+ log.info('output of fs ls cmd run by admin -\n{fsls_admin}')
+
self.assertEqual(fsls, fsls_admin)
return
+ log.info('FS names are mentioned in moncap. moncap -\n{moncap}')
+ log.info('testing for presence of these FS names in output of '
+ '"fs ls" command run by client.')
for fsname in self._get_fsnames_from_moncap(moncap):
self.assertIn('name: ' + fsname, fsls)
def conduct_pos_test_for_read_caps(self):
for mount, path, data in self.test_set:
+ log.info(f'test read perm: read file {path} and expect data '
+ f'"{data}"')
contents = mount.read_file(path)
self.assertEqual(data, contents)
+ log.info(f'read perm was tested successfully: "{data}" was '
+ f'successfully read from path {path}')
def conduct_pos_test_for_write_caps(self):
for mount, path, data in self.test_set:
+ log.info(f'test write perm: try writing data "{data}" to '
+ f'file {path}.')
mount.write_file(path=path, data=data)
contents = mount.read_file(path=path)
self.assertEqual(data, contents)
+ log.info(f'write perm was tested was successfully: data '
+ f'"{data}" was successfully written to file "{path}".')
def conduct_neg_test_for_write_caps(self):
possible_errmsgs = ('permission denied', 'operation not permitted')
# don't use data, cmd args to write are set already above.
for mount, path, data in self.test_set:
+ log.info('test absence of write perm: expect failure '
+ f'writing data to file {path}.')
cmdargs.append(path)
mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs)
cmdargs.pop(-1)