]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/cephfs: improve caps_helper.CapTester.run_mon_cap_tests()
authorRishabh Dave <ridave@redhat.com>
Thu, 6 Apr 2023 09:23:17 +0000 (14:53 +0530)
committerRishabh Dave <ridave@redhat.com>
Sat, 6 Apr 2024 10:14:16 +0000 (15:44 +0530)
This method checks if the output of the command "ceph fs ls" for client
ID it receives is same as the output printed for client.admin. Don't do
so, limit the test to only checking if "ceph fs ls --id client.x -k
keyring_file" prints fs name for which client.x has permissions.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
(cherry picked from commit ad68a5512105e102a87d1fe9648257eebd4c432e)

qa/tasks/cephfs/caps_helper.py

index 6b083914db0bb352408ecb67f7ad84a2afba3fc2..af604df017e683d60530235b132bc492b0181597 100644 (file)
@@ -220,17 +220,7 @@ class CapTester:
 
         return fsnames
 
-    def run_mon_cap_tests(self, def_fs, client_id):
-        """
-        Check that MON cap is enforced for a client by searching for a Ceph
-        FS name in output of cmd "fs ls" executed with that client's caps.
-
-        def_fs stands for default FS on Ceph cluster.
-        """
-        get_cluster_cmd_op = def_fs.mon_manager.raw_cluster_cmd
-
-        keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
-
+    def _get_mon_cap_from_keyring(self, keyring):
         moncap = None
         for line in keyring.split('\n'):
             if 'caps mon' in line:
@@ -239,29 +229,34 @@ class CapTester:
         else:
             raise RuntimeError('run_mon_cap_tests(): mon cap not found in '
                                'keyring. keyring -\n' + keyring)
+        return moncap
 
-        keyring_path = def_fs.admin_remote.mktemp(data=keyring)
+    def run_mon_cap_tests(self, fs, client_id):
+        """
+        Check that MON cap is enforced for a client by searching for a Ceph
+        FS name in output of cmd "fs ls" executed with that client's caps.
+
+        fs is any fs object so that ceph commands can be exceuted.
+        """
+        get_cluster_cmd_op = fs.mon_manager.raw_cluster_cmd
+        keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
+        moncap = self._get_mon_cap_from_keyring(keyring)
+        keyring_path = fs.admin_remote.mktemp(data=keyring)
 
         fsls = get_cluster_cmd_op(
             args=f'fs ls --id {client_id} -k {keyring_path}')
         log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
 
-        if 'fsname=' not in moncap:
+        fsnames = self._get_fsnames_from_moncap(moncap)
+        if fsnames == []:
             log.info('no FS name is mentioned in moncap, client has '
                      'permission to list all files. moncap -\n{moncap}')
-            log.info('testing for presence of all FS names in output of '
-                     '"fs ls" command run by client.')
-
-            fsls_admin = get_cluster_cmd_op(args='fs ls')
-            log.info('output of fs ls cmd run by admin -\n{fsls_admin}')
-
-            assert_equal(fsls, fsls_admin)
             return
 
         log.info('FS names are mentioned in moncap. moncap -\n{moncap}')
         log.info('testing for presence of these FS names in output of '
                  '"fs ls" command run by client.')
-        for fsname in self._get_fsnames_from_moncap(moncap):
+        for fsname in fsnames:
             fsname_cap_str = f'name: {fsname}'
             assert_in(fsname_cap_str, fsls)