log = getLogger(__name__)
+def gen_mon_cap_str(caps):
+ """
+ Accepts a tuple of tuples, where the inner tuple contains read/write
+ permissions and CephFS name.
+
+ caps = ((perm1, fsname1), (perm2, fsname2))
+ """
+ def _unpack_tuple(c):
+ if len(c) == 1:
+ perm, fsname = c[0], None
+ elif len(c) == 2:
+ perm, fsname = c
+ else:
+ raise RuntimeError('caps tuple received isn\'t right')
+ return perm, fsname
+
+ def _gen_mon_cap_str(c):
+ perm, fsname = _unpack_tuple(c)
+ mon_cap = f'allow {perm}'
+ if fsname:
+ mon_cap += f' fsname={fsname}'
+ return mon_cap
+
+ if len(caps) == 1:
+ return _gen_mon_cap_str(caps[0])
+
+ mon_cap = ''
+ for i, c in enumerate(caps):
+ mon_cap += _gen_mon_cap_str(c)
+ if i != len(caps) - 1:
+ mon_cap += ', '
+
+ return mon_cap
+
+def gen_osd_cap_str(caps):
+ """
+ Accepts a tuple of tuples, where the inner tuple contains read/write
+ permissions and CephFS name.
+
+ caps = ((perm1, fsname1), (perm2, fsname2))
+ """
+ def _gen_osd_cap_str(c):
+ perm, fsname = c
+ osd_cap = f'allow {perm} tag cephfs'
+ if fsname:
+ osd_cap += f' data={fsname}'
+ return osd_cap
+
+ if len(caps) == 1:
+ return _gen_osd_cap_str(caps[0])
+
+ osd_cap = ''
+ for i, c in enumerate(caps):
+ osd_cap += _gen_osd_cap_str(c)
+ if i != len(caps) - 1:
+ osd_cap += ', '
+
+ return osd_cap
+
+def gen_mds_cap_str(caps):
+ """
+ Accepts a tuple of tuples where inner the tuple containts read/write
+ permissions, Ceph FS name and CephFS mountpoint.
+
+ caps = ((perm1, fsname1, cephfs_mntpt1), (perm2, fsname2, cephfs_mntpt2))
+ """
+ def _unpack_tuple(c):
+ if len(c) == 2:
+ perm, fsname, cephfs_mntpt = c[0], c[1], '/'
+ elif len(c) == 3:
+ perm, fsname, cephfs_mntpt = c
+ elif len(c) < 2:
+ raise RuntimeError('received items are too less in caps')
+ else: # len(c) > 3
+ raise RuntimeError('received items are too many in caps')
+
+ return perm, fsname, cephfs_mntpt
+
+ def _gen_mds_cap_str(c):
+ perm, fsname, cephfs_mntpt = _unpack_tuple(c)
+ mds_cap = f'allow {perm}'
+ if fsname:
+ mds_cap += f' fsname={fsname}'
+ if cephfs_mntpt != '/':
+ if cephfs_mntpt[0] == '/':
+ cephfs_mntpt = cephfs_mntpt[1:]
+ mds_cap += f' path={cephfs_mntpt}'
+ return mds_cap
+
+ if len(caps) == 1:
+ return _gen_mds_cap_str(caps[0])
+
+ mds_cap = ''
+ for i, c in enumerate(caps):
+ mds_cap += _gen_mds_cap_str(c)
+ if i != len(caps) - 1:
+ mds_cap += ', '
+
+ return mds_cap
+
+
class CapTester(CephFSTestCase):
"""
Test that MON and MDS caps are enforced.
import logging
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.caps_helper import CapTester
+from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
+ gen_osd_cap_str, gen_mds_cap_str)
from teuthology.exceptions import CommandFailedError
class TestMONCaps(TestMultiFS):
def test_moncap_with_one_fs_names(self):
- moncap = f'allow r fsname={self.fs1.name}'
+ moncap = gen_mon_cap_str((('r', self.fs1.name),))
self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_multiple_fs_names(self):
- moncap = (f'allow r fsname={self.fs1.name}, '
- f'allow r fsname={self.fs2.name}')
+ moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
super(type(self), self).tearDown()
- def generate_caps(self, perm, fsname, cephfs_mntpt):
+ def _create_client(self, perm, fsname=False, cephfs_mntpt='/'):
moncap = 'allow r'
- osdcap = (f'allow {perm} tag cephfs data={self.fs1.name}, '
- f'allow {perm} tag cephfs data={self.fs2.name}')
-
+ osdcap = gen_osd_cap_str(((perm, self.fs1.name),
+ (perm, self.fs2.name)))
if fsname:
- if cephfs_mntpt == '/':
- mdscap = (f'allow {perm} fsname={self.fs1.name}, '
- f'allow {perm} fsname={self.fs2.name}')
- else:
- mdscap = (f'allow {perm} fsname={self.fs1.name} '
- f'path=/{cephfs_mntpt}, '
- f'allow {perm} fsname={self.fs2.name} '
- f'path=/{cephfs_mntpt}')
+ mdscap = gen_mds_cap_str(((perm, self.fs1.name, cephfs_mntpt),
+ (perm, self.fs2.name, cephfs_mntpt)))
else:
- if cephfs_mntpt == '/':
- mdscap = f'allow {perm}'
- else:
- mdscap = f'allow {perm} path=/{cephfs_mntpt}'
-
- return moncap, osdcap, mdscap
-
- def _create_client(self, perm, fsname=False, cephfs_mntpt='/'):
- moncap, osdcap, mdscap = self.generate_caps(perm, fsname,
- cephfs_mntpt)
+ mdscap = gen_mds_cap_str(((perm, None, cephfs_mntpt),
+ (perm, None, cephfs_mntpt)))
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
keyring_paths = []
def test_mount_mon_and_osd_caps_present_mds_caps_absent(self):
# setup part...
- moncap = f'allow rw fsname={self.fs1.name}, allow rw fsname={self.fs2.name}'
- mdscap = f'allow rw fsname={self.fs1.name}'
- osdcap = (f'allow rw tag cephfs data={self.fs1.name}, allow rw tag '
- f'cephfs data={self.fs2.name}')
+ moncap = gen_mon_cap_str((('rw', self.fs1.name),
+ ('rw', self.fs2.name)))
+ mdscap = gen_mds_cap_str((('rw', self.fs1.name),))
+ osdcap = gen_osd_cap_str((('rw', self.fs1.name,),
+ ('rw', self.fs2.name)))
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
keyring_path = self.mount_a.client_remote.mktemp(data=keyring)