import os
import re
-from shlex import split as shlex_split
-
from tasks.ceph_test_case import CephTestCase
from teuthology import contextutil
class RunCephCmd:
+ def run_ceph_cmd(self, *args, **kwargs):
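+ """Run a Ceph command through the mon manager's run_cluster_cmd().
+
+ Positional arguments are packed into the 'args' keyword, so callers may
+ pass a single command string ("osd blocklist clear"), separate tokens
+ ("osd", "blocklist", "rm", addr), or an explicit args= keyword together
+ with extra kwargs such as stderr= and check_status=.
+ """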
+ if kwargs.get('args') is None and args:
+ if len(args) == 1:
+ args = args[0]
+ kwargs['args'] = args
+ return self.mon_manager.run_cluster_cmd(**kwargs)
+
def get_ceph_cmd_result(self, *args, **kwargs):
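+ """Run a Ceph command via run_ceph_cmd() and return only its exit status."""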
if kwargs.get('args') is None and args:
if len(args) == 1:
args = args[0]
kwargs['args'] = args
- return self.mon_manager.run_cluster_cmd(**kwargs).exitstatus
+ return self.run_ceph_cmd(**kwargs).exitstatus
class CephFSTestCase(CephTestCase, RunCephCmd):
# In case anything is in the OSD blocklist, clear it out. This is to avoid
# the OSD map changing in the background (due to blocklist expiry) while tests run.
try:
- self.mds_cluster.mon_manager.run_cluster_cmd(args="osd blocklist clear")
+ self.run_ceph_cmd("osd blocklist clear")
except CommandFailedError:
# Fallback for older Ceph clusters
try:
"dump", "--format=json-pretty"))['blocklist']
log.info(f"Removing {len(blocklist)} blocklist entries")
for addr, blocklisted_at in blocklist.items():
- self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)
+ self.run_ceph_cmd("osd", "blocklist", "rm", addr)
except KeyError:
# Fallback for even older Ceph clusters, which use 'blacklist' instead.
blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
"dump", "--format=json-pretty"))['blacklist']
log.info(f"Removing {len(blacklist)} blacklist entries")
for addr, blocklisted_at in blacklist.items():
- self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
+ self.run_ceph_cmd("osd", "blacklist", "rm", addr)
def _init_mon_manager(self):
# if vstart_runner.py has invoked this code
for entry in self.auth_list():
ent_type, ent_id = entry['entity'].split(".")
if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id[:6] == 'mirror'):
- self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
+ self.run_ceph_cmd("auth", "del", entry['entity'])
if self.REQUIRE_FILESYSTEM:
self.fs = self.mds_cluster.newfs(create=True)
if self.REQUIRE_BACKUP_FILESYSTEM:
if not self.REQUIRE_FILESYSTEM:
self.skipTest("backup filesystem requires a primary filesystem as well")
- self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
- 'enable_multiple', 'true',
- '--yes-i-really-mean-it')
+ self.run_ceph_cmd('fs', 'flag', 'set', 'enable_multiple', 'true',
+ '--yes-i-really-mean-it')
self.backup_fs = self.mds_cluster.newfs(name="backup_fs")
self.backup_fs.wait_for_daemons()
except contextutil.MaxWhileTries as e:
raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
- def run_cluster_cmd(self, cmd):
- if isinstance(cmd, str):
- cmd = shlex_split(cmd)
- return self.fs.mon_manager.raw_cluster_cmd(*cmd)
-
def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None):
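+ """Create a cephx client with the given mon/osd/mds caps and return the
+ 'auth get' output for self.client_name."""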
if not (moncap or osdcap or mdscap):
if self.fs:
if mdscap:
cmd += ['mds', mdscap]
- self.run_cluster_cmd(cmd)
- return self.run_cluster_cmd(f'auth get {self.client_name}')
+ self.run_ceph_cmd(*cmd)
+ return self.run_ceph_cmd(f'auth get {self.client_name}')
# test that fsname not with "goodchars" fails
args = ['fs', 'new', badname, metapoolname, datapoolname]
- proc = self.fs.mon_manager.run_cluster_cmd(args=args,stderr=StringIO(),
- check_status=False)
+ proc = self.run_ceph_cmd(args=args, stderr=StringIO(),
+ check_status=False)
self.assertIn('invalid chars', proc.stderr.getvalue().lower())
self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'rm', metapoolname,
keys = ['metadata', 'data']
pool_names = [fs_name+'-'+key for key in keys]
for p in pool_names:
- self.run_cluster_cmd(f'osd pool create {p}')
- self.run_cluster_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
+ self.run_ceph_cmd(f'osd pool create {p}')
+ self.run_ceph_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
self.fs.status().get_fsmap(fscid)
for i in range(2):
self.check_pool_application_metadata_key_value(pool_names[i], 'cephfs', keys[i], fs_name)
keys = ['metadata', 'data']
pool_names = [fs_name+'-'+key for key in keys]
for p in pool_names:
- self.run_cluster_cmd(f'osd pool create {p}')
- self.run_cluster_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
- self.run_cluster_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
+ self.run_ceph_cmd(f'osd pool create {p}')
+ self.run_ceph_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
+ self.run_ceph_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
self.fs.status().get_fsmap(fscid)
def test_fs_new_with_specific_id_fails_without_force_flag(self):
keys = ['metadata', 'data']
pool_names = [fs_name+'-'+key for key in keys]
for p in pool_names:
- self.run_cluster_cmd(f'osd pool create {p}')
+ self.run_ceph_cmd(f'osd pool create {p}')
try:
- self.run_cluster_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid}')
+ self.run_ceph_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid}')
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EINVAL,
"invalid error code on creating a file system with specifc ID without --force flag")
keys = ['metadata', 'data']
pool_names = [fs_name+'-'+key for key in keys]
for p in pool_names:
- self.run_cluster_cmd(f'osd pool create {p}')
+ self.run_ceph_cmd(f'osd pool create {p}')
try:
- self.run_cluster_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
+ self.run_ceph_cmd(f'fs new {fs_name} {pool_names[0]} {pool_names[1]} --fscid {fscid} --force')
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EINVAL,
"invalid error code on creating a file system with specifc ID that is already in use")
new_fs_name = 'new_cephfs'
client_id = 'test_new_cephfs'
- self.run_cluster_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
+ self.run_ceph_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
# authorize a cephx ID access to the renamed file system.
# use the ID to write to the file system.
# cleanup
self.mount_a.umount_wait()
- self.run_cluster_cmd(f'auth rm client.{client_id}')
+ self.run_ceph_cmd(f'auth rm client.{client_id}')
def test_fs_rename_idempotency(self):
"""
orig_fs_name = self.fs.name
new_fs_name = 'new_cephfs'
- self.run_cluster_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
- self.run_cluster_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
+ self.run_ceph_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
+ self.run_ceph_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
# original file system name does not appear in `fs ls` command
self.assertFalse(self.fs.exists())
new_fs_name = 'new_cephfs'
data_pool = self.fs.get_data_pool_name()
metadata_pool = self.fs.get_metadata_pool_name()
- self.run_cluster_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
+ self.run_ceph_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
try:
- self.run_cluster_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool}")
+ self.run_ceph_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool}")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EINVAL,
"invalid error code on creating a new file system with old "
"existing pools to fail.")
try:
- self.run_cluster_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool} --force")
+ self.run_ceph_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool} --force")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EEXIST,
"invalid error code on creating a new file system with old "
"existing pools, and --force flag to fail.")
try:
- self.run_cluster_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool} "
+ self.run_ceph_cmd(f"fs new {orig_fs_name} {metadata_pool} {data_pool} "
"--allow-dangerous-metadata-overlay")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EINVAL,
That renaming a file system without '--yes-i-really-mean-it' flag fails.
"""
try:
- self.run_cluster_cmd(f"fs rename {self.fs.name} new_fs")
+ self.run_ceph_cmd(f"fs rename {self.fs.name} new_fs")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EPERM,
"invalid error code on renaming a file system without the "
That renaming a non-existent file system fails.
"""
try:
- self.run_cluster_cmd("fs rename non_existent_fs new_fs --yes-i-really-mean-it")
+ self.run_ceph_cmd("fs rename non_existent_fs new_fs --yes-i-really-mean-it")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on renaming a non-existent fs")
else:
self.fs2 = self.mds_cluster.newfs(name='cephfs2', create=True)
try:
- self.run_cluster_cmd(f"fs rename {self.fs.name} {self.fs2.name} --yes-i-really-mean-it")
+ self.run_ceph_cmd(f"fs rename {self.fs.name} {self.fs2.name} --yes-i-really-mean-it")
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EINVAL,
"invalid error code on renaming to a fs name that is already in use")
orig_fs_name = self.fs.name
new_fs_name = 'new_cephfs'
- self.run_cluster_cmd(f'fs mirror enable {orig_fs_name}')
+ self.run_ceph_cmd(f'fs mirror enable {orig_fs_name}')
try:
- self.run_cluster_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
+ self.run_ceph_cmd(f'fs rename {orig_fs_name} {new_fs_name} --yes-i-really-mean-it')
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EPERM, "invalid error code on renaming a mirrored file system")
else:
self.fail("expected renaming of a mirrored file system to fail")
- self.run_cluster_cmd(f'fs mirror disable {orig_fs_name}')
+ self.run_ceph_cmd(f'fs mirror disable {orig_fs_name}')
class TestDump(CephFSTestCase):
fs_name = "cephfs-_."
self.fs = self.mds_cluster.newfs(name=fs_name)
self.fs.wait_for_daemons()
- self.run_cluster_cmd(f'auth caps client.{self.mount_a.client_id} '
- f'mon "allow r" '
- f'osd "allow rw pool={self.fs.get_data_pool_name()}" '
- f'mds allow')
+ self.run_ceph_cmd(f'auth caps client.{self.mount_a.client_id} '
+ f'mon "allow r" '
+ f'osd "allow rw pool={self.fs.get_data_pool_name()}" '
+ f'mds allow')
self.mount_a.remount(cephfs_name=self.fs.name)
perm = 'rw'
filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
def tearDown(self):
self.mount_a.umount_wait()
- self.run_cluster_cmd(f'auth rm {self.client_name}')
+ self.run_ceph_cmd(f'auth rm {self.client_name}')
super(type(self), self).tearDown()