]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/cephfs: add tests for updation of caps via fs authorize
authorRishabh Dave <ridave@redhat.com>
Wed, 9 Jun 2021 08:22:13 +0000 (13:52 +0530)
committerRishabh Dave <ridave@redhat.com>
Sun, 30 Jul 2023 17:40:28 +0000 (23:10 +0530)
Add tests for the case where "fs authorize" subcommand upgrades the caps
for the client.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
qa/tasks/cephfs/test_admin.py
qa/tasks/cephfs/test_multifs_auth.py

index 76614fff7f0715dd65720b637e6b7ccaf5b4449b..42339a0497804a4ee266beb17dd2e88ec3848f01 100644 (file)
@@ -11,7 +11,8 @@ from teuthology.exceptions import CommandFailedError
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from tasks.cephfs.filesystem import FileLayout, FSMissing
 from tasks.cephfs.fuse_mount import FuseMount
-from tasks.cephfs.caps_helper import CapTester
+from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
+                                      gen_osd_cap_str, gen_mds_cap_str)
 
 log = logging.getLogger(__name__)
 
@@ -1300,6 +1301,294 @@ class TestFsAuthorize(CephFSTestCase):
                              cephfs_mntpt=path)
 
 
+class TestFsAuthorizeUpdate(CephFSTestCase):
+    client_id = 'testuser'
+    client_name = f'client.{client_id}'
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 3
+
+    ######################################
+    # cases where "fs authorize" adds caps
+    ######################################
+
+    def test_add_caps_for_another_FS(self):
+        """
+        Test that "ceph fs authorize" adds caps for a second FS to a keyring
+        that already had caps for first FS.
+        """
+        self.fs1 = self.fs
+        self.fs2 = self.mds_cluster.newfs('testcephfs2')
+        self.mount_b.remount(cephfs_name=self.fs2.name)
+        self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+        self.fs1.authorize(self.client_id, ('/', 'rw'))
+
+        ########### testing begins here.
+        TEST_PERM = 'rw'
+        self.fs2.authorize(self.client_id, ('/', TEST_PERM,))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs1.name),
+                                  ('r', self.fs2.name)))
+        osdcap = gen_osd_cap_str(((TEST_PERM, self.fs1.name),
+                                  (TEST_PERM, self.fs2.name)))
+        mdscap = gen_mds_cap_str(((TEST_PERM, self.fs1.name),
+                                  (TEST_PERM, self.fs2.name)))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs1.name, keyring)
+        self._remount(self.mount_b, self.fs2.name, keyring)
+        self.captesters[0].run_cap_tests(self.fs1, self.client_id, TEST_PERM)
+        self.captesters[0].run_cap_tests(self.fs1, self.client_id, TEST_PERM)
+
+    def test_add_caps_for_same_FS_diff_path(self):
+        '''
+        Test that "ceph fs authorze" grants a new cap when it is run for a
+        second time for same client and same FS but with a differnt paths.
+
+        In other words, running following two commands -
+        $ ceph fs authorize a client.x1 /dir1 rw
+        $ ceph fs authorize a client.x1 /dir2 rw
+
+        Should produce following MDS caps -
+        caps mon = "allow r fsname=a"
+        caps osd = "allow rw tag cephfs data=a"
+        caps mds = "allow rw fsname=a path=dir1, allow rw fsname=a path=dir2"
+        '''
+        PERM, PATH1, PATH2 = 'rw', 'dir1', 'dir2'
+        self.mount_a.run_shell('mkdir dir1 dir2')
+        self.captester_a = CapTester(self.mount_a, PATH1)
+        self.captester_b = CapTester(self.mount_b, PATH2)
+        self.fs.authorize(self.client_id, ('/', PERM))
+
+        ########## testing begins here.
+        self.fs.authorize(self.client_id, (PATH1, PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs.name),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH1),))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring, PATH1)
+        self.captester_a.run_cap_tests(self.fs, self.client_id, PERM, PATH1)
+
+        ########### testing once more
+        self.fs.authorize(self.client_id, (PATH2, PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs.name),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH1),
+                                  (PERM, self.fs.name, PATH2)))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_b, self.fs.name, keyring, PATH2)
+        self.captester_b.run_cap_tests(self.fs, self.client_id, PERM, PATH2)
+
+    def test_add_caps_for_client_with_no_caps(self):
+        """
+        Test that "ceph fs authorize" adds caps to the keyring when the
+        entity already has a keyring but it contains no caps.
+        """
+        TEST_PERM = 'rw'
+        self.captester = CapTester(self.mount_a)
+        self.run_ceph_cmd(f'auth add {self.client_name}')
+
+        ######## testing begins here.
+        self.fs.authorize(self.client_id, ('/', TEST_PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs.name,),))
+        osdcap = gen_osd_cap_str(((TEST_PERM, self.fs.name),))
+        mdscap = gen_mds_cap_str(((TEST_PERM, self.fs.name),))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring)
+        self.captester.run_cap_tests(self.fs, self.client_id, TEST_PERM)
+
+
+    #########################################
+    # cases where "fs authorize" changes caps
+    #########################################
+
+    def test_change_perms(self):
+        """
+        Test that "ceph fs authorize" updates the caps for a FS when the caps
+        for that FS were already present in that keyring.
+        """
+        OLD_PERM = 'rw'
+        NEW_PERM = 'r'
+        self.captester = CapTester(self.mount_a)
+
+        self.fs.authorize(self.client_id, ('/', OLD_PERM))
+        ########### testing begins here
+        self.fs.authorize(self.client_id, ('/', NEW_PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs.name,),))
+        osdcap = gen_osd_cap_str(((NEW_PERM, self.fs.name),))
+        mdscap = gen_mds_cap_str(((NEW_PERM, self.fs.name),))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring)
+        self.captester.run_cap_tests(self.fs, self.client_id, NEW_PERM)
+
+    ################################################
+    # Cases where fs authorize maintains idempotency
+    ################################################
+
+    def test_idem_caps_passed_same_as_current_caps(self):
+        """
+        Test that "ceph fs authorize" exits with the keyring on stdout and the
+        expected error message on stderr when caps supplied to the subcommand
+        are already present in the entity's keyring.
+        """
+        PERM = 'rw'
+        self.captester = CapTester(self.mount_a)
+        self.fs.authorize(self.client_id, ('/', PERM))
+        keyring1 = self.fs.mon_manager.get_keyring(self.client_id)
+
+        ############# testing begins here.
+        proc = self.fs.mon_manager.run_cluster_cmd(
+            args=f'fs authorize {self.fs.name} {self.client_name} / {PERM}',
+            stdout=StringIO(), stderr=StringIO())
+        errmsg = proc.stderr.getvalue()
+        self.assertIn(f'no update for caps of {self.client_name}', errmsg)
+
+        keyring2 = self.fs.mon_manager.get_keyring(self.client_id)
+        self.assertIn(keyring1, keyring2)
+
+        self._remount(self.mount_a, self.fs.name, keyring2)
+        self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+    def test_idem_unaffected_root_squash(self):
+        """
+        Test that "root_squash" is not deleted from MDS caps when user runs
+        "fs authorize" a second time passing same FS name and path but not
+        with "root_squash"
+        In other words,
+        $ ceph fs authorize a client.x / rw root_squash
+        [client.x]
+        key = AQCvsyVkiDJZBBAAn1ClsPKvTfrCkUs01Eh8og==
+        $ ceph auth get client.x
+        [client.x]
+                key = AQD61CVkcA1QCRAAd0XYqPbHvcc+lpUAuc6Vcw==
+                caps mds = "allow rw fsname=a root_squash"
+                caps mon = "allow r fsname=a"
+                caps osd = "allow rw tag cephfs data=a"
+        $ ceph fs authorize a client.x / rw
+        $ ceph auth get client.x
+        [client.x]
+                key = AQD61CVkcA1QCRAAd0XYqPbHvcc+lpUAuc6Vcw==
+                caps mds = "allow rw fsname=a root_squash"
+                caps mon = "allow r fsname=a"
+                caps osd = "allow rw tag cephfs data=a"
+        """
+        PERM, PATH = 'rw', 'dir1'
+        self.mount_a.run_shell(f'mkdir {PATH}')
+        self.captester = CapTester(self.mount_a, PATH)
+        self.fs.authorize(self.client_id, (PATH, PERM, 'root_squash'))
+
+        ############# testing begins here.
+        self.fs.authorize(self.client_id, (PATH, PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        moncap = gen_mon_cap_str((('r', self.fs.name,),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH),))
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring, PATH)
+        self.captester.run_cap_tests(self.fs, self.client_id, PERM, PATH)
+
+    def _get_uid(self):
+        return self.mount_a.client_remote.run(
+            args='id -u', stdout=StringIO()).stdout.getvalue().strip()
+
+    def _get_gid(self):
+        return self.mount_a.client_remote.run(
+            args='id -g', stdout=StringIO()).stdout.getvalue().strip()
+
+    def test_idem_unaffected_uid(self):
+        '''
+        1. Create a client with caps that has FS name and UID in it.
+        2. Run "ceph fs authorize" command for that client with same FS name.
+        3. Test that UID (as well as any other part of cap) is unaffected,
+           i.e. same as before.
+        '''
+        PERM, UID = 'rw', self._get_uid()
+        self.captester = CapTester(self.mount_a)
+        moncap = gen_mon_cap_str((('r', self.fs.name,),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = f'allow rw uid={UID}'
+        self.fs.mon_manager.run_cluster_cmd(
+            args=(f'auth add {self.client_name} mon "{moncap}" '
+                  f'osd "{osdcap}" mds "{mdscap}"'))
+
+        ############# testing begins here.
+        self.fs.authorize(self.client_id, ('/', PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring)
+        self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+    def test_idem_unaffected_gids(self):
+        '''
+        1. Create a client with caps that has FS name, UID and GID in it.
+        2. Run "ceph fs authorize" command for that client with same FS name.
+        3. Test that GID (as well as any other part of cap) is unaffected,
+           i.e. same as before.
+        '''
+        PERM, UID, GID = 'rw', self._get_uid(), self._get_gid()
+        self.captester = CapTester(self.mount_a)
+        moncap = gen_mon_cap_str((('r', self.fs.name),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = f'allow rw uid={UID} gids={GID}'
+        self.fs.mon_manager.run_cluster_cmd(
+            args=(f'auth add {self.client_name} mon "{moncap}" '
+                  f'osd "{osdcap}" mds "{mdscap}"'))
+
+        ############# testing begins here.
+        self.fs.authorize(self.client_id, ('/', PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring)
+        self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+    def test_idem_unaffected_gids_multiple(self):
+        '''
+        1. Create client with caps with FS name, UID & multiple GIDs in it.
+        2. Run "ceph fs authorize" command for that client with same FS name.
+        3. Test that multiple GIDs (as well as any other part of cap) is
+           unaffected, i.e. same as before.
+        '''
+        PERM, UID = 'rw', self._get_uid()
+        gids = [int(self._get_gid()), 1001, 1002]
+        # Apparently code for MDS caps always arranges GID in ascending
+        # order. Let's sort so that latter cap string matches with keyring.
+        gids.sort()
+        gids = f'{gids[0]},{gids[1]},{gids[2]}'
+        self.captester = CapTester(self.mount_a)
+        moncap = gen_mon_cap_str((('r', self.fs.name),))
+        osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+        mdscap = f'allow rw uid={UID} gids={gids}'
+        self.fs.mon_manager.run_cluster_cmd(
+            args=(f'auth add {self.client_name} mon "{moncap}" '
+                  f'osd "{osdcap}" mds "{mdscap}"'))
+
+        ############# testing begins here.
+        self.fs.authorize(self.client_id, ('/', PERM))
+        keyring = self.fs.mon_manager.get_keyring(self.client_id)
+        for cap in (moncap, osdcap, mdscap):
+            self.assertIn(cap, keyring)
+        self._remount(self.mount_a, self.fs.name, keyring)
+        self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+    def _remount(self, mount_x, fsname, keyring, cephfs_mntpt='/'):
+        if len(cephfs_mntpt) > 1 and cephfs_mntpt[0] != '/':
+            cephfs_mntpt = '/' + cephfs_mntpt
+        keyring_path = mount_x.client_remote.mktemp(data=keyring)
+        mount_x.remount(client_id=self.client_id,
+                        client_keyring_path=keyring_path,
+                        cephfs_mntpt=cephfs_mntpt, cephfs_name=fsname)
+
+
 class TestAdminCommandIdempotency(CephFSTestCase):
     """
     Tests for administration command idempotency.
index acf434294d13cd6e635423ceab7849dff7a6078f..e40ccb79af458c5b9940b69319199991d89627d1 100644 (file)
@@ -96,9 +96,9 @@ class TestMDSCaps(TestMultiFS):
         self.captesters[1].run_mds_cap_tests(PERM)
 
     def test_rw_with_fsname_and_path_in_cap(self):
-        PERM, CEPHFS_MNTPT = 'rw', 'dir1'
-        self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
-        self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
+        PERM, CEPHFS_MNTPT = 'rw', '/dir1'
+        self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
+        self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
         self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
                            MdsCapTester(self.mount_b, CEPHFS_MNTPT))
 
@@ -110,9 +110,9 @@ class TestMDSCaps(TestMultiFS):
         self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
 
     def test_r_with_fsname_and_path_in_cap(self):
-        PERM, CEPHFS_MNTPT = 'r', 'dir1'
-        self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
-        self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
+        PERM, CEPHFS_MNTPT = 'r', '/dir1'
+        self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
+        self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
         self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
                            MdsCapTester(self.mount_b, CEPHFS_MNTPT))
 
@@ -126,9 +126,9 @@ class TestMDSCaps(TestMultiFS):
     # XXX: this tests the backward compatibility; "allow rw path=<dir1>" is
     # treated as "allow rw fsname=* path=<dir1>"
     def test_rw_with_no_fsname_and_path_in_cap(self):
-        PERM, CEPHFS_MNTPT = 'rw', 'dir1'
-        self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
-        self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
+        PERM, CEPHFS_MNTPT = 'rw', '/dir1'
+        self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
+        self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
         self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
                            MdsCapTester(self.mount_b, CEPHFS_MNTPT))
 
@@ -142,9 +142,9 @@ class TestMDSCaps(TestMultiFS):
     # XXX: this tests the backward compatibility; "allow r path=<dir1>" is
     # treated as "allow r fsname=* path=<dir1>"
     def test_r_with_no_fsname_and_path_in_cap(self):
-        PERM, CEPHFS_MNTPT = 'r', 'dir1'
-        self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
-        self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
+        PERM, CEPHFS_MNTPT = 'r', '/dir1'
+        self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
+        self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
         self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
                            MdsCapTester(self.mount_b, CEPHFS_MNTPT))
 
@@ -202,7 +202,8 @@ class TestMDSCaps(TestMultiFS):
     def remount_with_new_client(self, keyring, cephfs_mntpt='/'):
         log.info(f'keyring generated for testing is -\n{keyring}')
 
-        if isinstance(cephfs_mntpt, str) and cephfs_mntpt != '/' :
+        if (isinstance(cephfs_mntpt, str) and cephfs_mntpt != '/'  and
+            cephfs_mntpt[0] != '/'):
             cephfs_mntpt = '/' + cephfs_mntpt
 
         keyring_path = self.mount_a.client_remote.mktemp(data=keyring)