class TestFsAuthorize(CephFSTestCase):
client_id = 'testuser'
client_name = 'client.' + client_id
+ CLIENTS_REQUIRED = 2
+ MDSS_REQUIRED = 3
def test_single_path_r(self):
PERM = 'r'
self._remount_and_run_tests(FS_AUTH_CAPS, keyring)
+ def test_when_MDS_caps_needs_update_but_others_dont(self):
+ '''
+ Test that the command "ceph fs authorize" successfully updates MDS
+ caps even when MON and OSD caps don't need an update.
+
+ Tests: https://tracker.ceph.com/issues/64182
+
+ In this test we run -
+
+ ceph fs authorize cephfs1 client.x / rw
+ ceph fs authorize cephfs2 client.x / rw
+ ceph fs authorize cephfs2 client.x /dir1 rw
+
+ The first command will create the keyring by adding the MDS cap for
+ root path & MON and OSD caps with name of the FS name (say "cephfs1").
+ Second command will update the all of client's caps -- MON, OSD and
+ MDS caps to add caps for 2nd CephFS. The third command doesn't need
+ to add MON and OSD caps since cap for "cephfs2" has been granted
+ already. Thus, third command only need to update the MDS cap, thus
+ testing the bug under consideration here.
+ '''
+ PERM = 'rw'
+ DIR = 'dir1'
+
+ self.fs2 = self.mds_cluster.newfs(name='cephfs2', create=True)
+ self.mount_b.remount(cephfs_name=self.fs2.name)
+ self.mount_b.run_shell(f'mkdir {DIR}')
+ self.captesters = (CapTester(self.mount_a, '/'),
+ CapTester(self.mount_b, '/'),
+ CapTester(self.mount_b, f'/{DIR}'))
+
+ FS_AUTH_CAPS = (('/', PERM), ('/', PERM), (DIR, PERM))
+ keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS[0])
+ keyring = self.fs2.authorize(self.client_id, FS_AUTH_CAPS[1])
+
+ # if the following block of code pass it implies that
+ # https://tracker.ceph.com/issues/64182 has been fixed
+ keyring = self.fs2.authorize(self.client_id, FS_AUTH_CAPS[2])
+ mdscaps = ('caps mds = "'
+ f'allow {PERM} fsname={self.fs.name}, '
+ f'allow {PERM} fsname={self.fs2.name}, '
+ f'allow {PERM} fsname={self.fs2.name} path={DIR}')
+ self.assertIn(mdscaps, keyring)
+
+ self._remount_and_run_tests_for_cap(
+ FS_AUTH_CAPS[0], self.captesters[0], self.fs.name, self.mount_a,
+ keyring)
+ self._remount_and_run_tests_for_cap(
+ FS_AUTH_CAPS[1], self.captesters[1], self.fs2.name, self.mount_b,
+ keyring)
+ self._remount_and_run_tests_for_cap(
+ FS_AUTH_CAPS[2], self.captesters[2], self.fs2.name, self.mount_b,
+ keyring)
+
def _remount_and_run_tests(self, fs_auth_caps, keyring):
+ '''
+ This method is specifically designed to meet needs of most of the
+ test case in this class. Following are assumptions made here -
+
+ 1. CephFS to be mounted is self.fs
+ 2. Mount object to be used is self.mount_a
+ 3. Keyring file will be created on the host specified in self.mount_a.
+ 4. CephFS dir that will serve as root is PATH component of particular
+ cap from FS_AUTH_CAPS.
+ '''
for i, c in enumerate(fs_auth_caps):
self.assertIn(i, (0, 1))
PATH = c[0]
client_keyring_path=keyring_path,
cephfs_mntpt=path)
+ def _remount_and_run_tests_for_cap(self, cap, captester, fsname, mount,
+ keyring):
+ PATH = cap[0]
+ PERM = cap[1]
+
+ cephfs_mntpt = os_path_join('/', PATH)
+ keyring_path = mount.client_remote.mktemp(data=keyring)
+ mount.remount(client_id=self.client_id, cephfs_mntpt=cephfs_mntpt,
+ cephfs_name=fsname, client_keyring_path=keyring_path)
+
+ captester.run_cap_tests(self.fs, self.client_id, PERM, PATH)
+
class TestFsAuthorizeUpdate(CephFSTestCase):
client_id = 'testuser'
map<string, string>& newcaps, ostream& out)
{
caps_update is_caps_update_reqd = CAPS_UPDATE_NOT_REQD;
+ caps_update is_caps_update_reqd_mon = CAPS_UPDATE_NOT_REQD;
+ caps_update is_caps_update_reqd_osd = CAPS_UPDATE_NOT_REQD;
+ caps_update is_caps_update_reqd_mds = CAPS_UPDATE_NOT_REQD;
if (e_auth.caps.empty()) {
return CAPS_UPDATE_REQD;
}
if (cap_entity == "mon") {
- is_caps_update_reqd = _merge_caps<MonCap>(cap_entity, new_cap_str,
+ is_caps_update_reqd_mon = _merge_caps<MonCap>(cap_entity, new_cap_str,
cur_cap_str, newcaps, out);
} else if (cap_entity == "osd") {
- is_caps_update_reqd = _merge_caps<OSDCap>(cap_entity, new_cap_str,
+ is_caps_update_reqd_osd = _merge_caps<OSDCap>(cap_entity, new_cap_str,
cur_cap_str, newcaps, out);
} else if (cap_entity == "mds") {
- is_caps_update_reqd = _merge_caps<MDSAuthCaps>(cap_entity, new_cap_str,
- cur_cap_str, newcaps, out);
- }
+ is_caps_update_reqd_mds = _merge_caps<MDSAuthCaps>(cap_entity,
+ new_cap_str, cur_cap_str, newcaps, out);
+ }
+ }
+
+ // if any one of MON, OSD or MDS caps failed to parse, it is pointless
+ // to run the update procedure.
+ if (is_caps_update_reqd_mon == CAPS_PARSING_ERR ||
+ is_caps_update_reqd_osd == CAPS_PARSING_ERR ||
+ is_caps_update_reqd_mds == CAPS_PARSING_ERR) {
+ is_caps_update_reqd = CAPS_PARSING_ERR;
+ // even if any one of MON, OSD or MDS caps needs an update, the update
+ // procedure needs to be executed.
+ } else if (is_caps_update_reqd_mon == CAPS_UPDATE_REQD ||
+ is_caps_update_reqd_osd == CAPS_UPDATE_REQD ||
+ is_caps_update_reqd_mds == CAPS_UPDATE_REQD) {
+ is_caps_update_reqd = CAPS_UPDATE_REQD;
}
return is_caps_update_reqd;