+"""
+Handle clock skews in monitors.
+"""
import logging
import contextlib
import time
import gevent
from StringIO import StringIO
import ceph_manager
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
class ClockSkewCheck:
- """
- Periodically check if there are any clock skews among the monitors in the
- quorum. By default, assume no skews are supposed to exist; that can be
- changed using the 'expect-skew' option. If 'fail-on-skew' is set to false,
- then we will always succeed and only report skews if any are found.
-
- This class does not spawn a thread. It assumes that, if that is indeed
- wanted, it should be done by a third party (for instance, the task using
- this class). We intend it as such in order to reuse this class if need be.
-
- This task accepts the following options:
-
- interval amount of seconds to wait in-between checks. (default: 30.0)
- max-skew maximum skew, in seconds, that is considered tolerable before
- issuing a warning. (default: 0.05)
- expect-skew 'true' or 'false', to indicate whether to expect a skew during
- the run or not. If 'true', the test will fail if no skew is
- found, and succeed if a skew is indeed found; if 'false', it's
- the other way around. (default: false)
- never-fail Don't fail the run if a skew is detected and we weren't
- expecting it, or if no skew is detected and we were expecting
- it. (default: False)
-
- at-least-once Runs at least once, even if we are told to stop.
- (default: True)
- at-least-once-timeout If we were told to stop but we are attempting to
- run at least once, timeout after this many seconds.
- (default: 600)
-
- Example:
- Expect a skew higher than 0.05 seconds, but only report it without failing
- the teuthology run.
+ """
+ Periodically check if there are any clock skews among the monitors in the
+ quorum. By default, assume no skews are supposed to exist; that can be
+ changed using the 'expect-skew' option. If 'never-fail' is set to true,
+ then we will always succeed and only report skews if any are found.
+
+ This class does not spawn a thread. If one is wanted, it should be
+ spawned by a third party (for instance, the task using this class).
+ It is kept this way so the class can be reused elsewhere if need be.
+
+ This task accepts the following options:
+
+ interval number of seconds to wait in-between checks. (default: 30.0)
+ max-skew maximum skew, in seconds, that is considered tolerable before
+ issuing a warning. (default: 0.05)
+ expect-skew 'true' or 'false', to indicate whether to expect a skew during
+ the run or not. If 'true', the test will fail if no skew is
+ found, and succeed if a skew is indeed found; if 'false', it's
+ the other way around. (default: false)
+ never-fail Don't fail the run if a skew is detected and we weren't
+ expecting it, or if no skew is detected and we were expecting
+ it. (default: False)
+
+ at-least-once Runs at least once, even if we are told to stop.
+ (default: True)
+ at-least-once-timeout If we were told to stop but we are attempting to
+ run at least once, timeout after this many seconds.
+ (default: 600)
+
+ Example:
+ Expect a skew higher than 0.05 seconds, but only report it without
+ failing the teuthology run.
- mon_clock_skew_check:
interval: 30
max-skew: 0.05
expect-skew: true
never-fail: true
- """
-
- def __init__(self, ctx, manager, config, logger):
- self.ctx = ctx
- self.manager = manager;
-
- self.stopping = False
- self.logger = logger
- self.config = config
-
- if self.config is None:
- self.config = dict()
-
- self.check_interval = float(self.config.get('interval', 30.0))
-
- first_mon = teuthology.get_first_mon(ctx, config)
- remote = ctx.cluster.only(first_mon).remotes.keys()[0]
- proc = remote.run(
- args=[
- 'sudo',
- 'ceph-mon',
- '-i', first_mon[4:],
- '--show-config-value', 'mon_clock_drift_allowed'
- ], stdout=StringIO(), wait=True
- )
- self.max_skew = self.config.get('max-skew', float(proc.stdout.getvalue()))
-
- self.expect_skew = self.config.get('expect-skew', False)
- self.never_fail = self.config.get('never-fail', False)
- self.at_least_once = self.config.get('at-least-once', True)
- self.at_least_once_timeout = self.config.get('at-least-once-timeout', 600.0)
-
- def info(self, x):
- self.logger.info(x)
-
- def warn(self, x):
- self.logger.warn(x)
-
- def debug(self, x):
- self.logger.debug(x)
-
- def finish(self):
- self.stopping = True
-
- def sleep_interval(self):
- if self.check_interval > 0.0:
- self.debug('sleeping for {s} seconds'.format(
- s=self.check_interval))
- time.sleep(self.check_interval)
-
- def print_skews(self, skews):
- total = len(skews)
- if total > 0:
- self.info('---------- found {n} skews ----------'.format(n=total))
- for mon_id,values in skews.iteritems():
- self.info('mon.{id}: {v}'.format(id=mon_id,v=values))
- self.info('-------------------------------------')
- else:
- self.info('---------- no skews were found ----------')
-
- def do_check(self):
- self.info('start checking for clock skews')
- skews = dict()
- ran_once = False
- started_on = None
-
- while not self.stopping or (self.at_least_once and not ran_once):
-
- if self.at_least_once and not ran_once and self.stopping:
- if started_on is None:
- self.info('kicking-off timeout (if any)')
- started_on = time.time()
- elif self.at_least_once_timeout > 0.0:
- assert time.time() - started_on < self.at_least_once_timeout, \
- 'failed to obtain a timecheck before timeout expired'
-
- quorum_size = len(teuthology.get_mon_names(self.ctx))
- self.manager.wait_for_mon_quorum_size(quorum_size)
-
- health = self.manager.get_mon_health(True)
- timechecks = health['timechecks']
-
- clean_check = False
-
- if timechecks['round_status'] == 'finished':
- assert (timechecks['round'] % 2) == 0, \
- 'timecheck marked as finished but round ' \
- 'disagrees (r {r})'.format(
- r=timechecks['round'])
- clean_check = True
- else:
- assert timechecks['round_status'] == 'on-going', \
- 'timecheck status expected \'on-going\' ' \
- 'but found \'{s}\' instead'.format(
- s=timechecks['round_status'])
- if 'mons' in timechecks.keys() and len(timechecks['mons']) > 1:
- self.info('round still on-going, but there are available reports')
+ """
+
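+ # A minimal sketch (hypothetical, not part of this task) of how a third
+ # party could drive this class from a greenlet, as suggested above; the
+ # task() below does essentially this:
+ #
+ # checker = ClockSkewCheck(ctx, manager, config={'interval': 30},
+ # logger=log.getChild('skew'))
+ # thread = gevent.spawn(checker.do_check)
+ # ... # run the rest of the test
+ # checker.finish()
+ # thread.get()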
+ def __init__(self, ctx, manager, config, logger):
+ self.ctx = ctx
+ self.manager = manager
+
+ self.stopping = False
+ self.logger = logger
+ self.config = config
+
+ if self.config is None:
+ self.config = dict()
+
+ self.check_interval = float(self.config.get('interval', 30.0))
+
+ first_mon = teuthology.get_first_mon(ctx, config)
+ remote = ctx.cluster.only(first_mon).remotes.keys()[0]
+ proc = remote.run(
+ args=[
+ 'sudo',
+ 'ceph-mon',
+ '-i', first_mon[4:],
+ '--show-config-value', 'mon_clock_drift_allowed'
+ ], stdout=StringIO(), wait=True
+ )
+ self.max_skew = self.config.get('max-skew', float(proc.stdout.getvalue()))
+
+ self.expect_skew = self.config.get('expect-skew', False)
+ self.never_fail = self.config.get('never-fail', False)
+ self.at_least_once = self.config.get('at-least-once', True)
+ self.at_least_once_timeout = self.config.get('at-least-once-timeout', 600.0)
+
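+ # For reference, the remote command above amounts to (assuming a monitor
+ # id of 'a'):
+ #
+ # sudo ceph-mon -i a --show-config-value mon_clock_drift_allowed
+ #
+ # and its stdout (a float, in seconds) is the default for 'max-skew'.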
+ def info(self, x):
+ """
+ Log an info-level message via the local logger.
+ """
+ self.logger.info(x)
+
+ def warn(self, x):
+ """
+ Log a warning via the local logger.
+ """
+ self.logger.warn(x)
+
+ def debug(self, x):
+ """
+ Log a debug-level message via the local logger.
+ """
+ self.logger.debug(x)
+
+ def finish(self):
+ """
+ Break out of the do_check loop.
+ """
+ self.stopping = True
+
+ def sleep_interval(self):
+ """
+ If a sleep interval is set, sleep for that amount of time.
+ """
+ if self.check_interval > 0.0:
+ self.debug('sleeping for {s} seconds'.format(
+ s=self.check_interval))
+ time.sleep(self.check_interval)
+
+ def print_skews(self, skews):
+ """
+ Display skew values.
+ """
+ total = len(skews)
+ if total > 0:
+ self.info('---------- found {n} skews ----------'.format(n=total))
+ for mon_id, values in skews.iteritems():
+ self.info('mon.{id}: {v}'.format(id=mon_id, v=values))
+ self.info('-------------------------------------')
else:
- self.info('no timechecks available just yet')
- self.sleep_interval()
- continue
-
- assert len(timechecks['mons']) > 1, \
- 'there are not enough reported timechecks; ' \
- 'expected > 1 found {n}'.format(n=len(timechecks['mons']))
-
- for check in timechecks['mons']:
- mon_skew = float(check['skew'])
- mon_health = check['health']
- mon_id = check['name']
- if abs(mon_skew) > self.max_skew:
- assert mon_health == 'HEALTH_WARN', \
- 'mon.{id} health is \'{health}\' but skew {s} > max {ms}'.format(
- id=mon_id,health=mon_health,s=abs(mon_skew),ms=self.max_skew)
-
- log_str = 'mon.{id} with skew {s} > max {ms}'.format(
- id=mon_id,s=abs(mon_skew),ms=self.max_skew)
-
- """ add to skew list """
- details = check['details']
- skews[mon_id] = {'skew': mon_skew, 'details': details}
-
- if self.expect_skew:
- self.info('expected skew: {str}'.format(str=log_str))
- else:
- self.warn('unexpected skew: {str}'.format(str=log_str))
-
- if clean_check or (self.expect_skew and len(skews) > 0):
- ran_once = True
+ self.info('---------- no skews were found ----------')
+
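+ # Illustrative shape of the skews mapping printed above, as built by
+ # do_check() below (monitor ids and values are made up):
+ #
+ # {'a': {'skew': 0.081, 'details': '...'},
+ # 'b': {'skew': -0.002, 'details': '...'}}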
+ def do_check(self):
+ """
+ Clock skew checker. Loops until finish() is called.
+ """
+ self.info('start checking for clock skews')
+ skews = dict()
+ ran_once = False
+
+ started_on = None
+
+ while not self.stopping or (self.at_least_once and not ran_once):
+
+ if self.at_least_once and not ran_once and self.stopping:
+ if started_on is None:
+ self.info('kicking-off timeout (if any)')
+ started_on = time.time()
+ elif self.at_least_once_timeout > 0.0:
+ assert time.time() - started_on < self.at_least_once_timeout, \
+ 'failed to obtain a timecheck before timeout expired'
+
+ quorum_size = len(teuthology.get_mon_names(self.ctx))
+ self.manager.wait_for_mon_quorum_size(quorum_size)
+
+ health = self.manager.get_mon_health(True)
+ timechecks = health['timechecks']
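+ # Illustrative shape of the timechecks report consumed below; the
+ # field names come from the checks in this loop, the values are
+ # made up:
+ #
+ # {'round': 4, # even whenever a round is finished
+ # 'round_status': 'finished', # or 'on-going'
+ # 'mons': [{'name': 'a', 'skew': 0.0, 'health': 'HEALTH_OK',
+ # 'details': 'none'}, ...]}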
+
+ clean_check = False
+
+ if timechecks['round_status'] == 'finished':
+ assert (timechecks['round'] % 2) == 0, \
+ 'timecheck marked as finished but round ' \
+ 'disagrees (r {r})'.format(
+ r=timechecks['round'])
+ clean_check = True
+ else:
+ assert timechecks['round_status'] == 'on-going', \
+ 'timecheck status expected \'on-going\' ' \
+ 'but found \'{s}\' instead'.format(
+ s=timechecks['round_status'])
+ if 'mons' in timechecks and len(timechecks['mons']) > 1:
+ self.info('round still on-going, but there are available reports')
+ else:
+ self.info('no timechecks available just yet')
+ self.sleep_interval()
+ continue
+
+ assert len(timechecks['mons']) > 1, \
+ 'there are not enough reported timechecks; ' \
+ 'expected > 1 found {n}'.format(n=len(timechecks['mons']))
+
+ for check in timechecks['mons']:
+ mon_skew = float(check['skew'])
+ mon_health = check['health']
+ mon_id = check['name']
+ if abs(mon_skew) > self.max_skew:
+ assert mon_health == 'HEALTH_WARN', \
+ 'mon.{id} health is \'{health}\' but skew {s} > max {ms}'.format(
+ id=mon_id, health=mon_health, s=abs(mon_skew), ms=self.max_skew)
+
+ log_str = 'mon.{id} with skew {s} > max {ms}'.format(
+ id=mon_id, s=abs(mon_skew), ms=self.max_skew)
+
+ # add to skew list
+ details = check['details']
+ skews[mon_id] = {'skew': mon_skew, 'details': details}
+
+ if self.expect_skew:
+ self.info('expected skew: {str}'.format(str=log_str))
+ else:
+ self.warn('unexpected skew: {str}'.format(str=log_str))
+
+ if clean_check or (self.expect_skew and len(skews) > 0):
+ ran_once = True
+ self.print_skews(skews)
+ self.sleep_interval()
+
+ total = len(skews)
self.print_skews(skews)
- self.sleep_interval()
-
- total = len(skews)
- self.print_skews(skews)
- error_str = ''
- found_error = False
+ error_str = ''
+ found_error = False
- if self.expect_skew:
- if total == 0:
- error_str = 'We were expecting a skew, but none was found!'
- found_error = True
- else:
- if total > 0:
- error_str = 'We were not expecting a skew, but we did find it!'
- found_error = True
+ if self.expect_skew:
+ if total == 0:
+ error_str = 'We were expecting a skew, but none was found!'
+ found_error = True
+ else:
+ if total > 0:
+ error_str = 'We were not expecting a skew, but we did find it!'
+ found_error = True
- if found_error:
- self.info(error_str)
- if not self.never_fail:
- assert False, error_str
+ if found_error:
+ self.info(error_str)
+ if not self.never_fail:
+ assert False, error_str
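+ # Outcome summary for do_check(), derived from the checks above:
+ #
+ # expect-skew skews found result
+ # true yes pass
+ # true no fail (unless never-fail)
+ # false yes fail (unless never-fail)
+ # false no pass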
@contextlib.contextmanager
def task(ctx, config):
- """
- Use clas ClockSkewCheck to check for clock skews on the monitors.
- This task will spawn a thread running ClockSkewCheck's do_check().
-
- All the configuration will be directly handled by ClockSkewCheck,
- so please refer to the class documentation for further information.
- """
- if config is None:
- config = {}
- assert isinstance(config, dict), \
- 'mon_clock_skew_check task only accepts a dict for configuration'
- log.info('Beginning mon_clock_skew_check...')
- first_mon = teuthology.get_first_mon(ctx, config)
- (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
- manager = ceph_manager.CephManager(
- mon,
- ctx=ctx,
- logger=log.getChild('ceph_manager'),
- )
-
- skew_check = ClockSkewCheck(ctx,
- manager, config,
- logger=log.getChild('mon_clock_skew_check'))
- skew_check_thread = gevent.spawn(skew_check.do_check)
- try:
- yield
- finally:
- log.info('joining mon_clock_skew_check')
- skew_check.finish()
- skew_check_thread.get()
+ """
+ Use the ClockSkewCheck class to check for clock skews on the monitors.
+ This task will spawn a thread running ClockSkewCheck's do_check().
+
+ All the configuration will be directly handled by ClockSkewCheck,
+ so please refer to the class documentation for further information.
+ """
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'mon_clock_skew_check task only accepts a dict for configuration'
+ log.info('Beginning mon_clock_skew_check...')
+ first_mon = teuthology.get_first_mon(ctx, config)
+ (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+ manager = ceph_manager.CephManager(
+ mon,
+ ctx=ctx,
+ logger=log.getChild('ceph_manager'),
+ )
+
+ skew_check = ClockSkewCheck(ctx,
+ manager, config,
+ logger=log.getChild('mon_clock_skew_check'))
+ skew_check_thread = gevent.spawn(skew_check.do_check)
+ try:
+ yield
+ finally:
+ log.info('joining mon_clock_skew_check')
+ skew_check.finish()
+ skew_check_thread.get()
+"""
+Monitor thrash
+"""
import logging
import contextlib
import random
import time
import json
import math
import gevent
import ceph_manager
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
def _get_mons(ctx):
- mons = [f[len('mon.'):] for f in teuthology.get_mon_names(ctx)]
- return mons
+ """
+ Get monitor names from the context value.
+ """
+ mons = [f[len('mon.'):] for f in teuthology.get_mon_names(ctx)]
+ return mons
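+ # For example, teuthology.get_mon_names() output such as
+ # ['mon.a', 'mon.b'] is returned here as ['a', 'b'].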
class MonitorThrasher:
- """
- How it works::
+ """
+ How it works::
- pick a monitor
- kill it
- wait for quorum to be formed
- sleep for 'thrash_delay' seconds
- Options::
+ Options::
seed Seed to use on the RNG to reproduce a previous
behaviour (default: None; i.e., not set)
revive_delay Number of seconds to wait before reviving
the monitor (default: 10)
thrash_delay Number of seconds to wait in-between
test iterations (default: 0)
- thrash_store Thrash monitor store before killing the monitor
- being thrashed (default: False)
+ store_thrash Thrash monitor store before killing the
+ monitor being thrashed (default: False)
store_thrash_probability Probability of thrashing a monitor's store
(default: 50)
thrash_many Thrash multiple monitors instead of just one. If
'maintain_quorum' is set to False, then we will
thrash up to as many monitors as there are
available. (default: False)
maintain_quorum Maintain monitor quorum (default: True)
Note: if 'store_thrash' is set to True, then 'maintain_quorum' must also
be set to True.
- For example::
+ For example::
tasks:
- ceph:
clients:
all:
- mon/workloadgen.sh
- """
- def __init__(self, ctx, manager, config, logger):
- self.ctx = ctx
- self.manager = manager
- self.manager.wait_for_clean()
-
- self.stopping = False
- self.logger = logger
- self.config = config
-
- if self.config is None:
- self.config = dict()
-
- """ Test reproducibility """
- self.random_seed = self.config.get('seed', None)
-
- if self.random_seed is None:
- self.random_seed = int(time.time())
-
- self.rng = random.Random()
- self.rng.seed(int(self.random_seed))
-
- """ Monitor thrashing """
- self.revive_delay = float(self.config.get('revive_delay', 10.0))
- self.thrash_delay = float(self.config.get('thrash_delay', 0.0))
-
- self.thrash_many = self.config.get('thrash_many', False)
- self.maintain_quorum = self.config.get('maintain_quorum', True)
-
- self.scrub = self.config.get('scrub', True)
-
- self.freeze_mon_probability = float(self.config.get('freeze_mon_probability', 10))
- self.freeze_mon_duration = float(self.config.get('freeze_mon_duration', 15.0))
-
- assert self.max_killable() > 0, \
- 'Unable to kill at least one monitor with the current config.'
-
- """ Store thrashing """
- self.store_thrash = self.config.get('store_thrash', False)
- self.store_thrash_probability = int(
- self.config.get('store_thrash_probability', 50))
- if self.store_thrash:
- assert self.store_thrash_probability > 0, \
- 'store_thrash is set, probability must be > 0'
- assert self.maintain_quorum, \
- 'store_thrash = true must imply maintain_quorum = true'
-
- self.thread = gevent.spawn(self.do_thrash)
-
- def log(self, x):
- self.logger.info(x)
-
- def do_join(self):
- self.stopping = True
- self.thread.get()
-
- def should_thrash_store(self):
- if not self.store_thrash:
- return False
- return self.rng.randrange(0,101) < self.store_thrash_probability
-
- def thrash_store(self, mon):
- addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
- self.log('thrashing mon.{id}@{addr} store'.format(id=mon,addr=addr))
- out = self.manager.raw_cluster_cmd('-m', addr, 'sync', 'force')
- j = json.loads(out)
- assert j['ret'] == 0, \
- 'error forcing store sync on mon.{id}:\n{ret}'.format(
- id=mon,ret=out)
-
- def should_freeze_mon(self):
- return self.rng.randrange(0,101) < self.freeze_mon_probability
-
- def freeze_mon(self, mon):
- log.info('Sending STOP to mon %s', mon)
- self.manager.signal_mon(mon, 19) # STOP
-
- def unfreeze_mon(self, mon):
- log.info('Sending CONT to mon %s', mon)
- self.manager.signal_mon(mon, 18) # CONT
-
- def kill_mon(self, mon):
- self.log('killing mon.{id}'.format(id=mon))
- self.manager.kill_mon(mon)
-
- def revive_mon(self, mon):
- self.log('reviving mon.{id}'.format(id=mon))
- self.manager.revive_mon(mon)
-
- def max_killable(self):
- m = len(_get_mons(self.ctx))
- if self.maintain_quorum:
- return max(math.ceil(m/2.0)-1,0)
- else:
- return m
-
- def do_thrash(self):
- self.log('start thrashing')
- self.log('seed: {s}, revive delay: {r}, thrash delay: {t} '\
- 'thrash many: {tm}, maintain quorum: {mq} '\
- 'store thrash: {st}, probability: {stp} '\
- 'freeze mon: prob {fp} duration {fd}'.format(
- s=self.random_seed,r=self.revive_delay,t=self.thrash_delay,
- tm=self.thrash_many, mq=self.maintain_quorum,
- st=self.store_thrash,stp=self.store_thrash_probability,
- fp=self.freeze_mon_probability,fd=self.freeze_mon_duration,
- ))
-
- while not self.stopping:
- mons = _get_mons(self.ctx)
- self.manager.wait_for_mon_quorum_size(len(mons))
- self.log('making sure all monitors are in the quorum')
- for m in mons:
- s = self.manager.get_mon_status(m)
- assert s['state'] == 'leader' or s['state'] == 'peon'
- assert len(s['quorum']) == len(mons)
-
- kill_up_to = self.rng.randrange(1, self.max_killable()+1)
- mons_to_kill = self.rng.sample(mons, kill_up_to)
- self.log('monitors to thrash: {m}'.format(m=mons_to_kill))
-
- mons_to_freeze = []
- for mon in mons:
- if mon in mons_to_kill:
- continue
- if self.should_freeze_mon():
- mons_to_freeze.append(mon)
- self.log('monitors to freeze: {m}'.format(m=mons_to_freeze))
-
- for mon in mons_to_kill:
- self.log('thrashing mon.{m}'.format(m=mon))
-
- """ we only thrash stores if we are maintaining quorum """
- if self.should_thrash_store() and self.maintain_quorum:
- self.thrash_store(mon)
-
- self.kill_mon(mon)
-
- if mons_to_freeze:
- for mon in mons_to_freeze:
- self.freeze_mon(mon)
- self.log('waiting for {delay} secs to unfreeze mons'.format(
- delay=self.freeze_mon_duration))
- time.sleep(self.freeze_mon_duration)
- for mon in mons_to_freeze:
- self.unfreeze_mon(mon)
-
- if self.maintain_quorum:
- self.manager.wait_for_mon_quorum_size(len(mons)-len(mons_to_kill))
- for m in mons:
- if m in mons_to_kill:
- continue
- s = self.manager.get_mon_status(m)
- assert s['state'] == 'leader' or s['state'] == 'peon'
- assert len(s['quorum']) == len(mons)-len(mons_to_kill)
-
- self.log('waiting for {delay} secs before reviving monitors'.format(
- delay=self.revive_delay))
- time.sleep(self.revive_delay)
-
- for mon in mons_to_kill:
- self.revive_mon(mon)
-
- # do more freezes
- if mons_to_freeze:
- for mon in mons_to_freeze:
- self.freeze_mon(mon)
- self.log('waiting for {delay} secs to unfreeze mons'.format(
- delay=self.freeze_mon_duration))
- time.sleep(self.freeze_mon_duration)
- for mon in mons_to_freeze:
- self.unfreeze_mon(mon)
-
- self.manager.wait_for_mon_quorum_size(len(mons))
- for m in mons:
- s = self.manager.get_mon_status(m)
- assert s['state'] == 'leader' or s['state'] == 'peon'
- assert len(s['quorum']) == len(mons)
-
- if self.scrub:
- self.log('triggering scrub')
- try:
- self.manager.raw_cluster_cmd('scrub')
- except Exception:
- pass
-
- if self.thrash_delay > 0.0:
- self.log('waiting for {delay} secs before continuing thrashing'.format(
- delay=self.thrash_delay))
- time.sleep(self.thrash_delay)
+ """
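+ # Reproducibility sketch: the seed logged by do_thrash() can be fed back
+ # through the task configuration to replay a thrash sequence, e.g.
+ # (YAML, hypothetical values):
+ #
+ # - mon_thrash:
+ # seed: 31337
+ # revive_delay: 20
+ # thrash_delay: 1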
+ def __init__(self, ctx, manager, config, logger):
+ self.ctx = ctx
+ self.manager = manager
+ self.manager.wait_for_clean()
+
+ self.stopping = False
+ self.logger = logger
+ self.config = config
+
+ if self.config is None:
+ self.config = dict()
+
+ # Test reproducibility
+ self.random_seed = self.config.get('seed', None)
+
+ if self.random_seed is None:
+ self.random_seed = int(time.time())
+
+ self.rng = random.Random()
+ self.rng.seed(int(self.random_seed))
+
+ # Monitor thrashing
+ self.revive_delay = float(self.config.get('revive_delay', 10.0))
+ self.thrash_delay = float(self.config.get('thrash_delay', 0.0))
+
+ self.thrash_many = self.config.get('thrash_many', False)
+ self.maintain_quorum = self.config.get('maintain_quorum', True)
+
+ self.scrub = self.config.get('scrub', True)
+
+ self.freeze_mon_probability = float(self.config.get('freeze_mon_probability', 10))
+ self.freeze_mon_duration = float(self.config.get('freeze_mon_duration', 15.0))
+
+ assert self.max_killable() > 0, \
+ 'Unable to kill at least one monitor with the current config.'
+
+ # Store thrashing
+ self.store_thrash = self.config.get('store_thrash', False)
+ self.store_thrash_probability = int(
+ self.config.get('store_thrash_probability', 50))
+ if self.store_thrash:
+ assert self.store_thrash_probability > 0, \
+ 'store_thrash is set, probability must be > 0'
+ assert self.maintain_quorum, \
+ 'store_thrash = true must imply maintain_quorum = true'
+
+ self.thread = gevent.spawn(self.do_thrash)
+
+ def log(self, x):
+ """
+ locally log info messages
+ """
+ self.logger.info(x)
+
+ def do_join(self):
+ """
+ Stop the thrashing loop and wait for the thrasher thread to finish.
+ """
+ self.stopping = True
+ self.thread.get()
+
+ def should_thrash_store(self):
+ """
+ If allowed, indicate that we should thrash a certain percentage of
+ the time as determined by the store_thrash_probability value.
+ """
+ if not self.store_thrash:
+ return False
+ return self.rng.randrange(0, 101) < self.store_thrash_probability
+
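+ # Note on the probability math above: randrange(0, 101) draws from the
+ # 101 integers 0..100, so a setting of p passes roughly p% of the time
+ # (exactly p/101, e.g. 50 -> ~49.5%).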
+ def thrash_store(self, mon):
+ """
+ Thrash the specified monitor's store by forcing a sync.
+ :param mon: monitor to thrash
+ """
+ addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
+ self.log('thrashing mon.{id}@{addr} store'.format(id=mon, addr=addr))
+ out = self.manager.raw_cluster_cmd('-m', addr, 'sync', 'force')
+ j = json.loads(out)
+ assert j['ret'] == 0, \
+ 'error forcing store sync on mon.{id}:\n{ret}'.format(
+ id=mon, ret=out)
+
+ def should_freeze_mon(self):
+ """
+ Indicate that we should freeze a certain percentage of the time
+ as determined by the freeze_mon_probability value.
+ """
+ return self.rng.randrange(0, 101) < self.freeze_mon_probability
+
+ def freeze_mon(self, mon):
+ """
+ Send STOP signal to freeze the monitor.
+ """
+ log.info('Sending STOP to mon %s', mon)
+ self.manager.signal_mon(mon, 19) # STOP
+
+ def unfreeze_mon(self, mon):
+ """
+ Send CONT signal to unfreeze the monitor.
+ """
+ log.info('Sending CONT to mon %s', mon)
+ self.manager.signal_mon(mon, 18) # CONT
+
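+ # The raw signal numbers above are the Linux values; a sketch of the
+ # more self-documenting equivalent (signal_mon just takes the integer):
+ #
+ # import signal
+ # self.manager.signal_mon(mon, signal.SIGSTOP) # freeze
+ # self.manager.signal_mon(mon, signal.SIGCONT) # unfreeze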
+ def kill_mon(self, mon):
+ """
+ Kill the specified monitor.
+ """
+ self.log('killing mon.{id}'.format(id=mon))
+ self.manager.kill_mon(mon)
+
+ def revive_mon(self, mon):
+ """
+ Revive the specified monitor.
+ """
+ self.log('reviving mon.{id}'.format(id=mon))
+ self.manager.revive_mon(mon)
+
+ def max_killable(self):
+ """
+ Return the maximum number of monitors we can kill.
+ """
+ m = len(_get_mons(self.ctx))
+ if self.maintain_quorum:
+ return max(math.ceil(m/2.0)-1, 0)
+ else:
+ return m
+
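+ # Worked examples with maintain_quorum = True:
+ # 3 mons -> ceil(1.5) - 1 = 1 killable
+ # 5 mons -> ceil(2.5) - 1 = 2 killable
+ # i.e. a majority always survives, so quorum can be maintained.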
+ def do_thrash(self):
+ """
+ Continuously loop and thrash the monitors.
+ """
+ self.log('start thrashing')
+ self.log('seed: {s}, revive delay: {r}, thrash delay: {t}, '\
+ 'thrash many: {tm}, maintain quorum: {mq}, '\
+ 'store thrash: {st}, probability: {stp}, '\
+ 'freeze mon: prob {fp} duration {fd}'.format(
+ s=self.random_seed, r=self.revive_delay, t=self.thrash_delay,
+ tm=self.thrash_many, mq=self.maintain_quorum,
+ st=self.store_thrash, stp=self.store_thrash_probability,
+ fp=self.freeze_mon_probability, fd=self.freeze_mon_duration,
+ ))
+
+ while not self.stopping:
+ mons = _get_mons(self.ctx)
+ self.manager.wait_for_mon_quorum_size(len(mons))
+ self.log('making sure all monitors are in the quorum')
+ for m in mons:
+ s = self.manager.get_mon_status(m)
+ assert s['state'] == 'leader' or s['state'] == 'peon'
+ assert len(s['quorum']) == len(mons)
+
+ kill_up_to = self.rng.randrange(1, self.max_killable()+1)
+ mons_to_kill = self.rng.sample(mons, kill_up_to)
+ self.log('monitors to thrash: {m}'.format(m=mons_to_kill))
+
+ mons_to_freeze = []
+ for mon in mons:
+ if mon in mons_to_kill:
+ continue
+ if self.should_freeze_mon():
+ mons_to_freeze.append(mon)
+ self.log('monitors to freeze: {m}'.format(m=mons_to_freeze))
+
+ for mon in mons_to_kill:
+ self.log('thrashing mon.{m}'.format(m=mon))
+
+ # we only thrash stores if we are maintaining quorum
+ if self.should_thrash_store() and self.maintain_quorum:
+ self.thrash_store(mon)
+
+ self.kill_mon(mon)
+
+ if mons_to_freeze:
+ for mon in mons_to_freeze:
+ self.freeze_mon(mon)
+ self.log('waiting for {delay} secs to unfreeze mons'.format(
+ delay=self.freeze_mon_duration))
+ time.sleep(self.freeze_mon_duration)
+ for mon in mons_to_freeze:
+ self.unfreeze_mon(mon)
+
+ if self.maintain_quorum:
+ self.manager.wait_for_mon_quorum_size(len(mons)-len(mons_to_kill))
+ for m in mons:
+ if m in mons_to_kill:
+ continue
+ s = self.manager.get_mon_status(m)
+ assert s['state'] == 'leader' or s['state'] == 'peon'
+ assert len(s['quorum']) == len(mons)-len(mons_to_kill)
+
+ self.log('waiting for {delay} secs before reviving monitors'.format(
+ delay=self.revive_delay))
+ time.sleep(self.revive_delay)
+
+ for mon in mons_to_kill:
+ self.revive_mon(mon)
+
+ # do more freezes
+ if mons_to_freeze:
+ for mon in mons_to_freeze:
+ self.freeze_mon(mon)
+ self.log('waiting for {delay} secs to unfreeze mons'.format(
+ delay=self.freeze_mon_duration))
+ time.sleep(self.freeze_mon_duration)
+ for mon in mons_to_freeze:
+ self.unfreeze_mon(mon)
+
+ self.manager.wait_for_mon_quorum_size(len(mons))
+ for m in mons:
+ s = self.manager.get_mon_status(m)
+ assert s['state'] == 'leader' or s['state'] == 'peon'
+ assert len(s['quorum']) == len(mons)
+
+ if self.scrub:
+ self.log('triggering scrub')
+ try:
+ self.manager.raw_cluster_cmd('scrub')
+ except Exception:
+ pass
+
+ if self.thrash_delay > 0.0:
+ self.log('waiting for {delay} secs before continuing thrashing'.format(
+ delay=self.thrash_delay))
+ time.sleep(self.thrash_delay)
@contextlib.contextmanager
def task(ctx, config):
- """
- Stress test the monitor by thrashing them while another task/workunit
- is running.
-
- Please refer to MonitorThrasher class for further information on the
- available options.
- """
- if config is None:
- config = {}
- assert isinstance(config, dict), \
- 'mon_thrash task only accepts a dict for configuration'
- assert len(_get_mons(ctx)) > 2, \
- 'mon_thrash task requires at least 3 monitors'
- log.info('Beginning mon_thrash...')
- first_mon = teuthology.get_first_mon(ctx, config)
- (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
- manager = ceph_manager.CephManager(
- mon,
- ctx=ctx,
- logger=log.getChild('ceph_manager'),
- )
- thrash_proc = MonitorThrasher(ctx,
- manager, config,
- logger=log.getChild('mon_thrasher'))
- try:
- log.debug('Yielding')
- yield
- finally:
- log.info('joining mon_thrasher')
- thrash_proc.do_join()
- mons = _get_mons(ctx)
- manager.wait_for_mon_quorum_size(len(mons))
+ """
+ Stress test the monitors by thrashing them while another task/workunit
+ is running.
+
+ Please refer to MonitorThrasher class for further information on the
+ available options.
+ """
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'mon_thrash task only accepts a dict for configuration'
+ assert len(_get_mons(ctx)) > 2, \
+ 'mon_thrash task requires at least 3 monitors'
+ log.info('Beginning mon_thrash...')
+ first_mon = teuthology.get_first_mon(ctx, config)
+ (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+ manager = ceph_manager.CephManager(
+ mon,
+ ctx=ctx,
+ logger=log.getChild('ceph_manager'),
+ )
+ thrash_proc = MonitorThrasher(ctx,
+ manager, config,
+ logger=log.getChild('mon_thrasher'))
+ try:
+ log.debug('Yielding')
+ yield
+ finally:
+ log.info('joining mon_thrasher')
+ thrash_proc.do_join()
+ mons = _get_mons(ctx)
+ manager.wait_for_mon_quorum_size(len(mons))