]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/cephfs: add more helper methods to caps_helper.py
authorRishabh Dave <ridave@redhat.com>
Mon, 13 Mar 2023 10:46:13 +0000 (16:16 +0530)
committerRishabh Dave <ridave@redhat.com>
Thu, 4 Apr 2024 11:39:42 +0000 (17:09 +0530)
Add methods that will accept read/write permissions, CephFS names and
CephFS mount point and in return will generate string form of MON, OSD
and MDS caps exactly as it is reported in Ceph keyrings.

Replace similar code in test_multifs_auth.py with calls to these helper
methods.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
(cherry picked from commit 969a93d0dcf1848623b0bd581b8e2e9a5a556839)

qa/tasks/cephfs/caps_helper.py
qa/tasks/cephfs/test_multifs_auth.py

index 1ead57b71565302b07b22499c5b29f3cbd33f17a..ac6133a9445c6ee75fa2d5fa54e5ef537871a8a3 100644 (file)
@@ -12,6 +12,107 @@ from teuthology.orchestra.run import Raw
 log = getLogger(__name__)
 
 
+def gen_mon_cap_str(caps):
+    """
+    Accepts a tuple of tuples, where the inner tuple contains read/write
+    permissions and CephFS name.
+
+    caps = ((perm1, fsname1), (perm2, fsname2))
+    """
+    def _unpack_tuple(c):
+        if len(c) == 1:
+            perm, fsname = c[0], None
+        elif len(c) == 2:
+            perm, fsname = c
+        else:
+            raise RuntimeError('caps tuple received isn\'t right')
+        return perm, fsname
+
+    def _gen_mon_cap_str(c):
+        perm, fsname = _unpack_tuple(c)
+        mon_cap = f'allow {perm}'
+        if fsname:
+            mon_cap += f' fsname={fsname}'
+        return mon_cap
+
+    if len(caps) == 1:
+        return _gen_mon_cap_str(caps[0])
+
+    mon_cap = ''
+    for i, c in enumerate(caps):
+        mon_cap += _gen_mon_cap_str(c)
+        if i != len(caps) - 1:
+            mon_cap += ', '
+
+    return mon_cap
+
+def gen_osd_cap_str(caps):
+    """
+    Accepts a tuple of tuples, where the inner tuple contains read/write
+    permissions and CephFS name.
+
+    caps = ((perm1, fsname1), (perm2, fsname2))
+    """
+    def _gen_osd_cap_str(c):
+        perm, fsname = c
+        osd_cap = f'allow {perm} tag cephfs'
+        if fsname:
+            osd_cap += f' data={fsname}'
+        return osd_cap
+
+    if len(caps) == 1:
+        return _gen_osd_cap_str(caps[0])
+
+    osd_cap = ''
+    for i, c in enumerate(caps):
+        osd_cap += _gen_osd_cap_str(c)
+        if i != len(caps) - 1:
+            osd_cap += ', '
+
+    return osd_cap
+
+def gen_mds_cap_str(caps):
+    """
+    Accepts a tuple of tuples where inner the tuple containts read/write
+    permissions, Ceph FS name and CephFS mountpoint.
+
+    caps = ((perm1, fsname1, cephfs_mntpt1), (perm2, fsname2, cephfs_mntpt2))
+    """
+    def _unpack_tuple(c):
+        if len(c) == 2:
+            perm, fsname, cephfs_mntpt = c[0], c[1], '/'
+        elif len(c) == 3:
+            perm, fsname, cephfs_mntpt = c
+        elif len(c) < 2:
+            raise RuntimeError('received items are too less in caps')
+        else: # len(c) > 3
+            raise RuntimeError('received items are too many in caps')
+
+        return perm, fsname, cephfs_mntpt
+
+    def _gen_mds_cap_str(c):
+        perm, fsname, cephfs_mntpt = _unpack_tuple(c)
+        mds_cap = f'allow {perm}'
+        if fsname:
+            mds_cap += f' fsname={fsname}'
+        if cephfs_mntpt != '/':
+            if cephfs_mntpt[0] == '/':
+                cephfs_mntpt = cephfs_mntpt[1:]
+            mds_cap += f' path={cephfs_mntpt}'
+        return mds_cap
+
+    if len(caps) == 1:
+        return _gen_mds_cap_str(caps[0])
+
+    mds_cap = ''
+    for i, c in enumerate(caps):
+        mds_cap +=  _gen_mds_cap_str(c)
+        if i != len(caps) - 1:
+            mds_cap += ', '
+
+    return mds_cap
+
+
 class CapTester(CephFSTestCase):
     """
     Test that MON and MDS caps are enforced.
index 592a84164532e4dd976b3b6cd748a2f773a391e0..d5920b1c996bdc6ab66094c80708f8f8ce752a14 100644 (file)
@@ -4,7 +4,8 @@ Test for Ceph clusters with multiple FSs.
 import logging
 
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.caps_helper import CapTester
+from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
+                                      gen_osd_cap_str, gen_mds_cap_str)
 
 from teuthology.exceptions import CommandFailedError
 
@@ -41,14 +42,13 @@ class TestMultiFS(CephFSTestCase):
 class TestMONCaps(TestMultiFS):
 
     def test_moncap_with_one_fs_names(self):
-        moncap = f'allow r fsname={self.fs1.name}'
+        moncap = gen_mon_cap_str((('r', self.fs1.name),))
         self.create_client(self.client_id, moncap)
 
         self.captester.run_mon_cap_tests(self.fs1, self.client_id)
 
     def test_moncap_with_multiple_fs_names(self):
-        moncap = (f'allow r fsname={self.fs1.name}, '
-                  f'allow r fsname={self.fs2.name}')
+        moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
         self.create_client(self.client_id, moncap)
 
         self.captester.run_mon_cap_tests(self.fs1, self.client_id)
@@ -155,31 +155,16 @@ class TestMDSCaps(TestMultiFS):
 
         super(type(self), self).tearDown()
 
-    def generate_caps(self, perm, fsname, cephfs_mntpt):
+    def _create_client(self, perm, fsname=False, cephfs_mntpt='/'):
         moncap = 'allow r'
-        osdcap = (f'allow {perm} tag cephfs data={self.fs1.name}, '
-                  f'allow {perm} tag cephfs data={self.fs2.name}')
-
+        osdcap = gen_osd_cap_str(((perm, self.fs1.name),
+                                  (perm, self.fs2.name)))
         if fsname:
-            if cephfs_mntpt == '/':
-                mdscap = (f'allow {perm} fsname={self.fs1.name}, '
-                          f'allow {perm} fsname={self.fs2.name}')
-            else:
-                mdscap = (f'allow {perm} fsname={self.fs1.name} '
-                          f'path=/{cephfs_mntpt}, '
-                          f'allow {perm} fsname={self.fs2.name} '
-                          f'path=/{cephfs_mntpt}')
+            mdscap = gen_mds_cap_str(((perm, self.fs1.name, cephfs_mntpt),
+                                      (perm, self.fs2.name, cephfs_mntpt)))
         else:
-            if cephfs_mntpt == '/':
-                mdscap = f'allow {perm}'
-            else:
-                mdscap = f'allow {perm} path=/{cephfs_mntpt}'
-
-        return moncap, osdcap, mdscap
-
-    def _create_client(self, perm, fsname=False, cephfs_mntpt='/'):
-        moncap, osdcap, mdscap = self.generate_caps(perm, fsname,
-                                                    cephfs_mntpt)
+            mdscap = gen_mds_cap_str(((perm, None, cephfs_mntpt),
+                                      (perm, None, cephfs_mntpt)))
 
         keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
         keyring_paths = []
@@ -236,10 +221,11 @@ class TestClientsWithoutAuth(TestMultiFS):
 
     def test_mount_mon_and_osd_caps_present_mds_caps_absent(self):
         # setup part...
-        moncap = f'allow rw fsname={self.fs1.name}, allow rw fsname={self.fs2.name}'
-        mdscap = f'allow rw fsname={self.fs1.name}'
-        osdcap = (f'allow rw tag cephfs data={self.fs1.name}, allow rw tag '
-                  f'cephfs data={self.fs2.name}')
+        moncap = gen_mon_cap_str((('rw', self.fs1.name),
+                                  ('rw', self.fs2.name)))
+        mdscap = gen_mds_cap_str((('rw', self.fs1.name),))
+        osdcap = gen_osd_cap_str((('rw', self.fs1.name,),
+                                  ('rw', self.fs2.name)))
         keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
         keyring_path = self.mount_a.client_remote.mktemp(data=keyring)