]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/cephfs: move few methods such that they can be reused
authorRishabh Dave <ridave@redhat.com>
Thu, 6 Apr 2023 09:42:14 +0000 (15:12 +0530)
committerRishabh Dave <ridave@redhat.com>
Sat, 6 Apr 2024 10:14:16 +0000 (15:44 +0530)
Move get_mon_cap_from_keyring() and get_fsnmes_from_moncap() from class
CapTester to main namespace of caps_helper.py so that they can be
imported freely and reused by tests.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
(cherry picked from commit ea9f13e5536120c135e85c4962bb989e83910d6b)

qa/tasks/cephfs/caps_helper.py

index af604df017e683d60530235b132bc492b0181597..580896ceb6bb1b22ae9754f2eb50915036e81683 100644 (file)
@@ -112,6 +112,41 @@ def gen_mds_cap_str(caps):
     return mds_cap
 
 
+def get_mon_cap_from_keyring(keyring):
+    '''
+    Extract MON cap for keyring and return it.
+    '''
+    moncap = None
+    for line in keyring.split('\n'):
+        if 'caps mon' in line:
+            moncap = line[line.find(' = "') + 4 : -1]
+            break
+    else:
+        raise RuntimeError('mon cap not found in keyring below -\n{keyring}')
+    return moncap
+
+
+def get_fsnames_from_moncap(moncap):
+    '''
+    Extract FS names from MON cap and return all of them.
+    '''
+    fsnames = []
+    while moncap.find('fsname=') != -1:
+        fsname_first_char = moncap.index('fsname=') + len('fsname=')
+
+        if ',' in moncap:
+            last = moncap.index(',')
+            fsname = moncap[fsname_first_char : last]
+            moncap = moncap.replace(moncap[0 : last+1], '')
+        else:
+            fsname = moncap[fsname_first_char : ]
+            moncap = moncap.replace(moncap[0 : ], '')
+
+        fsnames.append(fsname)
+
+    return fsnames
+
+
 def assert_equal(first, second):
     msg = f'Variables are not equal.\nfirst = {first}\nsecond = {second}'
     assert first == second, msg
@@ -203,34 +238,6 @@ class CapTester:
         #self.run_mon_cap_tests()
         self.run_mds_cap_tests(perm, mntpt=mntpt)
 
-    def _get_fsnames_from_moncap(self, moncap):
-        fsnames = []
-        while moncap.find('fsname=') != -1:
-            fsname_first_char = moncap.index('fsname=') + len('fsname=')
-
-            if ',' in moncap:
-                last = moncap.index(',')
-                fsname = moncap[fsname_first_char : last]
-                moncap = moncap.replace(moncap[0 : last+1], '')
-            else:
-                fsname = moncap[fsname_first_char : ]
-                moncap = moncap.replace(moncap[0 : ], '')
-
-            fsnames.append(fsname)
-
-        return fsnames
-
-    def _get_mon_cap_from_keyring(self, keyring):
-        moncap = None
-        for line in keyring.split('\n'):
-            if 'caps mon' in line:
-                moncap = line[line.find(' = "') + 4 : -1]
-                break
-        else:
-            raise RuntimeError('run_mon_cap_tests(): mon cap not found in '
-                               'keyring. keyring -\n' + keyring)
-        return moncap
-
     def run_mon_cap_tests(self, fs, client_id):
         """
         Check that MON cap is enforced for a client by searching for a Ceph
@@ -240,14 +247,14 @@ class CapTester:
         """
         get_cluster_cmd_op = fs.mon_manager.raw_cluster_cmd
         keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
-        moncap = self._get_mon_cap_from_keyring(keyring)
+        moncap = get_mon_cap_from_keyring(keyring)
         keyring_path = fs.admin_remote.mktemp(data=keyring)
 
         fsls = get_cluster_cmd_op(
             args=f'fs ls --id {client_id} -k {keyring_path}')
         log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
 
-        fsnames = self._get_fsnames_from_moncap(moncap)
+        fsnames = get_fsnames_from_moncap(moncap)
         if fsnames == []:
             log.info('no FS name is mentioned in moncap, client has '
                      'permission to list all files. moncap -\n{moncap}')