]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/cephfs: repair test_admin.FsAuthorize's compatibility
authorRishabh Dave <ridave@redhat.com>
Thu, 9 Jun 2022 13:09:55 +0000 (18:39 +0530)
committerRishabh Dave <ridave@redhat.com>
Thu, 21 Jul 2022 12:23:16 +0000 (17:53 +0530)
Upgrade test_admin.FsAuthorize to repair its compatibility with
caps_helper.py since caps_hepler.py has been heavily modified in
previous commits.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
qa/tasks/cephfs/test_admin.py

index 13dd641d85cc6eaf5572f8188d2068260a6a4cd8..4f85bc84f59cb6646097de4883f8e4ff49978bee 100644 (file)
@@ -12,7 +12,7 @@ from teuthology.exceptions import CommandFailedError
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from tasks.cephfs.filesystem import FileLayout, FSMissing
 from tasks.cephfs.fuse_mount import FuseMount
-from tasks.cephfs.caps_helper import CapsHelper
+from tasks.cephfs.caps_helper import CapTester
 
 log = logging.getLogger(__name__)
 
@@ -1200,25 +1200,27 @@ class TestMirroringCommands(CephFSTestCase):
         self._verify_mirroring(self.fs.name, "disabled")
 
 
-class TestFsAuthorize(CapsHelper):
+class TestFsAuthorize(CephFSTestCase):
     client_id = 'testuser'
     client_name = 'client.' + client_id
 
     def test_single_path_r(self):
-        perm = 'r'
-        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
-        moncap = self.get_mon_cap_from_keyring(self.client_name)
+        PERM = 'r'
+        FS_AUTH_CAPS = (('/', PERM),)
+        self.captester = CapTester()
+        self.setup_test_env(FS_AUTH_CAPS)
 
-        self.run_mon_cap_tests(moncap, keyring)
-        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+        self.captester.run_mon_cap_tests(self.fs, self.client_id)
+        self.captester.run_mds_cap_tests(PERM)
 
     def test_single_path_rw(self):
-        perm = 'rw'
-        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
-        moncap = self.get_mon_cap_from_keyring(self.client_name)
+        PERM = 'rw'
+        FS_AUTH_CAPS = (('/', PERM),)
+        self.captester = CapTester()
+        self.setup_test_env(FS_AUTH_CAPS)
 
-        self.run_mon_cap_tests(moncap, keyring)
-        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+        self.captester.run_mon_cap_tests(self.fs, self.client_id)
+        self.captester.run_mds_cap_tests(PERM)
 
     def test_single_path_rootsquash(self):
         filedata, filename = 'some data on fs 1', 'file_on_fs1'
@@ -1240,7 +1242,8 @@ class TestFsAuthorize(CapsHelper):
 
     def test_single_path_authorize_on_nonalphanumeric_fsname(self):
         """
-        That fs authorize command works on filesystems with names having [_.-] characters
+        That fs authorize command works on filesystems with names having [_.-]
+        characters
         """
         self.mount_a.umount_wait(require_clean=True)
         self.mds_cluster.delete_all_filesystems()
@@ -1252,41 +1255,42 @@ class TestFsAuthorize(CapsHelper):
                              f'osd "allow rw pool={self.fs.get_data_pool_name()}" '
                              f'mds allow')
         self.mount_a.remount(cephfs_name=self.fs.name)
-        perm = 'rw'
-        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
-        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+        PERM = 'rw'
+        FS_AUTH_CAPS = (('/', PERM),)
+        self.captester = CapTester()
+        self.setup_test_env(FS_AUTH_CAPS)
+        self.captester.run_mds_cap_tests(PERM)
 
     def test_multiple_path_r(self):
-        perm, paths = 'r', ('/dir1', '/dir2/dir22')
-        filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
-        moncap = self.get_mon_cap_from_keyring(self.client_name)
+        PERM = 'r'
+        FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
+        for c in FS_AUTH_CAPS:
+            self.mount_a.run_shell(f'mkdir -p .{c[0]}')
+        self.captesters = (CapTester(), CapTester())
+        self.setup_test_env(FS_AUTH_CAPS)
 
-        keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
-        for path in paths:
-            self.mount_a.remount(client_id=self.client_id,
-                                 client_keyring_path=keyring_path,
-                                 cephfs_mntpt=path)
-
-
-            # actual tests...
-            self.run_mon_cap_tests(moncap, keyring)
-            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+        self.run_cap_test_one_by_one(FS_AUTH_CAPS)
 
     def test_multiple_path_rw(self):
-        perm, paths = 'rw', ('/dir1', '/dir2/dir22')
-        filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
-        moncap = self.get_mon_cap_from_keyring(self.client_name)
-
-        keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
-        for path in paths:
-            self.mount_a.remount(client_id=self.client_id,
-                                 client_keyring_path=keyring_path,
-                                 cephfs_mntpt=path)
-
-
+        PERM = 'rw'
+        FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
+        for c in FS_AUTH_CAPS:
+            self.mount_a.run_shell(f'mkdir -p .{c[0]}')
+        self.captesters = (CapTester(), CapTester())
+        self.setup_test_env(FS_AUTH_CAPS)
+
+        self.run_cap_test_one_by_one(FS_AUTH_CAPS)
+
+    def run_cap_test_one_by_one(self, fs_auth_caps):
+        keyring = self.run_cluster_cmd(f'auth get {self.client_name}')
+        for i, c in enumerate(fs_auth_caps):
+            self.assertIn(i, (0, 1))
+            PATH = c[0]
+            PERM = c[1]
+            self._remount(keyring, PATH)
             # actual tests...
-            self.run_mon_cap_tests(moncap, keyring)
-            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+            self.captesters[i].run_mon_cap_tests(self.fs, self.client_id)
+            self.captesters[i].run_mds_cap_tests(PERM, PATH)
 
     def tearDown(self):
         self.mount_a.umount_wait()
@@ -1294,49 +1298,29 @@ class TestFsAuthorize(CapsHelper):
 
         super(type(self), self).tearDown()
 
-    def setup_for_single_path(self, perm):
-        filedata, filename = 'some data on fs 1', 'file_on_fs1'
-
-        filepath = os_path_join(self.mount_a.hostfs_mntpt, filename)
-        self.mount_a.write_file(filepath, filedata)
-
-        keyring = self.fs.authorize(self.client_id, ('/', perm))
+    def _remount(self, keyring, path='/'):
         keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
-
         self.mount_a.remount(client_id=self.client_id,
                              client_keyring_path=keyring_path,
-                             cephfs_mntpt='/')
-
-        return filepath, filedata, keyring
+                             cephfs_mntpt=path)
 
-    def setup_for_multiple_paths(self, perm, paths):
-        filedata, filename = 'some data on fs 1', 'file_on_fs1'
+    def setup_for_single_path(self, fs_auth_caps):
+        self.captester.write_test_files((self.mount_a,), '/')
+        keyring = self.fs.authorize(self.client_id, fs_auth_caps)
+        self._remount(keyring)
 
-        self.mount_a.run_shell('mkdir -p dir1/dir12/dir13 dir2/dir22/dir23')
+    def setup_for_multiple_paths(self, fs_auth_caps):
+        for i, c in enumerate(fs_auth_caps):
+            PATH = c[0]
+            self.captesters[i].write_test_files((self.mount_a,), PATH)
 
-        filepaths = []
-        for path in paths:
-            filepath = os_path_join(self.mount_a.hostfs_mntpt, path[1:], filename)
-            self.mount_a.write_file(filepath, filedata)
-            filepaths.append(filepath.replace(path, ''))
-        filepaths = tuple(filepaths)
+        self.fs.authorize(self.client_id, fs_auth_caps)
 
-        keyring = self.fs.authorize(self.client_id, (paths[0], perm, paths[1],
-                                                     perm))
-
-        return filepaths, filedata, keyring
-
-    def setup_test_env(self, perm, paths=()):
-        filepaths, filedata, keyring = self.setup_for_multiple_paths(perm, paths) if paths \
-            else self.setup_for_single_path(perm)
-
-        if not isinstance(filepaths, tuple):
-            filepaths = (filepaths, )
-        if not isinstance(filedata, tuple):
-            filedata = (filedata, )
-        mounts = (self.mount_a, )
-
-        return filepaths, filedata, mounts, keyring
+    def setup_test_env(self, fs_auth_caps):
+        if len(fs_auth_caps) == 1:
+            self.setup_for_single_path(fs_auth_caps[0])
+        else:
+            self.setup_for_multiple_paths(fs_auth_caps)
 
 
 class TestAdminCommandIdempotency(CephFSTestCase):