if mdss.remotes:
log.info('Setting up CephFS filesystem...')
- ceph_fs = Filesystem(ctx) # TODO: make Filesystem cluster-aware
- if not ceph_fs.legacy_configured():
- ceph_fs.create()
+ Filesystem(ctx, create='cephfs') # TODO: make Filesystem cluster-aware
is_active_mds = lambda role: 'mds.' in role and not role.endswith('-s') and '-s-' not in role
all_roles = [item for remote_roles in mdss.remotes.values() for item in remote_roles]
LOAD_SETTINGS = []
def setUp(self):
- if len(self.fs.mds_ids) < self.MDSS_REQUIRED:
+ if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
raise case.SkipTest("Only have {0} MDSs, require {1}".format(
- len(self.fs.mds_ids), self.MDSS_REQUIRED
+ len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
))
if len(self.mounts) < self.CLIENTS_REQUIRED:
raise case.SkipTest("kclient clients must be on separate nodes")
if self.REQUIRE_ONE_CLIENT_REMOTE:
- if self.mounts[0].client_remote.hostname in self.fs.get_mds_hostnames():
+ if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
raise case.SkipTest("Require first client to be on separate server from MDSs")
if self.REQUIRE_MEMSTORE:
- objectstore = self.fs.get_config("osd_objectstore", "osd")
+ objectstore = self.mds_cluster.get_config("osd_objectstore", "osd")
if objectstore != "memstore":
# You certainly *could* run this on a real OSD, but you don't want to sit
# here for hours waiting for the test to fill up a 1TB drive!
for i in range(0, self.CLIENTS_REQUIRED):
setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])
- self.fs.clear_firewall()
+ self.mds_cluster.clear_firewall()
# Unmount in order to start each test on a fresh mount, such
# that test_barrier can have a firm expectation of what OSD
# epoch the clients start with.
# To avoid any issues with e.g. unlink bugs, we destroy and recreate
# the filesystem rather than just doing a rm -rf of files
self.mds_cluster.mds_stop()
self.mds_cluster.delete_all_filesystems()
+ self.fs = None # is now invalid!
# In case the previous filesystem had filled up the RADOS cluster, wait for that
# flag to pass.
- osd_mon_report_interval_max = int(self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))
- self.wait_until_true(lambda: not self.fs.is_full(),
+ osd_mon_report_interval_max = int(self.mds_cluster.get_config("osd_mon_report_interval_max", service_type='osd'))
+ self.wait_until_true(lambda: not self.mds_cluster.is_full(),
timeout=osd_mon_report_interval_max * 5)
# In case anything is in the OSD blacklist, clear it out. This is to avoid
# the OSD map changing in the background (due to blacklist expiry) while tests run.
try:
- self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
+ self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
except CommandFailedError:
# Fallback for older Ceph cluster
- blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd",
+ blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
"dump", "--format=json-pretty"))['blacklist']
log.info("Removing {0} blacklist entries".format(len(blacklist)))
for addr, blacklisted_at in blacklist.items():
- self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
+ self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
- # In case some test messed with auth caps, reset them
client_mount_ids = [m.client_id for m in self.mounts]
- for client_id in client_mount_ids:
- self.fs.mon_manager.raw_cluster_cmd_result(
- 'auth', 'caps', "client.{0}".format(client_id),
- 'mds', 'allow',
- 'mon', 'allow r',
- 'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))
-
- log.info(client_mount_ids)
-
# In case the test changes the IDs of clients, stash them so that we can
# reset in tearDown
self._original_client_ids = client_mount_ids
+ log.info(client_mount_ids)
# In case there were any extra auth identities around from a previous
# test, delete them
for entry in self.auth_list():
ent_type, ent_id = entry['entity'].split(".")
if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
- self.fs.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
+ self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])
if self.REQUIRE_FILESYSTEM:
- self.fs.create()
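+            # newfs(True) creates a Filesystem with the default name "cephfs"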
+ self.fs = self.mds_cluster.newfs(True)
self.fs.mds_restart()
+
+ # In case some test messed with auth caps, reset them
+ for client_id in client_mount_ids:
+ self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+ 'auth', 'caps', "client.{0}".format(client_id),
+ 'mds', 'allow',
+ 'mon', 'allow r',
+ 'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))
+
+ # wait for mds restart to complete...
self.fs.wait_for_daemons()
if not self.mount_a.is_mounted():
self.mount_a.mount()
# Load any config settings of interest
for setting in self.LOAD_SETTINGS:
setattr(self, setting, int(self.fs.mds_asok(
- ['config', 'get', setting], self.fs.mds_ids[0]
+ ['config', 'get', setting], self.mds_cluster.mds_ids[0]
)[setting]))
self.configs_set = set()
def tearDown(self):
- self.fs.clear_firewall()
+ self.mds_cluster.clear_firewall()
for m in self.mounts:
m.teardown()
"""
Convenience wrapper on "ceph auth list"
"""
- return json.loads(self.fs.mon_manager.raw_cluster_cmd(
+ return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
"auth", "list", "--format=json-pretty"
))['auth_dump']
MDS ranks or in the list of standbys
"""
def get_daemon_names():
- fs_map = self.mds_cluster.get_fs_map()
- names = [m['name'] for m in fs_map['standbys']]
- for fs in fs_map['filesystems']:
- names.extend([info['name'] for info in fs['mdsmap']['info'].values()])
-
- return names
+ return [info['name'] for info in self.mds_cluster.status().get_all()]
if daemon_ids is None:
daemon_ids = self.mds_cluster.mds_ids
it does)
"""
try:
- self.fs.mds_daemons[daemon_id].proc.wait()
+ self.mds_cluster.mds_daemons[daemon_id].proc.wait()
except CommandFailedError as e:
log.info("MDS '{0}' crashed with status {1} as expected".format(daemon_id, e.exitstatus))
- self.fs.mds_daemons[daemon_id].proc = None
+ self.mds_cluster.mds_daemons[daemon_id].proc = None
# Go remove the coredump from the crash, otherwise teuthology.internal.coredump will
# catch it later and treat it as a failure.
- p = self.fs.mds_daemons[daemon_id].remote.run(args=[
+ p = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
"sudo", "sysctl", "-n", "kernel.core_pattern"], stdout=StringIO())
core_pattern = p.stdout.getvalue().strip()
if os.path.dirname(core_pattern): # Non-default core_pattern with a directory in it
# Determine the PID of the crashed MDS by inspecting the MDSMap, it had
# to talk to the mons to get assigned a rank to reach the point of crashing
- addr = self.fs.mon_manager.get_mds_status(daemon_id)['addr']
+ addr = self.mds_cluster.mon_manager.get_mds_status(daemon_id)['addr']
pid_str = addr.split("/")[1]
log.info("Determined crasher PID was {0}".format(pid_str))
core_glob = re.sub("%[a-z]", "*", core_glob) # Match all for all other % tokens
# Verify that we see the expected single coredump matching the expected pattern
- ls_proc = self.fs.mds_daemons[daemon_id].remote.run(args=[
+ ls_proc = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
"sudo", "ls", run.Raw(core_glob)
], stdout=StringIO())
cores = [f for f in ls_proc.stdout.getvalue().strip().split("\n") if f]
log.info("Found core file {0}, deleting it".format(cores[0]))
- self.fs.mds_daemons[daemon_id].remote.run(args=[
+ self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
"sudo", "rm", "-f", cores[0]
])
else:
def __str__(self):
return "Object not found: '{0}'".format(self._object_name)
+class FSStatus(object):
+ """
+ Operations on a snapshot of the FSMap.
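+
+    A rough usage sketch (assuming an initialized mon_manager and a known fscid)::
+
+        status = FSStatus(mon_manager)
+        for info in status.get_ranks(fscid):
+            log.info("rank {0} held by {1} ({2})".format(info['rank'], info['name'], info['state']))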
+ """
+ def __init__(self, mon_manager):
+ self.mon = mon_manager
+ self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
+
+ def __str__(self):
+ return json.dumps(self.map, indent = 2, sort_keys = True)
+
+ # Expose the fsmap for manual inspection.
+ def __getitem__(self, key):
+ """
+ Get a field from the fsmap.
+ """
+ return self.map[key]
+
+ def get_filesystems(self):
+ """
+ Iterator for all filesystems.
+ """
+ for fs in self.map['filesystems']:
+ yield fs
+
+ def get_all(self):
+ """
+ Iterator for all the mds_info components in the FSMap.
+ """
+ for info in self.get_standbys():
+ yield info
+ for fs in self.map['filesystems']:
+ for info in fs['mdsmap']['info'].values():
+ yield info
+
+ def get_standbys(self):
+ """
+ Iterator for all standbys.
+ """
+ for info in self.map['standbys']:
+ yield info
+
+ def get_fsmap(self, fscid):
+ """
+ Get the fsmap for the given FSCID.
+ """
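+        # A None fscid matches the first filesystem in the map.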
+ for fs in self.map['filesystems']:
+ if fscid is None or fs['id'] == fscid:
+ return fs
+ raise RuntimeError("FSCID {0} not in map".format(fscid))
+
+ def get_fsmap_byname(self, name):
+ """
+ Get the fsmap for the given file system name.
+ """
+ for fs in self.map['filesystems']:
+ if name is None or fs['mdsmap']['fs_name'] == name:
+ return fs
+ raise RuntimeError("FS {0} not in map".format(name))
+
+ def get_replays(self, fscid):
+ """
+        Iterator for the standby-replay MDSs of the given FSCID.
+ """
+ fs = self.get_fsmap(fscid)
+ for info in fs['mdsmap']['info'].values():
+ if info['state'] == 'up:standby-replay':
+ yield info
+
+ def get_ranks(self, fscid):
+ """
+ Get the ranks for the given FSCID.
+ """
+ fs = self.get_fsmap(fscid)
+ for info in fs['mdsmap']['info'].values():
+ if info['rank'] >= 0:
+ yield info
+
+ def get_rank(self, fscid, rank):
+ """
+        Get the info for the given rank of the given FSCID.
+ """
+ for info in self.get_ranks(fscid):
+ if info['rank'] == rank:
+ return info
+ raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
+
+ def get_mds(self, name):
+ """
+ Get the info for the given MDS name.
+ """
+ for info in self.get_all():
+ if info['name'] == name:
+ return info
+ return None
+
+ def get_mds_addr(self, name):
+ """
+ Return the instance addr as a string, like "10.214.133.138:6807\/10825"
+ """
+ info = self.get_mds(name)
+ if info:
+ return info['addr']
+ else:
+ log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
+ raise RuntimeError("MDS id '{0}' not found in map".format(name))
class CephCluster(object):
@property
self._one_or_all(mds_id, _fail_restart)
- def get_filesystem(self, name):
- return Filesystem(self._ctx, name)
+ def newfs(self, name):
+ return Filesystem(self._ctx, create=name)
- def get_fs_map(self):
- fs_map = json.loads(self.mon_manager.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
- return fs_map
+ def status(self):
+ return FSStatus(self.mon_manager)
def delete_all_filesystems(self):
"""
Remove all filesystems that exist, and any pools in use by them.
"""
- fs_ls = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
- for fs in fs_ls:
- self.mon_manager.raw_cluster_cmd("fs", "set", fs['name'], "cluster_down", "true")
- mds_map = json.loads(
- self.mon_manager.raw_cluster_cmd(
- "fs", "get", fs['name'], "--format=json-pretty"))['mdsmap']
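+        # The mdsmap references pools by id, so build an id -> name map up front.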
+ pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+ pool_id_name = {}
+ for pool in pools:
+ pool_id_name[pool['pool']] = pool['pool_name']
+ status = self.status()
+ for fs in status.get_filesystems():
+ mdsmap = fs['mdsmap']
+ name = mdsmap['fs_name']
+ metadata_pool = pool_id_name[mdsmap['metadata_pool']]
+
+ self.mon_manager.raw_cluster_cmd("fs", "set", name, "cluster_down", "true")
- for gid in mds_map['up'].values():
+ for gid in mdsmap['up'].values():
self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
- self.mon_manager.raw_cluster_cmd('fs', 'rm', fs['name'], '--yes-i-really-mean-it')
+ self.mon_manager.raw_cluster_cmd('fs', 'rm', name, '--yes-i-really-mean-it')
self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
- fs['metadata_pool'],
- fs['metadata_pool'],
+ metadata_pool, metadata_pool,
'--yes-i-really-really-mean-it')
- for data_pool in fs['data_pools']:
+ for data_pool in mdsmap['data_pools']:
+ data_pool = pool_id_name[data_pool]
self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
data_pool, data_pool,
'--yes-i-really-really-mean-it')
def get_standby_daemons(self):
- return set([s['name'] for s in self.get_fs_map()['standbys']])
+ return set([s['name'] for s in self.status().get_standbys()])
def get_mds_hostnames(self):
result = set()
def set_block(_mds_id):
remote = self.mon_manager.find_remote('mds', _mds_id)
+ status = self.status()
- addr = self.get_mds_addr(_mds_id)
+ addr = status.get_mds_addr(_mds_id)
ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
remote.run(
def clear_firewall(self):
clear_firewall(self._ctx)
- def _all_info(self):
- """
- Iterator for all the mds_info components in the FSMap
- """
- fs_map = self.get_fs_map()
- for i in fs_map['standbys']:
- yield i
- for fs in fs_map['filesystems']:
- for i in fs['mdsmap']['info'].values():
- yield i
-
- def get_mds_addr(self, mds_id):
- """
- Return the instance addr as a string, like "10.214.133.138:6807\/10825"
- """
- for mds_info in self._all_info():
- if mds_info['name'] == mds_id:
- return mds_info['addr']
-
- log.warn(json.dumps(list(self._all_info()), indent=2)) # dump for debugging
- raise RuntimeError("MDS id '{0}' not found in map".format(mds_id))
-
def get_mds_info(self, mds_id):
- for mds_info in self._all_info():
- if mds_info['name'] == mds_id:
- return mds_info
+ return FSStatus(self.mon_manager).get_mds(mds_id)
- return None
-
- def get_mds_info_by_rank(self, mds_rank):
- for mds_info in self._all_info():
- if mds_info['rank'] == mds_rank:
- return mds_info
+ def is_full(self):
+ flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
+ return 'full' in flags
- return None
+ def is_pool_full(self, pool_name):
+ pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
+ for pool in pools:
+ if pool['pool_name'] == pool_name:
+ return 'full' in pool['flags_names'].split(",")
+ raise RuntimeError("Pool not found '{0}'".format(pool_name))
class Filesystem(MDSCluster):
"""
This object is for driving a CephFS filesystem. The MDS daemons driven by
MDSCluster may be shared with other Filesystems.
"""
- def __init__(self, ctx, name=None):
+ def __init__(self, ctx, fscid=None, create=None):
super(Filesystem, self).__init__(ctx)
- if name is None:
- name = "cephfs"
-
- self.name = name
- self.metadata_pool_name = "{0}_metadata".format(name)
- self.data_pool_name = "{0}_data".format(name)
+ self.id = None
+ self.name = None
+ self.metadata_pool_name = None
+ self.data_pools = None
client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
self.client_id = client_list[0]
self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
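+        # 'create' may be True (use the default name "cephfs") or a string naming
+        # the filesystem; it is created unless already configured the legacy way.
+        # 'fscid' instead binds this object to an existing filesystem.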
+ if create is not None:
+ if fscid is not None:
+ raise RuntimeError("cannot specify fscid when creating fs")
+ if create is True:
+ self.name = 'cephfs'
+ else:
+ self.name = create
+ if not self.legacy_configured():
+ self.create()
+ elif fscid is not None:
+ self.id = fscid
+ self.getinfo(refresh = True)
+
+ def getinfo(self, refresh = False):
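+        # Resolve this Filesystem's id, name and pool names from the live FSMap:
+        # prefer an explicit fscid, then a name, otherwise require that exactly
+        # one filesystem exists.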
+ status = self.status()
+ if self.id is not None:
+ fsmap = status.get_fsmap(self.id)
+ elif self.name is not None:
+ fsmap = status.get_fsmap_byname(self.name)
+ else:
+ fss = [fs for fs in status.get_filesystems()]
+ if len(fss) == 1:
+ fsmap = fss[0]
+ elif len(fss) == 0:
+ raise RuntimeError("no file system available")
+ else:
+ raise RuntimeError("more than one file system available")
+ self.id = fsmap['id']
+ self.name = fsmap['mdsmap']['fs_name']
+ self.get_pool_names(status = status, refresh = refresh)
+ return status
+
+ def deactivate(self, rank):
+ if rank < 0:
+ raise RuntimeError("invalid rank")
+ elif rank == 0:
+ raise RuntimeError("cannot deactivate rank 0")
+ self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
+
+ def set_max_mds(self, max_mds):
+ self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
+
def get_pgs_per_fs_pool(self):
"""
Calculate how many PGs to use when creating a pool, in order to avoid raising any
osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
return pg_warn_min_per_osd * osd_count
- def get_all_mds_info(self):
- return self.get_mds_info()
-
def create(self):
+ if self.name is None:
+ self.name = "cephfs"
+ if self.metadata_pool_name is None:
+ self.metadata_pool_name = "{0}_metadata".format(self.name)
+ data_pool_name = "{0}_data".format(self.name)
+
log.info("Creating filesystem '{0}'".format(self.name))
pgs_per_fs_pool = self.get_pgs_per_fs_pool()
self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
self.metadata_pool_name, pgs_per_fs_pool.__str__())
self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
- self.data_pool_name, pgs_per_fs_pool.__str__())
+ data_pool_name, pgs_per_fs_pool.__str__())
self.mon_manager.raw_cluster_cmd('fs', 'new',
- self.name, self.metadata_pool_name, self.data_pool_name)
+ self.name, self.metadata_pool_name, data_pool_name)
+
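+        # Refresh the cached id/name/pool information now that the pools and fs exist.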
+ self.getinfo(refresh = True)
def exists(self):
"""
out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
pools = json.loads(out_text)
metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
+ if metadata_pool_exists:
+ self.metadata_pool_name = 'metadata'
except CommandFailedError as e:
# For use in upgrade tests, Ceph cuttlefish and earlier don't support
# structured output (--format) from the CLI.
return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
def get_mds_map(self):
- fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
- return fs['mdsmap']
-
- def get_data_pool_name(self):
- return self.data_pool_name
-
- def get_data_pool_id(self):
+ return self.status().get_fsmap(self.id)['mdsmap']
+
+ def add_data_pool(self, name):
+ self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
+ self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
+ self.get_pool_names(refresh = True)
+        for poolid, pool_name in self.data_pools.items():
+            if name == pool_name:
+ return poolid
+ raise RuntimeError("could not get just created pool '{0}'".format(name))
+
+ def get_pool_names(self, refresh = False, status = None):
+ if refresh or self.metadata_pool_name is None or self.data_pools is None:
+ if status is None:
+ status = self.status()
+ fsmap = status.get_fsmap(self.id)
+
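+            # The mdsmap stores pool ids; translate them to names via the OSD map.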
+ osd_map = self.mon_manager.get_osd_dump_json()
+ id_to_name = {}
+ for p in osd_map['pools']:
+ id_to_name[p['pool']] = p['pool_name']
+
+ self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
+ self.data_pools = {}
+ for data_pool in fsmap['mdsmap']['data_pools']:
+ self.data_pools[data_pool] = id_to_name[data_pool]
+
+ def get_data_pool_name(self, refresh = False):
+ if refresh or self.data_pools is None:
+ self.get_pool_names(refresh = True)
+ assert(len(self.data_pools) == 1)
+ return self.data_pools.values()[0]
+
+ def get_data_pool_id(self, refresh = False):
"""
Don't call this if you have multiple data pools
:return: integer
"""
- pools = self.get_mds_map()['data_pools']
- assert(len(pools) == 1)
- return pools[0]
-
- def get_data_pool_names(self):
- osd_map = self.mon_manager.get_osd_dump_json()
- id_to_name = {}
- for p in osd_map['pools']:
- id_to_name[p['pool']] = p['pool_name']
+ if refresh or self.data_pools is None:
+ self.get_pool_names(refresh = True)
+ assert(len(self.data_pools) == 1)
+ return self.data_pools.keys()[0]
- return [id_to_name[pool_id] for pool_id in self.get_mds_map()['data_pools']]
+ def get_data_pool_names(self, refresh = False):
+ if refresh or self.data_pools is None:
+ self.get_pool_names(refresh = True)
+ return self.data_pools.values()
def get_metadata_pool_name(self):
return self.metadata_pool_name
def get_namespace_id(self):
- fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
- return fs['id']
+ return self.id
def get_pool_df(self, pool_name):
"""
def recreate(self):
log.info("Creating new filesystem")
self.delete_all_filesystems()
+ self.id = None
self.create()
def put_metadata_object_raw(self, object_id, infile):
return self.json_asok(command, 'mds', mds_id)
- def is_full(self):
- flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
- return 'full' in flags
-
- def is_pool_full(self, pool_name):
- pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
- for pool in pools:
- if pool['pool_name'] == pool_name:
- return 'full' in pool['flags_names'].split(",")
-
- raise RuntimeError("Pool not found '{0}'".format(pool_name))
-
def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
"""
Block until the MDS reaches a particular state, or a failure condition
started_at = time.time()
while True:
+ status = self.status()
if mds_id is not None:
# mds_info is None if no daemon with this ID exists in the map
- mds_info = self.mon_manager.get_mds_status(mds_id)
+ mds_info = status.get_mds(mds_id)
current_state = mds_info['state'] if mds_info else None
log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
else:
# In general, look for a single MDS
- mds_status = self.get_mds_map()
- states = [m['state'] for m in mds_status['info'].values()]
+ states = [m['state'] for m in status.get_ranks(self.id)]
if [s for s in states if s == goal_state] == [goal_state]:
current_state = goal_state
elif reject in states:
elif reject is not None and current_state == reject:
raise RuntimeError("MDS in reject state {0}".format(current_state))
elif timeout is not None and elapsed > timeout:
- log.error("MDS status at timeout: {0}".format(self.get_mds_map()))
+ log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
raise RuntimeError(
"Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
elapsed, goal_state, current_state
are updated correctly.
"""
- def get_pool_id(name):
- return self.fs.mon_manager.get_pool_dump(name)['pool']
-
old_data_pool_name = self.fs.get_data_pool_name()
- old_pool_id = get_pool_id(old_data_pool_name)
+ old_pool_id = self.fs.get_data_pool_id()
# Create a file for subsequent checks
self.mount_a.run_shell(["mkdir", "parent_a"])
# Create a new data pool
new_pool_name = "data_new"
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', new_pool_name,
- self.fs.get_pgs_per_fs_pool().__str__())
- self.fs.mon_manager.raw_cluster_cmd('mds', 'add_data_pool', new_pool_name)
- new_pool_id = get_pool_id(new_pool_name)
+ new_pool_id = self.fs.add_data_pool(new_pool_name)
# That an object which has switched pools gets its backtrace updated
self.mount_a.run_shell(["setfattr", "-n", "ceph.file.layout.pool", "-v", new_pool_name, "./parent_b/alpha"])
# Start the MDS
self.fs.mds_restart()
self.fs.wait_for_daemons()
- import json
- log.info(json.dumps(self.mds_cluster.get_fs_map(), indent=2))
+ log.info(str(self.mds_cluster.status()))
# Mount a client
self.mount_a.mount()
if replay:
self.set_conf("mds.{0}".format(follower), "mds_standby_replay", "true")
- def get_info_by_name(self, fs, mds_name):
- if fs is None:
- mds_info = self.mds_cluster.get_fs_map()['standbys']
+ def get_info_by_name(self, mds_name):
+ status = self.mds_cluster.status()
+ info = status.get_mds(mds_name)
+ if info is None:
+ log.warn(str(status))
+ raise RuntimeError("MDS '{0}' not found".format(mds_name))
else:
- mds_info = fs.get_mds_map()['info'].values()
- for info in mds_info:
- if info['name'] == mds_name:
- return info
-
- log.warn(json.dumps(mds_info, indent=2))
- raise RuntimeError("MDS '{0}' not found".format(mds_name))
+ return info
def test_standby_replay_unused(self):
# Pick out exactly 3 daemons to be run during test
self.set_standby_for(mds_a, mds_c, True)
# Create FS and start A
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
+ fs_a = self.mds_cluster.newfs("alpha")
self.mds_cluster.mds_restart(mds_a)
fs_a.wait_for_daemons()
self.assertEqual(fs_a.get_active_names(), [mds_a])
# Start B, he should go into standby replay
self.mds_cluster.mds_restart(mds_b)
self.wait_for_daemon_start([mds_b])
- info_b = self.get_info_by_name(fs_a, mds_b)
+ info_b = self.get_info_by_name(mds_b)
self.assertEqual(info_b['state'], "up:standby-replay")
self.assertEqual(info_b['standby_for_name'], mds_a)
self.assertEqual(info_b['rank'], 0)
# Start C, he should go into standby (*not* replay)
self.mds_cluster.mds_restart(mds_c)
self.wait_for_daemon_start([mds_c])
- info_c = self.get_info_by_name(None, mds_c)
+ info_c = self.get_info_by_name(mds_c)
self.assertEqual(info_c['state'], "up:standby")
self.assertEqual(info_c['standby_for_name'], mds_a)
self.assertEqual(info_c['rank'], -1)
self.mds_cluster.mds_stop(mds_b)
self.mds_cluster.mds_fail(mds_b)
self.wait_until_equal(
- lambda: self.get_info_by_name(fs_a, mds_c)['state'],
+ lambda: self.get_info_by_name(mds_c)['state'],
"up:standby-replay",
60)
- info_c = self.get_info_by_name(fs_a, mds_c)
+ info_c = self.get_info_by_name(mds_c)
self.assertEqual(info_c['state'], "up:standby-replay")
self.assertEqual(info_c['standby_for_name'], mds_a)
self.assertEqual(info_c['rank'], 0)
self.set_standby_for(mds_b, mds_a, False)
# Create FS alpha and get mds_a to come up as active
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
+ fs_a = self.mds_cluster.newfs("alpha")
self.mds_cluster.mds_restart(mds_a)
fs_a.wait_for_daemons()
self.assertEqual(fs_a.get_active_names(), [mds_a])
self.wait_for_daemon_start([mds_b])
# See the standby come up as the correct rank
- info_b = self.get_info_by_name(fs_a, mds_b)
+ info_b = self.get_info_by_name(mds_b)
self.assertEqual(info_b['state'], "up:standby-replay")
self.assertEqual(info_b['standby_for_name'], mds_a)
self.assertEqual(info_b['rank'], 0)
self.set_standby_for(mds_b, mds_b_s, True)
# Create FS alpha and get mds_a to come up as active
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
+ fs_a = self.mds_cluster.newfs("alpha")
fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name,
'allow_multimds', "true",
"--yes-i-really-mean-it")
self.wait_for_daemon_start([mds_b_s])
self.mds_cluster.mds_restart(mds_a_s)
self.wait_for_daemon_start([mds_a_s])
- info_b_s = self.get_info_by_name(fs_a, mds_b_s)
+ info_b_s = self.get_info_by_name(mds_b_s)
self.assertEqual(info_b_s['state'], "up:standby-replay")
- info_a_s = self.get_info_by_name(fs_a, mds_a_s)
+ info_a_s = self.get_info_by_name(mds_a_s)
self.assertEqual(info_a_s['state'], "up:standby-replay")
# Shrink the cluster
def setUp(self):
super(TestMultiFilesystems, self).setUp()
- self.fs.mon_manager.raw_cluster_cmd("fs", "flag", "set",
- "enable_multiple", "true",
- "--yes-i-really-mean-it")
+ self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
+ "enable_multiple", "true",
+ "--yes-i-really-mean-it")
+
def _setup_two(self):
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_b = self.mds_cluster.get_filesystem("bravo")
- fs_a.create()
- fs_b.create()
+ fs_a = self.mds_cluster.newfs("alpha")
+ fs_b = self.mds_cluster.newfs("bravo")
self.mds_cluster.mds_restart()
# Reconfigure client auth caps
for mount in self.mounts:
- self.fs.mon_manager.raw_cluster_cmd_result(
+ self.mds_cluster.mon_manager.raw_cluster_cmd_result(
'auth', 'caps', "client.{0}".format(mount.client_id),
'mds', 'allow',
'mon', 'allow r',
set_standby_for(mds_d, mds_c, False)
# Create FS alpha and get mds_a to come up as active
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
+ fs_a = self.mds_cluster.newfs("alpha")
self.mds_cluster.mds_restart(mds_a)
fs_a.wait_for_daemons()
self.assertEqual(fs_a.get_active_names(), [mds_a])
# Create FS bravo and get mds_c to come up as active
- fs_b = self.mds_cluster.get_filesystem("bravo")
- fs_b.create()
+ fs_b = self.mds_cluster.newfs("bravo")
self.mds_cluster.mds_restart(mds_c)
fs_b.wait_for_daemons()
self.assertEqual(fs_b.get_active_names(), [mds_c])
self.set_conf("mds.{0}".format(follower_id),
"mds_standby_for_fscid", fscid)
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
- fs_b = self.mds_cluster.get_filesystem("bravo")
- fs_b.create()
+ fs_a = self.mds_cluster.newfs("alpha")
+ fs_b = self.mds_cluster.newfs("bravo")
set_standby_for(0, fs_a, mds_a)
set_standby_for(0, fs_a, mds_b)
set_standby_for(0, fs_b, mds_c)
"mds_standby_for_fscid", fscid)
# Create two filesystems which should have two ranks each
- fs_a = self.mds_cluster.get_filesystem("alpha")
- fs_a.create()
+ fs_a = self.mds_cluster.newfs("alpha")
fs_a.mon_manager.raw_cluster_cmd("fs", "set", fs_a.name,
"allow_multimds", "true",
"--yes-i-really-mean-it")
- fs_b = self.mds_cluster.get_filesystem("bravo")
- fs_b.create()
+ fs_b = self.mds_cluster.newfs("bravo")
fs_b.mon_manager.raw_cluster_cmd("fs", "set", fs_b.name,
"allow_multimds", "true",
"--yes-i-really-mean-it")
# Set up
client_name = "client.{0}".format(self.mount_a.client_id)
new_pool_name = "data_new"
- self.fs.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', new_pool_name,
- self.fs.get_pgs_per_fs_pool().__str__())
- self.fs.mon_manager.raw_cluster_cmd('mds', 'add_data_pool', new_pool_name)
+ self.fs.add_data_pool(new_pool_name)
self.mount_a.run_shell(["touch", "layoutfile"])
self.mount_a.run_shell(["mkdir", "layoutdir"])
to use when selecting a random value from a range. To always use the maximum
value, set no_random to true. The config is a dict containing some or all of:
- seed: [no default] seed the random number generator
-
- randomize: [default: true] enables randomization and use the max/min values
-
- max_thrash: [default: 1] the maximum number of MDSs that will be thrashed at
+ max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
any given time.
max_thrash_delay: [default: 30] maximum number of seconds to delay before
thrashing again.
+ max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
+ the replay state before thrashing.
+
max_revive_delay: [default: 10] maximum number of seconds to delay before
- bringing back a thrashed MDS
+ bringing back a thrashed MDS.
- thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
- during replay. Value should be between 0.0 and 1.0
+ randomize: [default: true] enables randomization and use the max/min values
- max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
- the replay state before thrashing
+ seed: [no default] seed the random number generator
- thrash_weights: allows specific MDSs to be thrashed more/less frequently. This option
- overrides anything specified by max_thrash. This option is a dict containing
- mds.x: weight pairs. For example, [mds.a: 0.7, mds.b: 0.3, mds.c: 0.0]. Each weight
- is a value from 0.0 to 1.0. Any MDSs not specified will be automatically
- given a weight of 0.0. For a given MDS, by default the trasher delays for up
- to max_thrash_delay, trashes, waits for the MDS to recover, and iterates. If a non-zero
- weight is specified for an MDS, for each iteration the thrasher chooses whether to thrash
- during that iteration based on a random value [0-1] not exceeding the weight of that MDS.
+ thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+ during replay. Value should be between 0.0 and 1.0.
+
+ thrash_max_mds: [default: 0.25] likelihood that the max_mds of the mds
+ cluster will be modified to a value [1, current) or (current, starting
+ max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
+ deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.
+
+ thrash_weights: allows specific MDSs to be thrashed more/less frequently.
+ This option overrides anything specified by max_thrash. This option is a
+ dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
+ 0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
+ specified will be automatically given a weight of 0.0 (not thrashed).
+      For a given MDS, by default the thrasher delays for up to
+      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
+ If a non-zero weight is specified for an MDS, for each iteration the
+ thrasher chooses whether to thrash during that iteration based on a
+ random value [0-1] not exceeding the weight of that MDS.
Examples::
"""
- def __init__(self, ctx, manager, mds_cluster, config, logger, failure_group, weight):
+ def __init__(self, ctx, manager, config, logger, fs, max_mds):
super(MDSThrasher, self).__init__()
self.ctx = ctx
self.manager = manager
assert self.manager.is_clean()
- self.mds_cluster = mds_cluster
+ self.config = config
+ self.logger = logger
+ self.fs = fs
+ self.max_mds = max_mds
self.stopping = Event()
- self.logger = logger
- self.config = config
self.randomize = bool(self.config.get('randomize', True))
- self.max_thrash_delay = float(self.config.get('thrash_delay', 30.0))
+ self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.25))
+ self.max_thrash = int(self.config.get('max_thrash', 1))
+ self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
v=self.thrash_in_replay)
-
self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
-
self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
- self.failure_group = failure_group
- self.weight = weight
-
- # TODO support multiple filesystems: will require behavioural change to select
- # which filesystem to act on when doing rank-ish things
- self.fs = Filesystem(self.ctx)
-
def _run(self):
try:
self.do_thrash()
"powercycling requested but RemoteConsole is not "
"initialized. Check ipmi config.")
- def kill_mds_by_rank(self, rank):
- """
- kill_mds wrapper to kill based on rank passed.
- """
- status = self.mds_cluster.get_mds_info_by_rank(rank)
- self.kill_mds(status['name'])
-
def revive_mds(self, mds, standby_for_rank=None):
"""
Revive mds -- do an ipmpi powercycle (if indicated by the config)
args.extend(['--hot-standby', standby_for_rank])
self.ctx.daemons.get_daemon('mds', mds).restart(*args)
- def revive_mds_by_rank(self, rank, standby_for_rank=None):
- """
- revive_mds wrapper to revive based on rank passed.
- """
- status = self.mds_cluster.get_mds_info_by_rank(rank)
- self.revive_mds(status['name'], standby_for_rank)
-
- def get_mds_status_all(self):
- return self.fs.get_mds_map()
+ def wait_for_stable(self, rank = None, gid = None):
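+        # With a rank/gid pair, wait until that rank is held by a different gid
+        # (i.e. a standby has taken over); otherwise wait until the number of
+        # active, non-laggy ranks reaches max_mds.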
+ self.log('waiting for mds cluster to stabilize...')
+ status = self.fs.status()
+ itercount = 0
+ while True:
+ max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+ if rank is not None:
+ try:
+ info = status.get_rank(self.fs.id, rank)
+ if info['gid'] != gid:
+ self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
+ return status, info['name']
+ except:
+ pass # no rank present
+ else:
+ ranks = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, list(status.get_ranks(self.fs.id)))
+ count = len(ranks)
+ if count >= max_mds:
+ self.log('mds cluster has {count} alive and active, now stable!'.format(count = count))
+ return status, None
+ itercount = itercount + 1
+ if itercount > 10:
+ self.log('mds map: {status}'.format(status=self.fs.status()))
+ time.sleep(2)
+ status = self.fs.status()
def do_thrash(self):
"""
Perform the random thrashing action
"""
- self.log('starting mds_do_thrash for failure group: ' + ', '.join(
- ['mds.{_id}'.format(_id=_f) for _f in self.failure_group]))
+ self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
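+        # Simple counters, dumped to the log at the end of each thrash iteration.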
+ stats = {
+ "max_mds": 0,
+ "deactivate": 0,
+ "kill": 0,
+ }
+
while not self.stopping.is_set():
delay = self.max_thrash_delay
if self.randomize:
if self.stopping.is_set():
continue
- skip = random.randrange(0.0, 1.0)
- if self.weight < 1.0 and skip > self.weight:
- self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip,
- weight=self.weight))
- continue
-
- # find the active mds in the failure group
- statuses = [self.mds_cluster.get_mds_info(m) for m in self.failure_group]
- actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
- assert len(actives) == 1, 'Can only have one active in a failure group'
-
- active_mds = actives[0]['name']
- active_rank = actives[0]['rank']
-
- self.log('kill mds.{id} (rank={r})'.format(id=active_mds, r=active_rank))
- self.kill_mds_by_rank(active_rank)
-
- # wait for mon to report killed mds as crashed
- last_laggy_since = None
- itercount = 0
- while True:
- failed = self.fs.get_mds_map()['failed']
- status = self.mds_cluster.get_mds_info(active_mds)
- if not status:
- break
- if 'laggy_since' in status:
- last_laggy_since = status['laggy_since']
- break
- if any([(f == active_mds) for f in failed]):
- break
- self.log(
- 'waiting till mds map indicates mds.{_id} is laggy/crashed, in failed state, or mds.{_id} is removed from mdsmap'.format(
- _id=active_mds))
- itercount = itercount + 1
- if itercount > 10:
- self.log('mds map: {status}'.format(status=self.mds_cluster.get_fs_map()))
- time.sleep(2)
- if last_laggy_since:
- self.log(
- 'mds.{_id} reported laggy/crashed since: {since}'.format(_id=active_mds, since=last_laggy_since))
- else:
- self.log('mds.{_id} down, removed from mdsmap'.format(_id=active_mds, since=last_laggy_since))
-
- # wait for a standby mds to takeover and become active
- takeover_mds = None
- takeover_rank = None
- itercount = 0
- while True:
- statuses = [self.mds_cluster.get_mds_info(m) for m in self.failure_group]
- actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
- if len(actives) > 0:
- assert len(actives) == 1, 'Can only have one active in failure group'
- takeover_mds = actives[0]['name']
- takeover_rank = actives[0]['rank']
- break
- itercount = itercount + 1
- if itercount > 10:
- self.log('mds map: {status}'.format(status=self.mds_cluster.get_fs_map()))
-
- self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
-
- # wait for a while before restarting old active to become new
- # standby
- delay = self.max_revive_delay
- if self.randomize:
- delay = random.randrange(0.0, self.max_revive_delay)
-
- self.log('waiting for {delay} secs before reviving mds.{id}'.format(
- delay=delay, id=active_mds))
- time.sleep(delay)
-
- self.log('reviving mds.{id}'.format(id=active_mds))
- self.revive_mds(active_mds, standby_for_rank=takeover_rank)
-
- status = {}
- while True:
- status = self.mds_cluster.get_mds_info(active_mds)
- if status and (status['state'] == 'up:standby' or status['state'] == 'up:standby-replay'):
+ status = self.fs.status()
+
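+            # Occasionally thrash max_mds itself: pick a new value either below or
+            # above the current max_mds (bounded by the starting value), and
+            # deactivate surplus ranks if the cluster shrank.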
+            if random.random() <= self.thrash_max_mds:
+ max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+ options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
+ if len(options) > 0:
+ sample = random.sample(options, 1)
+ new_max_mds = sample[0]
+ self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
+ self.fs.set_max_mds(new_max_mds)
+ stats['max_mds'] += 1
+
+ # Now randomly deactivate mds if we shrank
+ for rank in random.sample(range(1, max_mds), max(0, max_mds-new_max_mds)):
+ self.fs.deactivate(rank)
+ stats['deactivate'] += 1
+
+ status = self.wait_for_stable()[0]
+
+ count = 0
+ for info in status.get_ranks(self.fs.id):
+ name = info['name']
+ label = 'mds.' + name
+ rank = info['rank']
+ gid = info['gid']
+
+ # if thrash_weights isn't specified and we've reached max_thrash,
+ # we're done
+ count = count + 1
+ if 'thrash_weights' not in self.config and count > self.max_thrash:
break
- self.log(
- 'waiting till mds map indicates mds.{_id} is in standby or standby-replay'.format(_id=active_mds))
- time.sleep(2)
- self.log('mds.{_id} reported in {state} state'.format(_id=active_mds, state=status['state']))
- # don't do replay thrashing right now
- continue
- # this might race with replay -> active transition...
- if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
-
- delay = self.max_replay_thrash_delay
- if self.randomize:
- delay = random.randrange(0.0, self.max_replay_thrash_delay)
- time.sleep(delay)
- self.log('kill replaying mds.{id}'.format(id=self.to_kill))
- self.kill_mds(self.to_kill)
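+                # Per-MDS thrash_weights override max_thrash; MDSs without an
+                # explicit weight default to 0.0 and are never killed.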
+ weight = 1.0
+ if 'thrash_weights' in self.config:
+                    weight = float(self.config['thrash_weights'].get(label, 0.0))
+                skip = random.random()
+ if weight <= skip:
+ self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
+ continue
+ self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
+ self.kill_mds(name)
+ stats['kill'] += 1
+
+ # wait for mon to report killed mds as crashed
+ last_laggy_since = None
+ itercount = 0
+ while True:
+ status = self.fs.status()
+ info = status.get_mds(name)
+ if not info:
+ break
+ if 'laggy_since' in info:
+ last_laggy_since = info['laggy_since']
+ break
+ if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
+ break
+ self.log(
+ 'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
+ label=label))
+ itercount = itercount + 1
+ if itercount > 10:
+ self.log('mds map: {status}'.format(status=status))
+ time.sleep(2)
+
+ if last_laggy_since:
+ self.log(
+ '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
+ else:
+                    self.log('{label} down, removed from mdsmap'.format(label=label))
+
+ # wait for a standby mds to takeover and become active
+ status, takeover_mds = self.wait_for_stable(rank, gid)
+ self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
+
+ # wait for a while before restarting old active to become new
+ # standby
delay = self.max_revive_delay
if self.randomize:
delay = random.randrange(0.0, self.max_revive_delay)
- self.log('waiting for {delay} secs before reviving mds.{id}'.format(
- delay=delay, id=self.to_kill))
+ self.log('waiting for {delay} secs before reviving {label}'.format(
+ delay=delay, label=label))
time.sleep(delay)
- self.log('revive mds.{id}'.format(id=self.to_kill))
- self.revive_mds(self.to_kill)
+ self.log('reviving {label}'.format(label=label))
+ self.revive_mds(name)
+
+ while True:
+ status = self.fs.status()
+ info = status.get_mds(name)
+ if info and info['state'] in ('up:standby', 'up:standby-replay'):
+ self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
+ break
+ self.log(
+ 'waiting till mds map indicates {label} is in standby or standby-replay'.format(label=label))
+ time.sleep(2)
+
+ for stat in stats:
+ self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
+
+ # don't do replay thrashing right now
+# for info in status.get_replays(self.fs.id):
+# # this might race with replay -> active transition...
+# if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
+# delay = self.max_replay_thrash_delay
+# if self.randomize:
+# delay = random.randrange(0.0, self.max_replay_thrash_delay)
+# time.sleep(delay)
+# self.log('kill replaying mds.{id}'.format(id=self.to_kill))
+# self.kill_mds(self.to_kill)
+#
+# delay = self.max_revive_delay
+# if self.randomize:
+# delay = random.randrange(0.0, self.max_revive_delay)
+#
+# self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+# delay=delay, id=self.to_kill))
+# time.sleep(delay)
+#
+# self.log('revive mds.{id}'.format(id=self.to_kill))
+# self.revive_mds(self.to_kill)
@contextlib.contextmanager
log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
random.seed(seed)
- max_thrashers = config.get('max_thrash', 1)
- thrashers = {}
-
(first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
manager = ceph_manager.CephManager(
first, ctx=ctx, logger=log.getChild('ceph_manager'),
# make sure everyone is in active, standby, or standby-replay
log.info('Wait for all MDSs to reach steady state...')
- statuses = None
- statuses_by_rank = None
+ status = mds_cluster.status()
while True:
- statuses = {m: mds_cluster.get_mds_info(m) for m in mdslist}
- statuses_by_rank = {}
- for _, s in statuses.iteritems():
- if isinstance(s, dict):
- statuses_by_rank[s['rank']] = s
-
- ready = filter(lambda (_, s): s is not None and (s['state'] == 'up:active'
- or s['state'] == 'up:standby'
- or s['state'] == 'up:standby-replay'),
- statuses.items())
- if len(ready) == len(statuses):
+ steady = True
+ for info in status.get_all():
+ state = info['state']
+ if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+ steady = False
+ break
+ if steady:
break
time.sleep(2)
+ status = mds_cluster.status()
log.info('Ready to start thrashing')
- # setup failure groups
- failure_groups = {}
- actives = {s['name']: s for (_, s) in statuses.iteritems() if s['state'] == 'up:active'}
- log.info('Actives is: {d}'.format(d=actives))
- log.info('Statuses is: {d}'.format(d=statuses_by_rank))
- for active in actives:
- for (r, s) in statuses.iteritems():
- if s['standby_for_name'] == active:
- if not active in failure_groups:
- failure_groups[active] = []
- log.info('Assigning mds rank {r} to failure group {g}'.format(r=r, g=active))
- failure_groups[active].append(r)
-
manager.wait_for_clean()
- for (active, standbys) in failure_groups.iteritems():
- weight = 1.0
- if 'thrash_weights' in config:
- weight = int(config['thrash_weights'].get('mds.{_id}'.format(_id=active), '0.0'))
-
- failure_group = [active]
- failure_group.extend(standbys)
-
+ thrashers = {}
+ for fs in status.get_filesystems():
+ name = fs['mdsmap']['fs_name']
+ log.info('Running thrasher against FS {f}'.format(f = name))
thrasher = MDSThrasher(
- ctx, manager, mds_cluster, config,
- logger=log.getChild('mds_thrasher.failure_group.[{a}, {sbs}]'.format(
- a=active,
- sbs=', '.join(standbys)
+ ctx, manager, config,
+ log.getChild('fs.[{f}]'.format(f = name)),
+ Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds']
)
- ),
- failure_group=failure_group,
- weight=weight)
thrasher.start()
- thrashers[active] = thrasher
-
- # if thrash_weights isn't specified and we've reached max_thrash,
- # we're done
- if 'thrash_weights' not in config and len(thrashers) == max_thrashers:
- break
+ thrashers[name] = thrasher
try:
log.debug('Yielding')
yield
finally:
log.info('joining mds_thrashers')
- for t in thrashers:
- log.info('join thrasher for failure group [{fg}]'.format(fg=', '.join(failure_group)))
- thrashers[t].stop()
- thrashers[t].get() # Raise any exception from _run()
- thrashers[t].join()
+ for name in thrashers:
+ log.info('join thrasher mds_thrasher.fs.[{f}]'.format(f=name))
+ thrashers[name].stop()
+ thrashers[name].get() # Raise any exception from _run()
+ thrashers[name].join()
log.info('done joining')