assert member in container, msg
-class CapTester:
- """
- Test that MON and MDS caps are enforced.
+class MonCapTester:
+ '''
+ Tested that MON caps are enforced on client by checking if the name of
+ CephFS, which client is authorized for, is listed in output of commmand
+ "ceph fs ls".
+
+ USAGE: Create object of this class at the beginning of the test method
+ and when MON cap needs to be tested, call the method run_mon_cap_tests().
+ '''
+
+ def run_mon_cap_tests(self, fs, client_id):
+ """
+ Check that MON cap is enforced for a client by searching for a Ceph
+ FS name in output of cmd "fs ls" executed with that client's caps.
+
+ fs is any fs object so that ceph commands can be exceuted.
+ """
+ get_cluster_cmd_op = fs.mon_manager.raw_cluster_cmd
+ keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
+ moncap = get_mon_cap_from_keyring(keyring)
+ keyring_path = fs.admin_remote.mktemp(data=keyring)
- MDS caps are tested by exercising read-write permissions and MON caps are
- tested using output of command "ceph fs ls". Besides, it provides
- write_test_files() which creates test files at the given path on CephFS
- mounts passed to it.
+ fsls = get_cluster_cmd_op(
+ args=f'fs ls --id {client_id} -k {keyring_path}')
+ log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
- USAGE: Call write_test_files() method at the beginning of the test and
- once the caps that needs to be tested are assigned to the client and
- CephFS be remount for caps to effective, call run_cap_tests(),
- run_mon_cap_tests() or run_mds_cap_tests() as per the need.
+ fsnames = get_fsnames_from_moncap(moncap)
+ if fsnames == []:
+ log.info('no FS name is mentioned in moncap, client has '
+ 'permission to list all files. moncap -\n{moncap}')
+ return
+
+ log.info('FS names are mentioned in moncap. moncap -\n{moncap}')
+ log.info('testing for presence of these FS names in output of '
+ '"fs ls" command run by client.')
+ for fsname in fsnames:
+ fsname_cap_str = f'name: {fsname}'
+ assert_in(fsname_cap_str, fsls)
+
+
+class MdsCapTester:
+ """
+ Test that MDS caps are enforced on the client by exercising read-write
+ permissions.
+
+ USAGE: Create object of this class at the beginning of the test method
+ and when MDS cap needs to be tested, call the method run_mds_cap_tests().
"""
def __init__(self, mount=None, path=''):
cephfs_mntpt = {self.mount.cephfs_mntpt}
hostfs_mntpt = {self.mount.hostfs_mntpt}''')
- def run_cap_tests(self, fs, client_id, perm, mntpt=None):
- self.run_mon_cap_tests(fs, client_id)
- self.run_mds_cap_tests(perm, mntpt=mntpt)
-
- def run_mon_cap_tests(self, fs, client_id):
- """
- Check that MON cap is enforced for a client by searching for a Ceph
- FS name in output of cmd "fs ls" executed with that client's caps.
-
- fs is any fs object so that ceph commands can be exceuted.
- """
- get_cluster_cmd_op = fs.mon_manager.raw_cluster_cmd
- keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
- moncap = get_mon_cap_from_keyring(keyring)
- keyring_path = fs.admin_remote.mktemp(data=keyring)
-
- fsls = get_cluster_cmd_op(
- args=f'fs ls --id {client_id} -k {keyring_path}')
- log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
-
- fsnames = get_fsnames_from_moncap(moncap)
- if fsnames == []:
- log.info('no FS name is mentioned in moncap, client has '
- 'permission to list all files. moncap -\n{moncap}')
- return
-
- log.info('FS names are mentioned in moncap. moncap -\n{moncap}')
- log.info('testing for presence of these FS names in output of '
- '"fs ls" command run by client.')
- for fsname in fsnames:
- fsname_cap_str = f'name: {fsname}'
- assert_in(fsname_cap_str, fsls)
-
def run_mds_cap_tests(self, perm, mntpt=None):
"""
Run test for read perm and, for write perm, run positive test if it
log.info('absence of write perm was tested successfully: '
f'failed to be write data to file {self.path}.')
+
+class CapTester(MonCapTester, MdsCapTester):
+ '''
+ Test MON caps as well as MDS caps. For usage see docstrings of class
+ MDSCapTester.
+
+ USAGE: Create object of this class at the beginning of the test method
+ and when MON and MDS cap needs to be tested, call the method
+ run_cap_tests().
+ '''
+
+ def __init__(self, mount=None, path='', create_test_files=True):
+ if create_test_files:
+ self._create_test_files(mount, path)
+
+ def run_cap_tests(self, fs, client_id, perm, mntpt=None):
+ self.run_mon_cap_tests(fs, client_id)
+ self.run_mds_cap_tests(perm, mntpt)
+
def _conduct_neg_test_for_root_squash_caps(self, _cmdargs, sudo_write=False):
possible_errmsgs = ('permission denied', 'operation not permitted')
cmdargs = ['sudo'] if sudo_write else ['']
def conduct_pos_test_for_open_caps(self, sudo_read=True):
self.conduct_pos_test_for_read_caps(sudo_read)
-
import logging
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
- gen_osd_cap_str, gen_mds_cap_str)
+from tasks.cephfs.caps_helper import (MonCapTester, MdsCapTester,
+ gen_mon_cap_str, gen_osd_cap_str,
+ gen_mds_cap_str)
from teuthology.exceptions import CommandFailedError
class TestMONCaps(TestMultiFS):
def test_moncap_with_one_fs_names(self):
- self.captester = CapTester()
+ self.captester = MonCapTester()
moncap = gen_mon_cap_str((('r', self.fs1.name),))
self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_multiple_fs_names(self):
- self.captester = CapTester()
+ self.captester = MonCapTester()
moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_blanket_allow(self):
- self.captester = CapTester()
+ self.captester = MonCapTester()
moncap = 'allow r'
self.create_client(self.client_id, moncap)
"""
def test_rw_with_fsname_and_no_path_in_cap(self):
PERM = 'rw'
- self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+ self.captesters = (MdsCapTester(self.mount_a),
+ MdsCapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
def test_r_with_fsname_and_no_path_in_cap(self):
PERM = 'r'
- self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+ self.captesters = (MdsCapTester(self.mount_a),
+ MdsCapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
PERM, CEPHFS_MNTPT = 'rw', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
- self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
- CapTester(self.mount_b, CEPHFS_MNTPT))
+ self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
+ MdsCapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
PERM, CEPHFS_MNTPT = 'r', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
- self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
- CapTester(self.mount_b, CEPHFS_MNTPT))
+ self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
+ MdsCapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
PERM, CEPHFS_MNTPT = 'rw', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
- self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
- CapTester(self.mount_b, CEPHFS_MNTPT))
+ self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
+ MdsCapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
PERM, CEPHFS_MNTPT = 'r', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
- self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
- CapTester(self.mount_b, CEPHFS_MNTPT))
+ self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
+ MdsCapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
def test_rw_with_no_fsname_and_no_path(self):
PERM = 'rw'
- self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+ self.captesters = (MdsCapTester(self.mount_a),
+ MdsCapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
def test_r_with_no_fsname_and_no_path(self):
PERM = 'r'
- self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+ self.captesters = (MdsCapTester(self.mount_a),
+ MdsCapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)