self.fs.set_bal_rank_mask(bal_rank_mask)
except CommandFailedError as e:
self.assertEqual(e.exitstatus, errno.EINVAL)
+
+
+class TestPermErrMsg(CephFSTestCase):
+
+ CLIENT_NAME = 'client.testuser'
+ FS1_NAME, FS2_NAME, FS3_NAME = 'abcd', 'efgh', 'ijkl'
+
+ EXPECTED_ERRNO = 22
+ EXPECTED_ERRMSG = ("Permission flags in MDS caps must start with 'r' or "
+ "'rw' or be '*' or 'all'")
+
+ MONCAP = f'allow r fsname={FS1_NAME}'
+ OSDCAP = f'allow rw tag cephfs data={FS1_NAME}'
+ MDSCAPS = [
+ 'allow w',
+ f'allow w fsname={FS1_NAME}',
+
+ f'allow rw fsname={FS1_NAME}, allow w fsname={FS2_NAME}',
+ f'allow w fsname={FS1_NAME}, allow rw fsname={FS2_NAME}',
+ f'allow w fsname={FS1_NAME}, allow w fsname={FS2_NAME}',
+
+ (f'allow rw fsname={FS1_NAME}, allow rw fsname={FS2_NAME}, allow '
+ f'w fsname={FS3_NAME}'),
+
+ # without space after comma
+ f'allow rw fsname={FS1_NAME},allow w fsname={FS2_NAME}',
+
+
+ 'allow wr',
+ f'allow wr fsname={FS1_NAME}',
+
+ f'allow rw fsname={FS1_NAME}, allow wr fsname={FS2_NAME}',
+ f'allow wr fsname={FS1_NAME}, allow rw fsname={FS2_NAME}',
+ f'allow wr fsname={FS1_NAME}, allow wr fsname={FS2_NAME}',
+
+ (f'allow rw fsname={FS1_NAME}, allow rw fsname={FS2_NAME}, allow '
+ f'wr fsname={FS3_NAME}'),
+
+ # without space after comma
+ f'allow rw fsname={FS1_NAME},allow wr fsname={FS2_NAME}']
+
+ def _negtestcmd(self, SUBCMD, MDSCAP):
+ return self.negtest_ceph_cmd(
+ args=(f'{SUBCMD} {self.CLIENT_NAME} '
+ f'mon "{self.MONCAP}" osd "{self.OSDCAP}" mds "{MDSCAP}"'),
+ retval=self.EXPECTED_ERRNO, errmsgs=self.EXPECTED_ERRMSG)
+
+ def test_auth_add(self):
+ for mdscap in self.MDSCAPS:
+ self._negtestcmd('auth add', mdscap)
+
+ def test_auth_get_or_create(self):
+ for mdscap in self.MDSCAPS:
+ self._negtestcmd('auth get-or-create', mdscap)
+
+ def test_auth_get_or_create_key(self):
+ for mdscap in self.MDSCAPS:
+ self._negtestcmd('auth get-or-create-key', mdscap)