CLIENTS_REQUIRED = 1
MDSS_REQUIRED = 1
- def test_fsnames_can_only_by_goodchars(self):
- n = 'test_fsnames_can_only_by_goodchars'
- metapoolname, datapoolname = n+'-testmetapool', n+'-testdatapool'
- badname = n+'badname@#'
+ def check_pool_application_metadata_key_value(self, pool, app, key, value):
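+ """
+ Verify that the pool's application metadata for `app` maps `key` to `value`.
+ """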
+ output = self.fs.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'application', 'get', pool, app, key)
+ self.assertEqual(str(output.strip()), value)
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
- n+metapoolname)
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
- n+datapoolname)
+ def setup_ec_pools(self, n, metadata=True, overwrites=True):
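+ """
+ Create pools for a file system named `n`: a replicated metadata pool
+ (when `metadata` is true) and an erasure-coded data pool built from a
+ k=2/m=2 profile, optionally with overwrites enabled.
+ """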
+ if metadata:
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', n+"-meta", "8")
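+ # 2 data chunks + 2 coding chunks; an OSD failure domain keeps the
+ # profile usable on small test clusters.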
+ cmd = ['osd', 'erasure-code-profile', 'set', n+"-profile", "m=2", "k=2", "crush-failure-domain=osd"]
+ self.fs.mon_manager.raw_cluster_cmd(*cmd)
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', n+"-data", "8", "erasure", n+"-profile")
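+ # CephFS can only use an EC pool for data if overwrites are enabled on it.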
+ if overwrites:
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'set', n+"-data", 'allow_ec_overwrites', 'true')
- # test that fsname not with "goodchars" fails
- args = ['fs', 'new', badname, metapoolname, datapoolname]
- proc = self.fs.mon_manager.run_cluster_cmd(args=args,stderr=StringIO(),
- check_status=False)
- self.assertIn('invalid chars', proc.stderr.getvalue().lower())
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'rm', metapoolname,
- metapoolname,
- '--yes-i-really-really-mean-it-not-faking')
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'rm', datapoolname,
- datapoolname,
- '--yes-i-really-really-mean-it-not-faking')
+class TestFsStatus(TestAdminCommands):
+ """
+ Test "ceph fs status subcommand.
+ """
def test_fs_status(self):
"""
mdsmap = json.loads(self.fs.mon_manager.raw_cluster_cmd("fs", "status", "--format=json"))["mdsmap"]
self.assertEqual(mdsmap[0]["state"], "active")
- def _setup_ec_pools(self, n, metadata=True, overwrites=True):
- if metadata:
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', n+"-meta", "8")
- cmd = ['osd', 'erasure-code-profile', 'set', n+"-profile", "m=2", "k=2", "crush-failure-domain=osd"]
- self.fs.mon_manager.raw_cluster_cmd(*cmd)
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', n+"-data", "8", "erasure", n+"-profile")
- if overwrites:
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'set', n+"-data", 'allow_ec_overwrites', 'true')
- def _check_pool_application_metadata_key_value(self, pool, app, key, value):
- output = self.fs.mon_manager.raw_cluster_cmd(
- 'osd', 'pool', 'application', 'get', pool, app, key)
- self.assertEqual(str(output.strip()), value)
+class TestAddDataPool(TestAdminCommands):
+ """
+ Test "ceph fs add_data_pool" subcommand.
+ """
def test_add_data_pool_root(self):
"""
# Check whether https://tracker.ceph.com/issues/43061 is fixed
mon_cmd('osd', 'pool', 'application', 'enable', pool_name, 'cephfs')
self.fs.add_data_pool(pool_name, create=False)
- self._check_pool_application_metadata_key_value(
+ self.check_pool_application_metadata_key_value(
pool_name, 'cephfs', 'data', self.fs.name)
def test_add_data_pool_subdir(self):
"""
n = "test_add_data_pool_ec"
- self._setup_ec_pools(n, metadata=False)
+ self.setup_ec_pools(n, metadata=False)
self.fs.add_data_pool(n+"-data", create=False)
+
+class TestFsNew(TestAdminCommands):
+ """
+ Test "ceph fs new" subcommand.
+ """
+
+ def test_fsnames_can_only_be_goodchars(self):
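+ """
+ Test that file system names are restricted to the allowed character set.
+ """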
+ n = 'test_fsnames_can_only_be_goodchars'
+ metapoolname, datapoolname = n+'-testmetapool', n+'-testdatapool'
+ badname = n+'badname@#'
+
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+ metapoolname)
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+ datapoolname)
+
+ # Test that creating a file system with a name outside the allowed
+ # character set fails.
+ args = ['fs', 'new', badname, metapoolname, datapoolname]
+ proc = self.fs.mon_manager.run_cluster_cmd(args=args, stderr=StringIO(),
+ check_status=False)
+ self.assertIn('invalid chars', proc.stderr.getvalue().lower())
+
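+ # Clean up the pools created for this test.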
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'rm', metapoolname,
+ metapoolname,
+ '--yes-i-really-really-mean-it-not-faking')
+ self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'rm', datapoolname,
+ datapoolname,
+ '--yes-i-really-really-mean-it-not-faking')
+
def test_new_default_ec(self):
"""
That a new file system warns/fails with an EC default data pool.
self.mount_a.umount_wait(require_clean=True)
self.mds_cluster.delete_all_filesystems()
n = "test_new_default_ec"
- self._setup_ec_pools(n)
+ self.setup_ec_pools(n)
try:
self.fs.mon_manager.raw_cluster_cmd('fs', 'new', n, n+"-meta", n+"-data")
except CommandFailedError as e:
self.mount_a.umount_wait(require_clean=True)
self.mds_cluster.delete_all_filesystems()
n = "test_new_default_ec_force"
- self._setup_ec_pools(n)
+ self.setup_ec_pools(n)
self.fs.mon_manager.raw_cluster_cmd('fs', 'new', n, n+"-meta", n+"-data", "--force")
def test_new_default_ec_no_overwrite(self):
self.mount_a.umount_wait(require_clean=True)
self.mds_cluster.delete_all_filesystems()
n = "test_new_default_ec_no_overwrite"
- self._setup_ec_pools(n, overwrites=False)
+ self.setup_ec_pools(n, overwrites=False)
try:
self.fs.mon_manager.raw_cluster_cmd('fs', 'new', n, n+"-meta", n+"-data")
except CommandFailedError as e:
mon_cmd('osd', 'pool', 'application', 'enable', p, 'cephfs')
mon_cmd('fs', 'new', fs_name, pool_names[0], pool_names[1])
for i in range(2):
- self._check_pool_application_metadata_key_value(
+ self.check_pool_application_metadata_key_value(
pool_names[i], 'cephfs', keys[i], fs_name)
+
class TestRequiredClientFeatures(CephFSTestCase):
CLIENTS_REQUIRED = 0
MDSS_REQUIRED = 1
self._verify_mirroring(self.fs.name, "disabled")
-class TestSubCmdFsAuthorize(CapsHelper):
+class TestFsAuthorize(CapsHelper):
client_id = 'testuser'
client_name = 'client.' + client_id