from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.filesystem import FileLayout, FSMissing
from tasks.cephfs.fuse_mount import FuseMount
-from tasks.cephfs.caps_helper import CapTester
+from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
+ gen_osd_cap_str, gen_mds_cap_str)
log = logging.getLogger(__name__)
cephfs_mntpt=path)
+class TestFsAuthorizeUpdate(CephFSTestCase):
+ client_id = 'testuser'
+ client_name = f'client.{client_id}'
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 3
+
+ ######################################
+ # cases where "fs authorize" adds caps
+ ######################################
+
+ def test_add_caps_for_another_FS(self):
+ """
+ Test that "ceph fs authorize" adds caps for a second FS to a keyring
+ that already had caps for first FS.
+ """
+ self.fs1 = self.fs
+ self.fs2 = self.mds_cluster.newfs('testcephfs2')
+ self.mount_b.remount(cephfs_name=self.fs2.name)
+ self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+ self.fs1.authorize(self.client_id, ('/', 'rw'))
+
+ ########### testing begins here.
+ TEST_PERM = 'rw'
+ self.fs2.authorize(self.client_id, ('/', TEST_PERM,))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs1.name),
+ ('r', self.fs2.name)))
+ osdcap = gen_osd_cap_str(((TEST_PERM, self.fs1.name),
+ (TEST_PERM, self.fs2.name)))
+ mdscap = gen_mds_cap_str(((TEST_PERM, self.fs1.name),
+ (TEST_PERM, self.fs2.name)))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs1.name, keyring)
+ self._remount(self.mount_b, self.fs2.name, keyring)
+ self.captesters[0].run_cap_tests(self.fs1, self.client_id, TEST_PERM)
+ self.captesters[0].run_cap_tests(self.fs1, self.client_id, TEST_PERM)
+
+ def test_add_caps_for_same_FS_diff_path(self):
+ '''
+ Test that "ceph fs authorze" grants a new cap when it is run for a
+ second time for same client and same FS but with a differnt paths.
+
+ In other words, running following two commands -
+ $ ceph fs authorize a client.x1 /dir1 rw
+ $ ceph fs authorize a client.x1 /dir2 rw
+
+ Should produce following MDS caps -
+ caps mon = "allow r fsname=a"
+ caps osd = "allow rw tag cephfs data=a"
+ caps mds = "allow rw fsname=a path=dir1, allow rw fsname=a path=dir2"
+ '''
+ PERM, PATH1, PATH2 = 'rw', 'dir1', 'dir2'
+ self.mount_a.run_shell('mkdir dir1 dir2')
+ self.captester_a = CapTester(self.mount_a, PATH1)
+ self.captester_b = CapTester(self.mount_b, PATH2)
+ self.fs.authorize(self.client_id, ('/', PERM))
+
+ ########## testing begins here.
+ self.fs.authorize(self.client_id, (PATH1, PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs.name),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH1),))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring, PATH1)
+ self.captester_a.run_cap_tests(self.fs, self.client_id, PERM, PATH1)
+
+ ########### testing once more
+ self.fs.authorize(self.client_id, (PATH2, PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs.name),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH1),
+ (PERM, self.fs.name, PATH2)))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_b, self.fs.name, keyring, PATH2)
+ self.captester_b.run_cap_tests(self.fs, self.client_id, PERM, PATH2)
+
+ def test_add_caps_for_client_with_no_caps(self):
+ """
+ Test that "ceph fs authorize" adds caps to the keyring when the
+ entity already has a keyring but it contains no caps.
+ """
+ TEST_PERM = 'rw'
+ self.captester = CapTester(self.mount_a)
+ self.run_ceph_cmd(f'auth add {self.client_name}')
+
+ ######## testing begins here.
+ self.fs.authorize(self.client_id, ('/', TEST_PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs.name,),))
+ osdcap = gen_osd_cap_str(((TEST_PERM, self.fs.name),))
+ mdscap = gen_mds_cap_str(((TEST_PERM, self.fs.name),))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring)
+ self.captester.run_cap_tests(self.fs, self.client_id, TEST_PERM)
+
+
+ #########################################
+ # cases where "fs authorize" changes caps
+ #########################################
+
+ def test_change_perms(self):
+ """
+ Test that "ceph fs authorize" updates the caps for a FS when the caps
+ for that FS were already present in that keyring.
+ """
+ OLD_PERM = 'rw'
+ NEW_PERM = 'r'
+ self.captester = CapTester(self.mount_a)
+
+ self.fs.authorize(self.client_id, ('/', OLD_PERM))
+ ########### testing begins here
+ self.fs.authorize(self.client_id, ('/', NEW_PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs.name,),))
+ osdcap = gen_osd_cap_str(((NEW_PERM, self.fs.name),))
+ mdscap = gen_mds_cap_str(((NEW_PERM, self.fs.name),))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring)
+ self.captester.run_cap_tests(self.fs, self.client_id, NEW_PERM)
+
+ ################################################
+ # Cases where fs authorize maintains idempotency
+ ################################################
+
+ def test_idem_caps_passed_same_as_current_caps(self):
+ """
+ Test that "ceph fs authorize" exits with the keyring on stdout and the
+ expected error message on stderr when caps supplied to the subcommand
+ are already present in the entity's keyring.
+ """
+ PERM = 'rw'
+ self.captester = CapTester(self.mount_a)
+ self.fs.authorize(self.client_id, ('/', PERM))
+ keyring1 = self.fs.mon_manager.get_keyring(self.client_id)
+
+ ############# testing begins here.
+ proc = self.fs.mon_manager.run_cluster_cmd(
+ args=f'fs authorize {self.fs.name} {self.client_name} / {PERM}',
+ stdout=StringIO(), stderr=StringIO())
+ errmsg = proc.stderr.getvalue()
+ self.assertIn(f'no update for caps of {self.client_name}', errmsg)
+
+ keyring2 = self.fs.mon_manager.get_keyring(self.client_id)
+ self.assertIn(keyring1, keyring2)
+
+ self._remount(self.mount_a, self.fs.name, keyring2)
+ self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+ def test_idem_unaffected_root_squash(self):
+ """
+ Test that "root_squash" is not deleted from MDS caps when user runs
+ "fs authorize" a second time passing same FS name and path but not
+ with "root_squash"
+ In other words,
+ $ ceph fs authorize a client.x / rw root_squash
+ [client.x]
+ key = AQCvsyVkiDJZBBAAn1ClsPKvTfrCkUs01Eh8og==
+ $ ceph auth get client.x
+ [client.x]
+ key = AQD61CVkcA1QCRAAd0XYqPbHvcc+lpUAuc6Vcw==
+ caps mds = "allow rw fsname=a root_squash"
+ caps mon = "allow r fsname=a"
+ caps osd = "allow rw tag cephfs data=a"
+ $ ceph fs authorize a client.x / rw
+ $ ceph auth get client.x
+ [client.x]
+ key = AQD61CVkcA1QCRAAd0XYqPbHvcc+lpUAuc6Vcw==
+ caps mds = "allow rw fsname=a root_squash"
+ caps mon = "allow r fsname=a"
+ caps osd = "allow rw tag cephfs data=a"
+ """
+ PERM, PATH = 'rw', 'dir1'
+ self.mount_a.run_shell(f'mkdir {PATH}')
+ self.captester = CapTester(self.mount_a, PATH)
+ self.fs.authorize(self.client_id, (PATH, PERM, 'root_squash'))
+
+ ############# testing begins here.
+ self.fs.authorize(self.client_id, (PATH, PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ moncap = gen_mon_cap_str((('r', self.fs.name,),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = gen_mds_cap_str(((PERM, self.fs.name, PATH),))
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring, PATH)
+ self.captester.run_cap_tests(self.fs, self.client_id, PERM, PATH)
+
+ def _get_uid(self):
+ return self.mount_a.client_remote.run(
+ args='id -u', stdout=StringIO()).stdout.getvalue().strip()
+
+ def _get_gid(self):
+ return self.mount_a.client_remote.run(
+ args='id -g', stdout=StringIO()).stdout.getvalue().strip()
+
+ def test_idem_unaffected_uid(self):
+ '''
+ 1. Create a client with caps that has FS name and UID in it.
+ 2. Run "ceph fs authorize" command for that client with same FS name.
+ 3. Test that UID (as well as any other part of cap) is unaffected,
+ i.e. same as before.
+ '''
+ PERM, UID = 'rw', self._get_uid()
+ self.captester = CapTester(self.mount_a)
+ moncap = gen_mon_cap_str((('r', self.fs.name,),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = f'allow rw uid={UID}'
+ self.fs.mon_manager.run_cluster_cmd(
+ args=(f'auth add {self.client_name} mon "{moncap}" '
+ f'osd "{osdcap}" mds "{mdscap}"'))
+
+ ############# testing begins here.
+ self.fs.authorize(self.client_id, ('/', PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring)
+ self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+ def test_idem_unaffected_gids(self):
+ '''
+ 1. Create a client with caps that has FS name, UID and GID in it.
+ 2. Run "ceph fs authorize" command for that client with same FS name.
+ 3. Test that GID (as well as any other part of cap) is unaffected,
+ i.e. same as before.
+ '''
+ PERM, UID, GID = 'rw', self._get_uid(), self._get_gid()
+ self.captester = CapTester(self.mount_a)
+ moncap = gen_mon_cap_str((('r', self.fs.name),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = f'allow rw uid={UID} gids={GID}'
+ self.fs.mon_manager.run_cluster_cmd(
+ args=(f'auth add {self.client_name} mon "{moncap}" '
+ f'osd "{osdcap}" mds "{mdscap}"'))
+
+ ############# testing begins here.
+ self.fs.authorize(self.client_id, ('/', PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring)
+ self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+ def test_idem_unaffected_gids_multiple(self):
+ '''
+ 1. Create client with caps with FS name, UID & multiple GIDs in it.
+ 2. Run "ceph fs authorize" command for that client with same FS name.
+ 3. Test that multiple GIDs (as well as any other part of cap) is
+ unaffected, i.e. same as before.
+ '''
+ PERM, UID = 'rw', self._get_uid()
+ gids = [int(self._get_gid()), 1001, 1002]
+ # Apparently code for MDS caps always arranges GID in ascending
+ # order. Let's sort so that latter cap string matches with keyring.
+ gids.sort()
+ gids = f'{gids[0]},{gids[1]},{gids[2]}'
+ self.captester = CapTester(self.mount_a)
+ moncap = gen_mon_cap_str((('r', self.fs.name),))
+ osdcap = gen_osd_cap_str(((PERM, self.fs.name),))
+ mdscap = f'allow rw uid={UID} gids={gids}'
+ self.fs.mon_manager.run_cluster_cmd(
+ args=(f'auth add {self.client_name} mon "{moncap}" '
+ f'osd "{osdcap}" mds "{mdscap}"'))
+
+ ############# testing begins here.
+ self.fs.authorize(self.client_id, ('/', PERM))
+ keyring = self.fs.mon_manager.get_keyring(self.client_id)
+ for cap in (moncap, osdcap, mdscap):
+ self.assertIn(cap, keyring)
+ self._remount(self.mount_a, self.fs.name, keyring)
+ self.captester.run_cap_tests(self.fs, self.client_id, PERM)
+
+ def _remount(self, mount_x, fsname, keyring, cephfs_mntpt='/'):
+ if len(cephfs_mntpt) > 1 and cephfs_mntpt[0] != '/':
+ cephfs_mntpt = '/' + cephfs_mntpt
+ keyring_path = mount_x.client_remote.mktemp(data=keyring)
+ mount_x.remount(client_id=self.client_id,
+ client_keyring_path=keyring_path,
+ cephfs_mntpt=cephfs_mntpt, cephfs_name=fsname)
+
+
class TestAdminCommandIdempotency(CephFSTestCase):
"""
Tests for administration command idempotency.