import json
from io import StringIO
+from os.path import join as os_path_join
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.fuse_mount import FuseMount
-
from tasks.cephfs.filesystem import FileLayout
+from tasks.cephfs.fuse_mount import FuseMount
+from tasks.cephfs.caps_helper import CapsHelper
class TestAdminCommands(CephFSTestCase):
self.assertTrue("NAME" in s)
self.assertTrue("mon_host" in s)
+
def test_client_config(self):
"""
That I can successfully issue asok "config set" commands
self.mount_a.umount_wait(require_clean=True)
self.fs.mds_stop()
+
class TestMirroringCommands(CephFSTestCase):
CLIENTS_REQUIRED = 1
MDSS_REQUIRED = 1
self.fs.mon_manager.raw_cluster_cmd("fs", "reset", self.fs.name, "--yes-i-really-mean-it")
self.fs.wait_for_daemons()
self._verify_mirroring(self.fs.name, "disabled")
+
+
+class TestSubCmdFsAuthorize(CapsHelper):
+ client_id = 'testuser'
+ client_name = 'client.' + client_id
+
+ def test_single_path_r(self):
+ perm = 'r'
+ filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
+ moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+ self.run_mon_cap_tests(moncap, keyring)
+ self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+ def test_single_path_rw(self):
+ perm = 'rw'
+ filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
+ moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+ self.run_mon_cap_tests(moncap, keyring)
+ self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+ def test_multiple_path_r(self):
+ perm, paths = 'r', ('/dir1', '/dir2/dir22')
+ filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
+ moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+ keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+ keyring)
+ for path in paths:
+ self.mount_a.remount(client_id=self.client_id,
+ client_keyring_path=keyring_path,
+ cephfs_mntpt=path)
+
+
+ # actual tests...
+ self.run_mon_cap_tests(moncap, keyring)
+ self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+ def test_multiple_path_rw(self):
+ perm, paths = 'rw', ('/dir1', '/dir2/dir22')
+ filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
+ moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+ keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+ keyring)
+ for path in paths:
+ self.mount_a.remount(client_id=self.client_id,
+ client_keyring_path=keyring_path,
+ cephfs_mntpt=path)
+
+
+ # actual tests...
+ self.run_mon_cap_tests(moncap, keyring)
+ self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+ def tearDown(self):
+ self.mount_a.umount_wait()
+ self.run_cluster_cmd(f'auth rm {self.client_name}')
+
+ super(type(self), self).tearDown()
+
+ def setup_for_single_path(self, perm):
+ filedata, filename = 'some data on fs 1', 'file_on_fs1'
+
+ filepath = os_path_join(self.mount_a.hostfs_mntpt, filename)
+ self.mount_a.write_file(filepath, filedata)
+
+ keyring = self.fs.authorize(self.client_id, ('/', perm))
+ keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+ keyring)
+
+ self.mount_a.remount(client_id=self.client_id,
+ client_keyring_path=keyring_path,
+ cephfs_mntpt='/')
+
+ return filepath, filedata, keyring
+
+ def setup_for_multiple_paths(self, perm, paths):
+ filedata, filename = 'some data on fs 1', 'file_on_fs1'
+
+ self.mount_a.run_shell('mkdir -p dir1/dir12/dir13 dir2/dir22/dir23')
+
+ filepaths = []
+ for path in paths:
+ filepath = os_path_join(self.mount_a.hostfs_mntpt, path[1:], filename)
+ self.mount_a.write_file(filepath, filedata)
+ filepaths.append(filepath.replace(path, ''))
+ filepaths = tuple(filepaths)
+
+ keyring = self.fs.authorize(self.client_id, (path[0], perm, path[1],
+ perm))
+
+ return filepaths, filedata, keyring
+
+ def setup_test_env(self, perm, paths=()):
+ filepaths, filedata, keyring = self.setup_for_multiple_paths(perm, paths) if paths \
+ else self.setup_for_single_path(perm)
+
+ if not isinstance(filepaths, tuple):
+ filepaths = (filepaths, )
+ if not isinstance(filedata, tuple):
+ filedata = (filedata, )
+ mounts = (self.mount_a, )
+
+ return filepaths, filedata, mounts, keyring