from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.filesystem import FileLayout, FSMissing
from tasks.cephfs.fuse_mount import FuseMount
-from tasks.cephfs.caps_helper import CapsHelper
+from tasks.cephfs.caps_helper import CapTester
log = logging.getLogger(__name__)
self._verify_mirroring(self.fs.name, "disabled")
-class TestFsAuthorize(CapsHelper):
+class TestFsAuthorize(CephFSTestCase):
client_id = 'testuser'
client_name = 'client.' + client_id
def test_single_path_r(self):
- perm = 'r'
- filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
- moncap = self.get_mon_cap_from_keyring(self.client_name)
+ PERM = 'r'
+ FS_AUTH_CAPS = (('/', PERM),)
+ self.captester = CapTester()
+ self.setup_test_env(FS_AUTH_CAPS)
- self.run_mon_cap_tests(moncap, keyring)
- self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+ self.captester.run_mon_cap_tests(self.fs, self.client_id)
+ self.captester.run_mds_cap_tests(PERM)
def test_single_path_rw(self):
- perm = 'rw'
- filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
- moncap = self.get_mon_cap_from_keyring(self.client_name)
+ PERM = 'rw'
+ FS_AUTH_CAPS = (('/', PERM),)
+ self.captester = CapTester()
+ self.setup_test_env(FS_AUTH_CAPS)
- self.run_mon_cap_tests(moncap, keyring)
- self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+ self.captester.run_mon_cap_tests(self.fs, self.client_id)
+ self.captester.run_mds_cap_tests(PERM)
def test_single_path_rootsquash(self):
filedata, filename = 'some data on fs 1', 'file_on_fs1'
def test_single_path_authorize_on_nonalphanumeric_fsname(self):
"""
- That fs authorize command works on filesystems with names having [_.-] characters
+ That fs authorize command works on filesystems with names having [_.-]
+ characters
"""
self.mount_a.umount_wait(require_clean=True)
self.mds_cluster.delete_all_filesystems()
f'osd "allow rw pool={self.fs.get_data_pool_name()}" '
f'mds allow')
self.mount_a.remount(cephfs_name=self.fs.name)
- perm = 'rw'
- filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
- self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+ PERM = 'rw'
+ FS_AUTH_CAPS = (('/', PERM),)
+ self.captester = CapTester()
+ self.setup_test_env(FS_AUTH_CAPS)
+ self.captester.run_mds_cap_tests(PERM)
def test_multiple_path_r(self):
- perm, paths = 'r', ('/dir1', '/dir2/dir22')
- filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
- moncap = self.get_mon_cap_from_keyring(self.client_name)
+ PERM = 'r'
+ FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
+ for c in FS_AUTH_CAPS:
+ self.mount_a.run_shell(f'mkdir -p .{c[0]}')
+ self.captesters = (CapTester(), CapTester())
+ self.setup_test_env(FS_AUTH_CAPS)
- keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
- for path in paths:
- self.mount_a.remount(client_id=self.client_id,
- client_keyring_path=keyring_path,
- cephfs_mntpt=path)
-
-
- # actual tests...
- self.run_mon_cap_tests(moncap, keyring)
- self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+ self.run_cap_test_one_by_one(FS_AUTH_CAPS)
def test_multiple_path_rw(self):
- perm, paths = 'rw', ('/dir1', '/dir2/dir22')
- filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
- moncap = self.get_mon_cap_from_keyring(self.client_name)
-
- keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
- for path in paths:
- self.mount_a.remount(client_id=self.client_id,
- client_keyring_path=keyring_path,
- cephfs_mntpt=path)
-
-
+ PERM = 'rw'
+ FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
+ for c in FS_AUTH_CAPS:
+ self.mount_a.run_shell(f'mkdir -p .{c[0]}')
+ self.captesters = (CapTester(), CapTester())
+ self.setup_test_env(FS_AUTH_CAPS)
+
+ self.run_cap_test_one_by_one(FS_AUTH_CAPS)
+
+ def run_cap_test_one_by_one(self, fs_auth_caps):
+ keyring = self.run_cluster_cmd(f'auth get {self.client_name}')
+ for i, c in enumerate(fs_auth_caps):
+ self.assertIn(i, (0, 1))
+ PATH = c[0]
+ PERM = c[1]
+ self._remount(keyring, PATH)
# actual tests...
- self.run_mon_cap_tests(moncap, keyring)
- self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+ self.captesters[i].run_mon_cap_tests(self.fs, self.client_id)
+ self.captesters[i].run_mds_cap_tests(PERM, PATH)
def tearDown(self):
self.mount_a.umount_wait()
super(type(self), self).tearDown()
- def setup_for_single_path(self, perm):
- filedata, filename = 'some data on fs 1', 'file_on_fs1'
-
- filepath = os_path_join(self.mount_a.hostfs_mntpt, filename)
- self.mount_a.write_file(filepath, filedata)
-
- keyring = self.fs.authorize(self.client_id, ('/', perm))
+ def _remount(self, keyring, path='/'):
keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
-
self.mount_a.remount(client_id=self.client_id,
client_keyring_path=keyring_path,
- cephfs_mntpt='/')
-
- return filepath, filedata, keyring
+ cephfs_mntpt=path)
- def setup_for_multiple_paths(self, perm, paths):
- filedata, filename = 'some data on fs 1', 'file_on_fs1'
+ def setup_for_single_path(self, fs_auth_caps):
+ self.captester.write_test_files((self.mount_a,), '/')
+ keyring = self.fs.authorize(self.client_id, fs_auth_caps)
+ self._remount(keyring)
- self.mount_a.run_shell('mkdir -p dir1/dir12/dir13 dir2/dir22/dir23')
+ def setup_for_multiple_paths(self, fs_auth_caps):
+ for i, c in enumerate(fs_auth_caps):
+ PATH = c[0]
+ self.captesters[i].write_test_files((self.mount_a,), PATH)
- filepaths = []
- for path in paths:
- filepath = os_path_join(self.mount_a.hostfs_mntpt, path[1:], filename)
- self.mount_a.write_file(filepath, filedata)
- filepaths.append(filepath.replace(path, ''))
- filepaths = tuple(filepaths)
+ self.fs.authorize(self.client_id, fs_auth_caps)
- keyring = self.fs.authorize(self.client_id, (paths[0], perm, paths[1],
- perm))
-
- return filepaths, filedata, keyring
-
- def setup_test_env(self, perm, paths=()):
- filepaths, filedata, keyring = self.setup_for_multiple_paths(perm, paths) if paths \
- else self.setup_for_single_path(perm)
-
- if not isinstance(filepaths, tuple):
- filepaths = (filepaths, )
- if not isinstance(filedata, tuple):
- filedata = (filedata, )
- mounts = (self.mount_a, )
-
- return filepaths, filedata, mounts, keyring
+ def setup_test_env(self, fs_auth_caps):
+ if len(fs_auth_caps) == 1:
+ self.setup_for_single_path(fs_auth_caps[0])
+ else:
+ self.setup_for_multiple_paths(fs_auth_caps)
class TestAdminCommandIdempotency(CephFSTestCase):