From 53f22d9493408064735e0cad486a08b067839546 Mon Sep 17 00:00:00 2001
From: Sam Lang
Date: Wed, 9 Jan 2013 16:02:42 -0600
Subject: [PATCH] task/mds_thrasher: New task for thrashing the mds

Signed-off-by: Sam Lang
---
 teuthology/misc.py              |  10 +-
 teuthology/task/ceph.py         |  11 +-
 teuthology/task/ceph_manager.py |  45 +++++
 teuthology/task/mds_thrash.py   | 309 ++++++++++++++++++++++++++++++++
 4 files changed, 368 insertions(+), 7 deletions(-)
 create mode 100644 teuthology/task/mds_thrash.py

diff --git a/teuthology/misc.py b/teuthology/misc.py
index 0de60dbfe91e5..ab188b7c94edc 100644
--- a/teuthology/misc.py
+++ b/teuthology/misc.py
@@ -128,11 +128,13 @@ def skeleton_config(roles, ips):
         conf.setdefault(role, {})
         conf[role]['mon addr'] = addr
     # set up standby mds's
-    for roles in roles:
-        for role in roles:
-            if role.startswith('mds.') and role.endswith('-s'):
+    for roles_subset in roles:
+        for role in roles_subset:
+            if role.startswith('mds.'):
                 conf.setdefault(role, {})
-                conf[role]['mds standby for name'] = role[:-2]
+                if role.find('-s-') != -1:
+                    standby_mds = role[role.find('-s-')+3:]
+                    conf[role]['mds standby for name'] = standby_mds
     return conf
 
 def roles_of_type(roles_for_host, type_):
diff --git a/teuthology/task/ceph.py b/teuthology/task/ceph.py
index 2a0b0fd6383b5..c84513119cd31 100644
--- a/teuthology/task/ceph.py
+++ b/teuthology/task/ceph.py
@@ -41,12 +41,16 @@ class DaemonState(object):
         self.proc = None
         self.log.info('Stopped')
 
-    def restart(self):
+    def restart(self, *args, **kwargs):
         self.log.info('Restarting')
         if self.proc is not None:
             self.log.debug('stopping old one...')
             self.stop()
-        self.proc = self.remote.run(*self.command_args, **self.command_kwargs)
+        cmd_args = list(self.command_args)
+        cmd_args.extend(args)
+        cmd_kwargs = self.command_kwargs.copy()  # don't mutate the saved kwargs
+        cmd_kwargs.update(kwargs)
+        self.proc = self.remote.run(*cmd_args, **cmd_kwargs)
         self.log.info('Started')
 
     def running(self):
@@ -841,7 +845,7 @@ def run_daemon(ctx, config, type_):
 
         for id_ in teuthology.roles_of_type(roles_for_host, type_):
             name = '%s.%s' % (type_, id_)
-            if not id_.endswith('-s'):
+            if not (id_.endswith('-s')) and (id_.find('-s-') == -1):
                 num_active += 1
 
             run_cmd = [
@@ -870,6 +874,7 @@ def run_daemon(ctx, config, type_):
                 run_cmd.extend([ 'env', 'CPUPROFILE=%s' % profile_path ])
 
             run_cmd.extend(run_cmd_tail)
+
             ctx.daemons.add_daemon(remote, type_, id_,
                                    args=run_cmd,
                                    logger=log.getChild(name),
diff --git a/teuthology/task/ceph_manager.py b/teuthology/task/ceph_manager.py
index 51c2f2febc9e7..e0bff247b3eac 100644
--- a/teuthology/task/ceph_manager.py
+++ b/teuthology/task/ceph_manager.py
@@ -611,3 +611,48 @@ class CephManager:
         if debug:
             self.log('health:\n{h}'.format(h=out))
         return json.loads(out)
+
+    ## metadata servers
+
+    def kill_mds(self, mds):
+        self.ctx.daemons.get_daemon('mds', mds).stop()
+
+    def kill_mds_by_rank(self, rank):
+        status = self.get_mds_status_by_rank(rank)
+        self.ctx.daemons.get_daemon('mds', status['name']).stop()
+
+    def revive_mds(self, mds, standby_for_rank=None):
+        args = []
+        if standby_for_rank is not None:
+            args.extend(['--hot-standby', str(standby_for_rank)])
+        self.ctx.daemons.get_daemon('mds', mds).restart(*args)
+
+    def revive_mds_by_rank(self, rank, standby_for_rank=None):
+        args = []
+        if standby_for_rank is not None:
+            args.extend(['--hot-standby', str(standby_for_rank)])
+        status = self.get_mds_status_by_rank(rank)
+        self.ctx.daemons.get_daemon('mds', status['name']).restart(*args)
+
+    def get_mds_status(self, mds):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        # collate; for dup ids, larger gid wins.
+        for info in j['info'].itervalues():
+            if info['name'] == mds:
+                return info
+        return None
+
+    def get_mds_status_by_rank(self, rank):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        # collate; for dup ids, larger gid wins.
+        for info in j['info'].itervalues():
+            if info['rank'] == rank:
+                return info
+        return None
+
+    def get_mds_status_all(self):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        return j
diff --git a/teuthology/task/mds_thrash.py b/teuthology/task/mds_thrash.py
new file mode 100644
index 0000000000000..d143b4f7a9722
--- /dev/null
+++ b/teuthology/task/mds_thrash.py
@@ -0,0 +1,309 @@
+import logging
+import contextlib
+import ceph_manager
+import random
+import time
+import gevent
+from teuthology import misc as teuthology
+
+log = logging.getLogger(__name__)
+
+class MDSThrasher:
+    """
+    MDSThrasher::
+
+    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
+
+    The config is optional.  Many of the config parameters are a maximum value
+    to use when selecting a random value from a range.  To always use the
+    maximum value, set randomize to false.  The config is a dict containing
+    some or all of:
+
+    seed: [no default] seed the random number generator
+
+    randomize: [default: true] enables randomization; when false, the maximum
+      values are used directly.
+
+    max_thrash: [default: 1] the maximum number of MDSs that will be thrashed
+      at any given time.
+
+    max_thrash_delay: [default: 30] maximum number of seconds to delay before
+      thrashing again.
+
+    max_revive_delay: [default: 10] maximum number of seconds to delay before
+      bringing back a thrashed MDS.
+
+    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+      during replay.  Value should be between 0.0 and 1.0.
+
+    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay
+      while in the replay state before thrashing.
+
+    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
+      This option overrides anything specified by max_thrash.  This option is
+      a dict of mds.x: weight pairs, for example {mds.a: 0.7, mds.b: 0.3,
+      mds.c: 0.0}.  Each weight is a value from 0.0 to 1.0.  Any MDSs not
+      specified are automatically given a weight of 0.0.  For a given MDS, by
+      default the thrasher delays for up to max_thrash_delay, thrashes, waits
+      for the MDS to recover, and iterates.  If a non-zero weight is specified
+      for an MDS, then on each iteration the thrasher chooses whether to
+      thrash based on a random value [0-1] not exceeding the weight of that
+      MDS.
+
+    Examples::
+
+      The following example sets the likelihood that mds.a will be thrashed
+      to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also
+      sets the likelihood that an MDS will be thrashed in replay to 40%.
+      Thrash weights do not have to sum to 1.
+
+      tasks:
+      - ceph:
+      - mds_thrash:
+          thrash_weights:
+            mds.a: 0.8
+            mds.b: 0.2
+          thrash_in_replay: 0.4
+      - ceph-fuse:
+      - workunit:
+          clients:
+            all: [suites/fsx.sh]
+
+      The following example disables randomization, and uses the max delay
+      values:
+
+      tasks:
+      - ceph:
+      - mds_thrash:
+          randomize: false
+          max_thrash_delay: 10
+          max_revive_delay: 1
+          max_replay_thrash_delay: 4
+
+    """
+    def __init__(self, ctx, manager, config, logger, failure_group, weight):
+        self.ctx = ctx
+        self.manager = manager
+        self.manager.wait_for_clean()
+
+        self.stopping = False
+        self.logger = logger
+        self.config = config
+
+        self.randomize = bool(self.config.get('randomize', True))
+        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 30.0))
+        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
+        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, \
+            'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(v=self.thrash_in_replay)
+
+        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
+
+        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
+
+        self.failure_group = failure_group
+        self.weight = weight
+
+        # spawn last, so the greenlet never sees a half-initialized thrasher
+        self.thread = gevent.spawn(self.do_thrash)
+
+    def log(self, x):
+        self.logger.info(x)
+
+    def do_join(self):
+        self.stopping = True
+        self.thread.get()
+
+    def do_thrash(self):
+        self.log('starting mds_do_thrash for failure group: ' +
+                 ', '.join(['mds.{_id}'.format(_id=_f) for _f in self.failure_group]))
+        while not self.stopping:
+            delay = self.max_thrash_delay
+            if self.randomize:
+                delay = random.uniform(0.0, self.max_thrash_delay)
+
+            if delay > 0.0:
+                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
+                time.sleep(delay)
+
+            skip = random.uniform(0.0, 1.0)
+            if self.weight < 1.0 and skip > self.weight:
+                self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=self.weight))
+                continue
+
+            # find the active mds in the failure group
+            statuses = [self.manager.get_mds_status(m) for m in self.failure_group]
+            actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
+            assert len(actives) == 1, 'Can only have one active in a failure group'
+
+            active_mds = actives[0]['name']
+            active_rank = actives[0]['rank']
+
+            self.log('kill mds.{id} (rank={r})'.format(id=active_mds, r=active_rank))
+            self.manager.kill_mds_by_rank(active_rank)
+
+            # wait for mon to report killed mds as crashed
+            status = {}
+            last_laggy_since = None
+            while True:
+                failed = self.manager.get_mds_status_all()['failed']
+                status = self.manager.get_mds_status(active_mds)
+                if not status:
+                    break
+                if 'laggy_since' in status:
+                    last_laggy_since = status['laggy_since']
+                    break
+                if any([(f == active_mds) for f in failed]):
+                    break
+                self.log('waiting till mds map indicates mds.{_id} is laggy/crashed, in failed state, or mds.{_id} is removed from mdsmap'.format(_id=active_mds))
+                time.sleep(2)
+            if last_laggy_since:
+                self.log('mds.{_id} reported laggy/crashed since: {since}'.format(_id=active_mds, since=last_laggy_since))
+            else:
+                self.log('mds.{_id} down, removed from mdsmap'.format(_id=active_mds))
+
+            # wait for a standby mds to takeover and become active
+            takeover_mds = None
+            takeover_rank = None
+            while True:
+                statuses = [self.manager.get_mds_status(m) for m in self.failure_group]
+                actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
+                if len(actives) > 0:
+                    assert len(actives) == 1, 'Can only have one active in failure group'
+                    takeover_mds = actives[0]['name']
+                    takeover_rank = actives[0]['rank']
+                    break
+                time.sleep(2)
+
+            self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
+
+            # wait for a while before restarting old active to become new
+            # standby
+            delay = self.max_revive_delay
+            if self.randomize:
+                delay = random.uniform(0.0, self.max_revive_delay)
+
+            self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+                delay=delay, id=active_mds))
+            time.sleep(delay)
+
+            self.log('reviving mds.{id}'.format(id=active_mds))
+            self.manager.revive_mds(active_mds, standby_for_rank=takeover_rank)
+
+            status = {}
+            while True:
+                status = self.manager.get_mds_status(active_mds)
+                if status and (status['state'] == 'up:standby'
+                               or status['state'] == 'up:standby-replay'):
+                    break
+                self.log('waiting till mds map indicates mds.{_id} is in standby or standby-replay'.format(_id=active_mds))
+                time.sleep(2)
+            self.log('mds.{_id} reported in {state} state'.format(_id=active_mds, state=status['state']))
+
+            # replay thrashing is disabled for now: it can race with the
+            # replay -> active transition, so skip straight to the next
+            # iteration
+            continue
+            if status['state'] == 'up:replay' and random.uniform(0.0, 1.0) < self.thrash_in_replay:
+                delay = self.max_replay_thrash_delay
+                if self.randomize:
+                    delay = random.uniform(0.0, self.max_replay_thrash_delay)
+                time.sleep(delay)
+                self.log('kill replaying mds.{id}'.format(id=active_mds))
+                self.manager.kill_mds(active_mds)
+
+                delay = self.max_revive_delay
+                if self.randomize:
+                    delay = random.uniform(0.0, self.max_revive_delay)
+
+                self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+                    delay=delay, id=active_mds))
+                time.sleep(delay)
+
+                self.log('revive mds.{id}'.format(id=active_mds))
+                self.manager.revive_mds(active_mds)
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Stress test the mds by thrashing while another task/workunit
+    is running.
+
+    Please refer to the MDSThrasher class for further information on the
+    available options.
+    """
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'mds_thrash task only accepts a dict for configuration'
+    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+    assert len(mdslist) > 1, \
+        'mds_thrash task requires at least 2 metadata servers'
+
+    # choose random seed
+    seed = None
+    if 'seed' in config:
+        seed = int(config['seed'])
+    else:
+        seed = int(time.time())
+    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
+    random.seed(seed)
+
+    max_thrashers = config.get('max_thrash', 1)
+    thrashers = {}
+
+    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
+    manager = ceph_manager.CephManager(
+        first, ctx=ctx, logger=log.getChild('ceph_manager'),
+        )
+
+    log.info('Wait for all MDSs to reach steady state...')
+    # make sure everyone is in active, standby, or standby-replay;
+    # refresh the statuses on every pass, otherwise we would never
+    # observe a state change
+    statuses = None
+    statuses_by_rank = None
+    while True:
+        statuses = {m: manager.get_mds_status(m) for m in mdslist}
+        statuses_by_rank = {s['rank']: s for (_, s) in statuses.iteritems()}
+        ready = filter(lambda (_, s): s is not None
+                           and (s['state'] == 'up:active'
+                                or s['state'] == 'up:standby'
+                                or s['state'] == 'up:standby-replay'),
+                       statuses.items())
+        if len(ready) == len(statuses):
+            break
+        time.sleep(2)
+    log.info('Ready to start thrashing')
+
+    # setup failure groups
+    failure_groups = {}
+    actives = {s['name']: s for (_, s) in statuses.iteritems() if s['state'] == 'up:active'}
+    log.info('Actives is: {d}'.format(d=actives))
+    log.info('Statuses is: {d}'.format(d=statuses_by_rank))
+    for active in actives:
+        for (m, s) in statuses.iteritems():
+            if s['standby_for_name'] == active:
+                if active not in failure_groups:
+                    failure_groups[active] = []
+                log.info('Assigning mds {m} to failure group {g}'.format(m=m, g=active))
+                failure_groups[active].append(m)
+
+    for (active, standbys) in failure_groups.iteritems():
+        weight = 1.0
+        if 'thrash_weights' in config:
+            weight = float(config['thrash_weights'].get('mds.{_id}'.format(_id=active), '0.0'))
+
+        failure_group = [active]
+        failure_group.extend(standbys)
+        thrashers[active] = MDSThrasher(
+            ctx, manager, config,
+            logger=log.getChild('mds_thrasher.failure_group.[{a}, {sbs}]'.format(
+                a=active,
+                sbs=', '.join(standbys)
+                )
+            ),
+            failure_group=failure_group,
+            weight=weight)
+        # if thrash_weights isn't specified and we've reached max_thrash,
+        # we're done
+        if 'thrash_weights' not in config and len(thrashers) == max_thrashers:
+            break
+
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining mds_thrashers')
+        for t in thrashers:
+            log.info('join thrasher for failure group [{fg}]'.format(
+                fg=', '.join(thrashers[t].failure_group)))
+            thrashers[t].do_join()
+        log.info('done joining')
-- 
2.39.5
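Reviewer note (not part of the patch): the misc.py hunk replaces the old
'mds.<id>-s' suffix with an 'mds.<id>-s-<name>' convention, where everything
after '-s-' names the MDS the role stands by for.  A minimal standalone
sketch of that parsing, with the helper name standby_target invented purely
for illustration:

    # Sketch of the 'mds.<id>-s-<name>' convention parsed by
    # skeleton_config() above; standby_target is a hypothetical helper,
    # not part of the patch.
    def standby_target(role):
        """Return the mds name this role is a standby for, or None."""
        if not role.startswith('mds.'):
            return None
        idx = role.find('-s-')
        if idx == -1:
            return None            # regular mds, counted as active by run_daemon()
        return role[idx + 3:]      # everything after '-s-' names the target

    assert standby_target('mds.a') is None        # active-eligible mds
    assert standby_target('mds.b-s-a') == 'a'     # standby that follows mds.a

For the role 'mds.b-s-a', skeleton_config() emits 'mds standby for name = a',
and run_daemon() excludes the role from the active count.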
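Also for reviewers: when thrash_weights is configured, do_thrash() gates each
iteration on a uniform draw against the MDS's weight.  A self-contained
sketch of that decision, under the same semantics as the patch (should_thrash
is an illustrative name, not in the patch):

    import random

    def should_thrash(weight):
        # Thrash on roughly a fraction `weight` of iterations; a weight of
        # 1.0 (the default when thrash_weights is unset) always thrashes.
        if weight >= 1.0:
            return True
        return random.uniform(0.0, 1.0) <= weight

So with thrash_weights: {mds.a: 0.8}, the failure group for mds.a thrashes on
about 80% of its iterations and sleeps through the rest.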