Readjust the indentation of mon_clock_skew_check.py and mon_thrash.py.
author    Warren Usui <warren.usui@inktank.com>
          Fri, 14 Feb 2014 02:18:56 +0000 (18:18 -0800)
committer Warren Usui <warren.usui@inktank.com>
          Fri, 14 Feb 2014 04:36:08 +0000 (20:36 -0800)
Added docstrings.

Fixes: 6537
Signed-off-by: Warren Usui <warren.usui@inktank.com>
teuthology/task/mon_clock_skew_check.py
teuthology/task/mon_thrash.py
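
Both tasks drive their main loop from a background gevent greenlet: mon_clock_skew_check's task() spawns ClockSkewCheck.do_check() with gevent.spawn(), while MonitorThrasher spawns do_thrash() in its own constructor. In both cases teardown flips a stop flag and joins the greenlet with get(), which re-raises any exception from the background loop. A minimal standalone sketch of that pattern (toy Worker class, not teuthology code):

    import gevent


    class Worker(object):
        """Toy stand-in for ClockSkewCheck / MonitorThrasher."""
        def __init__(self):
            self.stopping = False

        def run(self):
            # Loop until finish() is called, like do_check()/do_thrash().
            while not self.stopping:
                gevent.sleep(0.1)

        def finish(self):
            self.stopping = True


    worker = Worker()
    greenlet = gevent.spawn(worker.run)   # background greenlet
    gevent.sleep(0.5)                     # nested tasks would run here
    worker.finish()                       # ask the loop to stop
    greenlet.get()                        # join; re-raises worker exceptions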

teuthology/task/mon_clock_skew_check.py
index 000f3bf89b6fd0ee9b0e98ccbfcdcb29c6bac23a..891e6ec484ede7c5f7c2b4480a6be2678e4e32a4 100644
@@ -1,3 +1,6 @@
+"""
+Handle clock skews in monitors.
+"""
 import logging
 import contextlib
 import ceph_manager
@@ -9,227 +12,250 @@ from teuthology import misc as teuthology
 log = logging.getLogger(__name__)
 
 class ClockSkewCheck:
-  """
-  Periodically check if there are any clock skews among the monitors in the
-  quorum. By default, assume no skews are supposed to exist; that can be
-  changed using the 'expect-skew' option. If 'fail-on-skew' is set to false,
-  then we will always succeed and only report skews if any are found.
-
-  This class does not spawn a thread. It assumes that, if that is indeed
-  wanted, it should be done by a third party (for instance, the task using
-  this class). We intend it as such in order to reuse this class if need be.
-
-  This task accepts the following options:
-
-   interval     amount of seconds to wait in-between checks. (default: 30.0)
-   max-skew     maximum skew, in seconds, that is considered tolerable before
-                issuing a warning. (default: 0.05)
-   expect-skew  'true' or 'false', to indicate whether to expect a skew during
-                the run or not. If 'true', the test will fail if no skew is
-                found, and succeed if a skew is indeed found; if 'false', it's
-                the other way around. (default: false)
-   never-fail   Don't fail the run if a skew is detected and we weren't
-                expecting it, or if no skew is detected and we were expecting
-                it. (default: False)
-
-   at-least-once          Runs at least once, even if we are told to stop.
-                          (default: True)
-   at-least-once-timeout  If we were told to stop but we are attempting to
-                          run at least once, timeout after this many seconds.
-                          (default: 600)
-
-  Example:
-    Expect a skew higher than 0.05 seconds, but only report it without failing
-    the teuthology run.
+    """
+    Periodically check if there are any clock skews among the monitors in the
+    quorum. By default, assume no skews are supposed to exist; that can be
+    changed using the 'expect-skew' option. If 'never-fail' is set to true,
+    then we will always succeed and only report skews if any are found.
+
+    This class does not spawn a thread. It assumes that, if that is indeed
+    wanted, it should be done by a third party (for instance, the task using
+    this class). We intend it as such in order to reuse this class if need be.
+
+    This task accepts the following options:
+
+    interval     amount of seconds to wait in-between checks. (default: 30.0)
+    max-skew     maximum skew, in seconds, that is considered tolerable before
+                 issuing a warning. (default: 0.05)
+    expect-skew  'true' or 'false', to indicate whether to expect a skew during
+                 the run or not. If 'true', the test will fail if no skew is
+                 found, and succeed if a skew is indeed found; if 'false', it's
+                 the other way around. (default: false)
+    never-fail   Don't fail the run if a skew is detected and we weren't
+                 expecting it, or if no skew is detected and we were expecting
+                 it. (default: False)
+
+    at-least-once          Runs at least once, even if we are told to stop.
+                           (default: True)
+    at-least-once-timeout  If we were told to stop but we are attempting to
+                           run at least once, timeout after this many seconds.
+                           (default: 600)
+
+    Example:
+        Expect a skew higher than 0.05 seconds, but only report it without
+        failing the teuthology run.
 
     - mon_clock_skew_check:
         interval: 30
         max-skew: 0.05
         expect-skew: true
         never-fail: true
-  """
-
-  def __init__(self, ctx, manager, config, logger):
-    self.ctx = ctx
-    self.manager = manager;
-
-    self.stopping = False
-    self.logger = logger
-    self.config = config
-
-    if self.config is None:
-      self.config = dict()
-
-    self.check_interval = float(self.config.get('interval', 30.0))
-
-    first_mon = teuthology.get_first_mon(ctx, config)
-    remote = ctx.cluster.only(first_mon).remotes.keys()[0]
-    proc = remote.run(
-        args=[
-          'sudo',
-          'ceph-mon',
-          '-i', first_mon[4:],
-          '--show-config-value', 'mon_clock_drift_allowed'
-          ], stdout=StringIO(), wait=True
-        )
-    self.max_skew = self.config.get('max-skew', float(proc.stdout.getvalue()))
-
-    self.expect_skew = self.config.get('expect-skew', False)
-    self.never_fail = self.config.get('never-fail', False)
-    self.at_least_once = self.config.get('at-least-once', True)
-    self.at_least_once_timeout = self.config.get('at-least-once-timeout', 600.0)
-
-  def info(self, x):
-    self.logger.info(x)
-
-  def warn(self, x):
-    self.logger.warn(x)
-
-  def debug(self, x):
-    self.logger.debug(x)
-
-  def finish(self):
-    self.stopping = True
-
-  def sleep_interval(self):
-    if self.check_interval > 0.0:
-      self.debug('sleeping for {s} seconds'.format(
-        s=self.check_interval))
-      time.sleep(self.check_interval)
-
-  def print_skews(self, skews):
-    total = len(skews)
-    if total > 0:
-      self.info('---------- found {n} skews ----------'.format(n=total))
-      for mon_id,values in skews.iteritems():
-        self.info('mon.{id}: {v}'.format(id=mon_id,v=values))
-      self.info('-------------------------------------')
-    else:
-      self.info('---------- no skews were found ----------')
-
-  def do_check(self):
-    self.info('start checking for clock skews')
-    skews = dict()
-    ran_once = False
-    started_on = None
-
-    while not self.stopping or (self.at_least_once and not ran_once):
-
-      if self.at_least_once and not ran_once and self.stopping:
-        if started_on is None:
-          self.info('kicking-off timeout (if any)')
-          started_on = time.time()
-        elif self.at_least_once_timeout > 0.0:
-          assert time.time() - started_on < self.at_least_once_timeout, \
-              'failed to obtain a timecheck before timeout expired'
-
-      quorum_size = len(teuthology.get_mon_names(self.ctx))
-      self.manager.wait_for_mon_quorum_size(quorum_size)
-
-      health = self.manager.get_mon_health(True)
-      timechecks = health['timechecks']
-
-      clean_check = False
-
-      if timechecks['round_status'] == 'finished':
-        assert (timechecks['round'] % 2) == 0, \
-            'timecheck marked as finished but round ' \
-            'disagrees (r {r})'.format(
-                r=timechecks['round'])
-        clean_check = True
-      else:
-        assert timechecks['round_status'] == 'on-going', \
-            'timecheck status expected \'on-going\' ' \
-            'but found \'{s}\' instead'.format(
-                s=timechecks['round_status'])
-        if 'mons' in timechecks.keys() and len(timechecks['mons']) > 1:
-          self.info('round still on-going, but there are available reports')
+    """
+
+    def __init__(self, ctx, manager, config, logger):
+        self.ctx = ctx
+        self.manager = manager
+
+        self.stopping = False
+        self.logger = logger
+        self.config = config
+
+        if self.config is None:
+            self.config = dict()
+
+        self.check_interval = float(self.config.get('interval', 30.0))
+
+        first_mon = teuthology.get_first_mon(ctx, config)
+        remote = ctx.cluster.only(first_mon).remotes.keys()[0]
+        proc = remote.run(
+            args=[
+                'sudo',
+                'ceph-mon',
+                '-i', first_mon[4:],
+                '--show-config-value', 'mon_clock_drift_allowed'
+                ], stdout=StringIO(), wait=True
+                )
+        self.max_skew = self.config.get('max-skew', float(proc.stdout.getvalue()))
+
+        self.expect_skew = self.config.get('expect-skew', False)
+        self.never_fail = self.config.get('never-fail', False)
+        self.at_least_once = self.config.get('at-least-once', True)
+        self.at_least_once_timeout = self.config.get('at-least-once-timeout', 600.0)
+
+    def info(self, x):
+        """
+        Log an info message using the local logger.
+        """
+        self.logger.info(x)
+
+    def warn(self, x):
+        """
+        Log a warning using the local logger.
+        """
+        self.logger.warn(x)
+
+    def debug(self, x):
+        """
+        locally define logger for debug messages
+        """
+        self.logger.info(x)
+        self.logger.debug(x)
+
+    def finish(self):
+        """
+        Break out of the do_check loop.
+        """
+        self.stopping = True
+
+    def sleep_interval(self):
+        """
+        If a sleep interval is set, sleep for that amount of time.
+        """
+        if self.check_interval > 0.0:
+            self.debug('sleeping for {s} seconds'.format(
+                s=self.check_interval))
+            time.sleep(self.check_interval)
+
+    def print_skews(self, skews):
+        """
+        Display skew values.
+        """
+        total = len(skews)
+        if total > 0:
+            self.info('---------- found {n} skews ----------'.format(n=total))
+            for mon_id, values in skews.iteritems():
+                self.info('mon.{id}: {v}'.format(id=mon_id, v=values))
+            self.info('-------------------------------------')
         else:
-          self.info('no timechecks available just yet')
-          self.sleep_interval()
-          continue
-
-      assert len(timechecks['mons']) > 1, \
-          'there are not enough reported timechecks; ' \
-          'expected > 1 found {n}'.format(n=len(timechecks['mons']))
-
-      for check in timechecks['mons']:
-        mon_skew = float(check['skew'])
-        mon_health = check['health']
-        mon_id = check['name']
-        if abs(mon_skew) > self.max_skew:
-          assert mon_health == 'HEALTH_WARN', \
-              'mon.{id} health is \'{health}\' but skew {s} > max {ms}'.format(
-                  id=mon_id,health=mon_health,s=abs(mon_skew),ms=self.max_skew)
-
-          log_str = 'mon.{id} with skew {s} > max {ms}'.format(
-            id=mon_id,s=abs(mon_skew),ms=self.max_skew)
-
-          """ add to skew list """
-          details = check['details']
-          skews[mon_id] = {'skew': mon_skew, 'details': details}
-
-          if self.expect_skew:
-            self.info('expected skew: {str}'.format(str=log_str))
-          else:
-            self.warn('unexpected skew: {str}'.format(str=log_str))
-
-      if clean_check or (self.expect_skew and len(skews) > 0):
-        ran_once = True
+            self.info('---------- no skews were found ----------')
+
+    def do_check(self):
+        """
+        Clock skew checker.  Loops until finish() is called.
+        """
+        self.info('start checking for clock skews')
+        skews = dict()
+        ran_once = False
+        started_on = None
+
+        while not self.stopping or (self.at_least_once and not ran_once):
+
+            if self.at_least_once and not ran_once and self.stopping:
+                if started_on is None:
+                    self.info('kicking-off timeout (if any)')
+                    started_on = time.time()
+                elif self.at_least_once_timeout > 0.0:
+                    assert time.time() - started_on < self.at_least_once_timeout, \
+                        'failed to obtain a timecheck before timeout expired'
+
+            quorum_size = len(teuthology.get_mon_names(self.ctx))
+            self.manager.wait_for_mon_quorum_size(quorum_size)
+
+            health = self.manager.get_mon_health(True)
+            timechecks = health['timechecks']
+
+            clean_check = False
+
+            if timechecks['round_status'] == 'finished':
+                assert (timechecks['round'] % 2) == 0, \
+                    'timecheck marked as finished but round ' \
+                    'disagrees (r {r})'.format(
+                        r=timechecks['round'])
+                clean_check = True
+            else:
+                assert timechecks['round_status'] == 'on-going', \
+                        'timecheck status expected \'on-going\' ' \
+                        'but found \'{s}\' instead'.format(
+                            s=timechecks['round_status'])
+                if 'mons' in timechecks.keys() and len(timechecks['mons']) > 1:
+                    self.info('round still on-going, but there are available reports')
+                else:
+                    self.info('no timechecks available just yet')
+                    self.sleep_interval()
+                    continue
+
+            assert len(timechecks['mons']) > 1, \
+                'there are not enough reported timechecks; ' \
+                'expected > 1 found {n}'.format(n=len(timechecks['mons']))
+
+            for check in timechecks['mons']:
+                mon_skew = float(check['skew'])
+                mon_health = check['health']
+                mon_id = check['name']
+                if abs(mon_skew) > self.max_skew:
+                    assert mon_health == 'HEALTH_WARN', \
+                        'mon.{id} health is \'{health}\' but skew {s} > max {ms}'.format(
+                            id=mon_id,health=mon_health,s=abs(mon_skew),ms=self.max_skew)
+
+                    log_str = 'mon.{id} with skew {s} > max {ms}'.format(
+                        id=mon_id,s=abs(mon_skew),ms=self.max_skew)
+
+                    """ add to skew list """
+                    details = check['details']
+                    skews[mon_id] = {'skew': mon_skew, 'details': details}
+
+                    if self.expect_skew:
+                        self.info('expected skew: {str}'.format(str=log_str))
+                    else:
+                        self.warn('unexpected skew: {str}'.format(str=log_str))
+
+            if clean_check or (self.expect_skew and len(skews) > 0):
+                ran_once = True
+                self.print_skews(skews)
+            self.sleep_interval()
+
+        total = len(skews)
         self.print_skews(skews)
-      self.sleep_interval()
-
-    total = len(skews)
-    self.print_skews(skews)
 
-    error_str = ''
-    found_error = False
+        error_str = ''
+        found_error = False
 
-    if self.expect_skew:
-      if total == 0:
-        error_str = 'We were expecting a skew, but none was found!'
-        found_error = True
-    else:
-      if total > 0:
-        error_str = 'We were not expecting a skew, but we did find it!'
-        found_error = True
+        if self.expect_skew:
+            if total == 0:
+                error_str = 'We were expecting a skew, but none was found!'
+                found_error = True
+        else:
+            if total > 0:
+                error_str = 'We were not expecting a skew, but we did find it!'
+                found_error = True
 
-    if found_error:
-      self.info(error_str)
-      if not self.never_fail:
-        assert False, error_str
+        if found_error:
+            self.info(error_str)
+            if not self.never_fail:
+                assert False, error_str
 
 @contextlib.contextmanager
 def task(ctx, config):
-  """
-  Use clas ClockSkewCheck to check for clock skews on the monitors.
-  This task will spawn a thread running ClockSkewCheck's do_check().
-
-  All the configuration will be directly handled by ClockSkewCheck,
-  so please refer to the class documentation for further information.
-  """
-  if config is None:
-    config = {}
-  assert isinstance(config, dict), \
-      'mon_clock_skew_check task only accepts a dict for configuration'
-  log.info('Beginning mon_clock_skew_check...')
-  first_mon = teuthology.get_first_mon(ctx, config)
-  (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
-  manager = ceph_manager.CephManager(
-      mon,
-      ctx=ctx,
-      logger=log.getChild('ceph_manager'),
-      )
-
-  skew_check = ClockSkewCheck(ctx,
-      manager, config,
-      logger=log.getChild('mon_clock_skew_check'))
-  skew_check_thread = gevent.spawn(skew_check.do_check)
-  try:
-    yield
-  finally:
-    log.info('joining mon_clock_skew_check')
-    skew_check.finish()
-    skew_check_thread.get()
+    """
+    Use the ClockSkewCheck class to check for clock skews on the monitors.
+    This task will spawn a thread running ClockSkewCheck's do_check().
+
+    All the configuration will be directly handled by ClockSkewCheck,
+    so please refer to the class documentation for further information.
+    """
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'mon_clock_skew_check task only accepts a dict for configuration'
+    log.info('Beginning mon_clock_skew_check...')
+    first_mon = teuthology.get_first_mon(ctx, config)
+    (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+    manager = ceph_manager.CephManager(
+        mon,
+        ctx=ctx,
+        logger=log.getChild('ceph_manager'),
+        )
+
+    skew_check = ClockSkewCheck(ctx,
+        manager, config,
+        logger=log.getChild('mon_clock_skew_check'))
+    skew_check_thread = gevent.spawn(skew_check.do_check)
+    try:
+        yield
+    finally:
+        log.info('joining mon_clock_skew_check')
+        skew_check.finish()
+        skew_check_thread.get()
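
The trickiest part of this file is do_check()'s loop control: once finish() sets the stop flag, the checker may still keep running until at least one timecheck round has completed, bounded by at-least-once-timeout. A standalone sketch of just that control flow, with toy check/stopping callables and no Ceph or teuthology involved:

    import time

    def loop_control(check, stopping, at_least_once=True, timeout=600.0):
        """Toy rendition of do_check()'s loop control; check() returns True
        once a timecheck round has finished cleanly."""
        ran_once = False
        started_on = None
        while not stopping() or (at_least_once and not ran_once):
            if at_least_once and not ran_once and stopping():
                if started_on is None:
                    started_on = time.time()      # start the timeout clock
                elif timeout > 0.0:
                    assert time.time() - started_on < timeout, \
                        'failed to obtain a timecheck before timeout expired'
            if check():
                ran_once = True

    # Already told to stop, yet the loop still runs until one check succeeds.
    attempts = []
    loop_control(check=lambda: attempts.append(1) or len(attempts) >= 3,
                 stopping=lambda: True)
    print(len(attempts))  # -> 3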
 
 
teuthology/task/mon_thrash.py
index fff73d81e867abec530034269c3e506ec7e4fe95..7dc7caad8463d6e9ba6d8d806c202ac092091a7f 100644
@@ -1,3 +1,6 @@
+"""
+Monitor thrash
+"""
 import logging
 import contextlib
 import ceph_manager
@@ -11,12 +14,15 @@ from teuthology import misc as teuthology
 log = logging.getLogger(__name__)
 
 def _get_mons(ctx):
-  mons = [f[len('mon.'):] for f in teuthology.get_mon_names(ctx)]
-  return mons
+    """
+    Get monitor names from the context value.
+    """
+    mons = [f[len('mon.'):] for f in teuthology.get_mon_names(ctx)]
+    return mons
 
 class MonitorThrasher:
-  """
-  How it works::
+    """
+    How it works::
 
     - pick a monitor
     - kill it
@@ -26,7 +32,7 @@ class MonitorThrasher:
     - wait for quorum to be formed
     - sleep for 'thrash_delay' seconds
 
-  Options::
+    Options::
 
     seed                Seed to use on the RNG to reproduce a previous
                         behaviour (default: None; i.e., not set)
@@ -34,8 +40,7 @@ class MonitorThrasher:
                         the monitor (default: 10)
     thrash_delay        Number of seconds to wait in-between
                         test iterations (default: 0)
-    thrash_store        Thrash monitor store before killing the monitor
-                        being thrashed (default: False)
+    thrash_store        Thrash monitor store before killing the monitor being thrashed (default: False)
     thrash_store_probability  Probability of thrashing a monitor's store
                               (default: 50)
     thrash_many         Thrash multiple monitors instead of just one. If
@@ -58,7 +63,7 @@ class MonitorThrasher:
     Note: if 'store-thrash' is set to True, then 'maintain-quorum' must also
           be set to True.
 
-  For example::
+    For example::
 
     tasks:
     - ceph:
@@ -75,228 +80,264 @@ class MonitorThrasher:
         clients:
           all:
             - mon/workloadgen.sh
-  """
-  def __init__(self, ctx, manager, config, logger):
-    self.ctx = ctx
-    self.manager = manager
-    self.manager.wait_for_clean()
-
-    self.stopping = False
-    self.logger = logger
-    self.config = config
-
-    if self.config is None:
-      self.config = dict()
-
-    """ Test reproducibility """
-    self.random_seed = self.config.get('seed', None)
-
-    if self.random_seed is None:
-      self.random_seed = int(time.time())
-
-    self.rng = random.Random()
-    self.rng.seed(int(self.random_seed))
-
-    """ Monitor thrashing """
-    self.revive_delay = float(self.config.get('revive_delay', 10.0))
-    self.thrash_delay = float(self.config.get('thrash_delay', 0.0))
-
-    self.thrash_many = self.config.get('thrash_many', False)
-    self.maintain_quorum = self.config.get('maintain_quorum', True)
-
-    self.scrub = self.config.get('scrub', True)
-
-    self.freeze_mon_probability = float(self.config.get('freeze_mon_probability', 10))
-    self.freeze_mon_duration = float(self.config.get('freeze_mon_duration', 15.0))
-
-    assert self.max_killable() > 0, \
-        'Unable to kill at least one monitor with the current config.'
-
-    """ Store thrashing """
-    self.store_thrash = self.config.get('store_thrash', False)
-    self.store_thrash_probability = int(
-        self.config.get('store_thrash_probability', 50))
-    if self.store_thrash:
-      assert self.store_thrash_probability > 0, \
-          'store_thrash is set, probability must be > 0'
-      assert self.maintain_quorum, \
-          'store_thrash = true must imply maintain_quorum = true'
-
-    self.thread = gevent.spawn(self.do_thrash)
-
-  def log(self, x):
-    self.logger.info(x)
-
-  def do_join(self):
-    self.stopping = True
-    self.thread.get()
-
-  def should_thrash_store(self):
-    if not self.store_thrash:
-      return False
-    return self.rng.randrange(0,101) < self.store_thrash_probability
-
-  def thrash_store(self, mon):
-    addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
-    self.log('thrashing mon.{id}@{addr} store'.format(id=mon,addr=addr))
-    out = self.manager.raw_cluster_cmd('-m', addr, 'sync', 'force')
-    j = json.loads(out)
-    assert j['ret'] == 0, \
-        'error forcing store sync on mon.{id}:\n{ret}'.format(
-            id=mon,ret=out)
-
-  def should_freeze_mon(self):
-    return self.rng.randrange(0,101) < self.freeze_mon_probability
-
-  def freeze_mon(self, mon):
-    log.info('Sending STOP to mon %s', mon)
-    self.manager.signal_mon(mon, 19)  # STOP
-
-  def unfreeze_mon(self, mon):
-    log.info('Sending CONT to mon %s', mon)
-    self.manager.signal_mon(mon, 18)  # CONT
-
-  def kill_mon(self, mon):
-    self.log('killing mon.{id}'.format(id=mon))
-    self.manager.kill_mon(mon)
-
-  def revive_mon(self, mon):
-    self.log('reviving mon.{id}'.format(id=mon))
-    self.manager.revive_mon(mon)
-
-  def max_killable(self):
-    m = len(_get_mons(self.ctx))
-    if self.maintain_quorum:
-      return max(math.ceil(m/2.0)-1,0)
-    else:
-      return m
-
-  def do_thrash(self):
-    self.log('start thrashing')
-    self.log('seed: {s}, revive delay: {r}, thrash delay: {t} '\
-               'thrash many: {tm}, maintain quorum: {mq} '\
-               'store thrash: {st}, probability: {stp} '\
-               'freeze mon: prob {fp} duration {fd}'.format(
-          s=self.random_seed,r=self.revive_delay,t=self.thrash_delay,
-          tm=self.thrash_many, mq=self.maintain_quorum,
-          st=self.store_thrash,stp=self.store_thrash_probability,
-          fp=self.freeze_mon_probability,fd=self.freeze_mon_duration,
-          ))
-
-    while not self.stopping:
-      mons = _get_mons(self.ctx)
-      self.manager.wait_for_mon_quorum_size(len(mons))
-      self.log('making sure all monitors are in the quorum')
-      for m in mons:
-        s = self.manager.get_mon_status(m)
-        assert s['state'] == 'leader' or s['state'] == 'peon'
-        assert len(s['quorum']) == len(mons)
-
-      kill_up_to = self.rng.randrange(1, self.max_killable()+1)
-      mons_to_kill = self.rng.sample(mons, kill_up_to)
-      self.log('monitors to thrash: {m}'.format(m=mons_to_kill))
-
-      mons_to_freeze = []
-      for mon in mons:
-        if mon in mons_to_kill:
-          continue
-        if self.should_freeze_mon():
-          mons_to_freeze.append(mon)
-      self.log('monitors to freeze: {m}'.format(m=mons_to_freeze))
-
-      for mon in mons_to_kill:
-        self.log('thrashing mon.{m}'.format(m=mon))
-
-        """ we only thrash stores if we are maintaining quorum """
-        if self.should_thrash_store() and self.maintain_quorum:
-          self.thrash_store(mon)
-
-        self.kill_mon(mon)
-
-      if mons_to_freeze:
-        for mon in mons_to_freeze:
-          self.freeze_mon(mon)
-        self.log('waiting for {delay} secs to unfreeze mons'.format(
-            delay=self.freeze_mon_duration))
-        time.sleep(self.freeze_mon_duration)
-        for mon in mons_to_freeze:
-          self.unfreeze_mon(mon)
-
-      if self.maintain_quorum:
-        self.manager.wait_for_mon_quorum_size(len(mons)-len(mons_to_kill))
-        for m in mons:
-          if m in mons_to_kill:
-            continue
-          s = self.manager.get_mon_status(m)
-          assert s['state'] == 'leader' or s['state'] == 'peon'
-          assert len(s['quorum']) == len(mons)-len(mons_to_kill)
-
-      self.log('waiting for {delay} secs before reviving monitors'.format(
-        delay=self.revive_delay))
-      time.sleep(self.revive_delay)
-
-      for mon in mons_to_kill:
-        self.revive_mon(mon)
-
-      # do more freezes
-      if mons_to_freeze:
-        for mon in mons_to_freeze:
-          self.freeze_mon(mon)
-        self.log('waiting for {delay} secs to unfreeze mons'.format(
-            delay=self.freeze_mon_duration))
-        time.sleep(self.freeze_mon_duration)
-        for mon in mons_to_freeze:
-          self.unfreeze_mon(mon)
-
-      self.manager.wait_for_mon_quorum_size(len(mons))
-      for m in mons:
-        s = self.manager.get_mon_status(m)
-        assert s['state'] == 'leader' or s['state'] == 'peon'
-        assert len(s['quorum']) == len(mons)
-
-      if self.scrub:
-        self.log('triggering scrub')
-        try:
-          self.manager.raw_cluster_cmd('scrub')
-        except Exception:
-          pass
-
-      if self.thrash_delay > 0.0:
-        self.log('waiting for {delay} secs before continuing thrashing'.format(
-          delay=self.thrash_delay))
-        time.sleep(self.thrash_delay)
+    """
+    def __init__(self, ctx, manager, config, logger):
+        self.ctx = ctx
+        self.manager = manager
+        self.manager.wait_for_clean()
+
+        self.stopping = False
+        self.logger = logger
+        self.config = config
+
+        if self.config is None:
+            self.config = dict()
+
+        """ Test reproducibility """
+        self.random_seed = self.config.get('seed', None)
+
+        if self.random_seed is None:
+            self.random_seed = int(time.time())
+
+        self.rng = random.Random()
+        self.rng.seed(int(self.random_seed))
+
+        """ Monitor thrashing """
+        self.revive_delay = float(self.config.get('revive_delay', 10.0))
+        self.thrash_delay = float(self.config.get('thrash_delay', 0.0))
+
+        self.thrash_many = self.config.get('thrash_many', False)
+        self.maintain_quorum = self.config.get('maintain_quorum', True)
+
+        self.scrub = self.config.get('scrub', True)
+
+        self.freeze_mon_probability = float(self.config.get('freeze_mon_probability', 10))
+        self.freeze_mon_duration = float(self.config.get('freeze_mon_duration', 15.0))
+
+        assert self.max_killable() > 0, \
+            'Unable to kill at least one monitor with the current config.'
+
+        """ Store thrashing """
+        self.store_thrash = self.config.get('store_thrash', False)
+        self.store_thrash_probability = int(
+            self.config.get('store_thrash_probability', 50))
+        if self.store_thrash:
+            assert self.store_thrash_probability > 0, \
+                'store_thrash is set, probability must be > 0'
+            assert self.maintain_quorum, \
+                'store_thrash = true must imply maintain_quorum = true'
+
+        self.thread = gevent.spawn(self.do_thrash)
+
+    def log(self, x):
+        """
+        Log an info message using the local logger.
+        """
+        self.logger.info(x)
+
+    def do_join(self):
+        """
+        Break out of the thrashing loop and wait for the thrasher thread to finish.
+        """
+        self.stopping = True
+        self.thread.get()
+
+    def should_thrash_store(self):
+        """
+        If allowed, indicate that we should thrash a certain percentage of
+        the time as determined by the store_thrash_probability value.
+        """
+        if not self.store_thrash:
+            return False
+        return self.rng.randrange(0, 101) < self.store_thrash_probability
+
+    def thrash_store(self, mon):
+        """
+        Thrash the specified monitor's store by forcing a sync.
+        :param mon: monitor to thrash
+        """
+        addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
+        self.log('thrashing mon.{id}@{addr} store'.format(id=mon, addr=addr))
+        out = self.manager.raw_cluster_cmd('-m', addr, 'sync', 'force')
+        j = json.loads(out)
+        assert j['ret'] == 0, \
+            'error forcing store sync on mon.{id}:\n{ret}'.format(
+                id=mon,ret=out)
+
+    def should_freeze_mon(self):
+        """
+        Indicate that we should freeze a certain percentage of the time
+        as determined by the freeze_mon_probability value.
+        """
+        return self.rng.randrange(0, 101) < self.freeze_mon_probability
+
+    def freeze_mon(self, mon):
+        """
+        Send STOP signal to freeze the monitor.
+        """
+        log.info('Sending STOP to mon %s', mon)
+        self.manager.signal_mon(mon, 19)  # STOP
+
+    def unfreeze_mon(self, mon):
+        """
+        Send CONT signal to unfreeze the monitor.
+        """
+        log.info('Sending CONT to mon %s', mon)
+        self.manager.signal_mon(mon, 18)  # CONT
+
+    def kill_mon(self, mon):
+        """
+        Kill the specified monitor.
+        """
+        self.log('killing mon.{id}'.format(id=mon))
+        self.manager.kill_mon(mon)
+
+    def revive_mon(self, mon):
+        """
+        Revive the specified monitor.
+        """
+        self.log('reviving mon.{id}'.format(id=mon))
+        self.manager.revive_mon(mon)
+
+    def max_killable(self):
+        """
+        Return the maximum number of monitors we can kill.
+        """
+        m = len(_get_mons(self.ctx))
+        if self.maintain_quorum:
+            return max(math.ceil(m/2.0)-1, 0)
+        else:
+            return m
+
+    def do_thrash(self):
+        """
+        Continuously loop and thrash the monitors.
+        """
+        self.log('start thrashing')
+        self.log('seed: {s}, revive delay: {r}, thrash delay: {t} '\
+                   'thrash many: {tm}, maintain quorum: {mq} '\
+                   'store thrash: {st}, probability: {stp} '\
+                   'freeze mon: prob {fp} duration {fd}'.format(
+                s=self.random_seed,r=self.revive_delay,t=self.thrash_delay,
+                tm=self.thrash_many, mq=self.maintain_quorum,
+                st=self.store_thrash,stp=self.store_thrash_probability,
+                fp=self.freeze_mon_probability,fd=self.freeze_mon_duration,
+                ))
+
+        while not self.stopping:
+            mons = _get_mons(self.ctx)
+            self.manager.wait_for_mon_quorum_size(len(mons))
+            self.log('making sure all monitors are in the quorum')
+            for m in mons:
+                s = self.manager.get_mon_status(m)
+                assert s['state'] == 'leader' or s['state'] == 'peon'
+                assert len(s['quorum']) == len(mons)
+
+            kill_up_to = self.rng.randrange(1, self.max_killable()+1)
+            mons_to_kill = self.rng.sample(mons, kill_up_to)
+            self.log('monitors to thrash: {m}'.format(m=mons_to_kill))
+
+            mons_to_freeze = []
+            for mon in mons:
+                if mon in mons_to_kill:
+                    continue
+                if self.should_freeze_mon():
+                    mons_to_freeze.append(mon)
+            self.log('monitors to freeze: {m}'.format(m=mons_to_freeze))
+
+            for mon in mons_to_kill:
+                self.log('thrashing mon.{m}'.format(m=mon))
+
+                """ we only thrash stores if we are maintaining quorum """
+                if self.should_thrash_store() and self.maintain_quorum:
+                    self.thrash_store(mon)
+
+                self.kill_mon(mon)
+
+            if mons_to_freeze:
+                for mon in mons_to_freeze:
+                    self.freeze_mon(mon)
+                self.log('waiting for {delay} secs to unfreeze mons'.format(
+                    delay=self.freeze_mon_duration))
+                time.sleep(self.freeze_mon_duration)
+                for mon in mons_to_freeze:
+                    self.unfreeze_mon(mon)
+
+            if self.maintain_quorum:
+                self.manager.wait_for_mon_quorum_size(len(mons)-len(mons_to_kill))
+                for m in mons:
+                    if m in mons_to_kill:
+                        continue
+                    s = self.manager.get_mon_status(m)
+                    assert s['state'] == 'leader' or s['state'] == 'peon'
+                    assert len(s['quorum']) == len(mons)-len(mons_to_kill)
+
+            self.log('waiting for {delay} secs before reviving monitors'.format(
+                delay=self.revive_delay))
+            time.sleep(self.revive_delay)
+
+            for mon in mons_to_kill:
+                self.revive_mon(mon)
+            # do more freezes
+            if mons_to_freeze:
+                for mon in mons_to_freeze:
+                    self.freeze_mon(mon)
+                self.log('waiting for {delay} secs to unfreeze mons'.format(
+                    delay=self.freeze_mon_duration))
+                time.sleep(self.freeze_mon_duration)
+                for mon in mons_to_freeze:
+                    self.unfreeze_mon(mon)
+
+            self.manager.wait_for_mon_quorum_size(len(mons))
+            for m in mons:
+                s = self.manager.get_mon_status(m)
+                assert s['state'] == 'leader' or s['state'] == 'peon'
+                assert len(s['quorum']) == len(mons)
+
+            if self.scrub:
+                self.log('triggering scrub')
+                try:
+                    self.manager.raw_cluster_cmd('scrub')
+                except Exception:
+                    pass
+
+            if self.thrash_delay > 0.0:
+                self.log('waiting for {delay} secs before continuing thrashing'.format(
+                    delay=self.thrash_delay))
+                time.sleep(self.thrash_delay)
 
 @contextlib.contextmanager
 def task(ctx, config):
-  """
-  Stress test the monitor by thrashing them while another task/workunit
-  is running.
-
-  Please refer to MonitorThrasher class for further information on the
-  available options.
-  """
-  if config is None:
-    config = {}
-  assert isinstance(config, dict), \
-      'mon_thrash task only accepts a dict for configuration'
-  assert len(_get_mons(ctx)) > 2, \
-      'mon_thrash task requires at least 3 monitors'
-  log.info('Beginning mon_thrash...')
-  first_mon = teuthology.get_first_mon(ctx, config)
-  (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
-  manager = ceph_manager.CephManager(
-      mon,
-      ctx=ctx,
-      logger=log.getChild('ceph_manager'),
-      )
-  thrash_proc = MonitorThrasher(ctx,
-      manager, config,
-      logger=log.getChild('mon_thrasher'))
-  try:
-    log.debug('Yielding')
-    yield
-  finally:
-    log.info('joining mon_thrasher')
-    thrash_proc.do_join()
-    mons = _get_mons(ctx)
-    manager.wait_for_mon_quorum_size(len(mons))
+    """
+    Stress test the monitor by thrashing them while another task/workunit
+    is running.
+
+    Please refer to MonitorThrasher class for further information on the
+    available options.
+    """
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'mon_thrash task only accepts a dict for configuration'
+    assert len(_get_mons(ctx)) > 2, \
+        'mon_thrash task requires at least 3 monitors'
+    log.info('Beginning mon_thrash...')
+    first_mon = teuthology.get_first_mon(ctx, config)
+    (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+    manager = ceph_manager.CephManager(
+        mon,
+        ctx=ctx,
+        logger=log.getChild('ceph_manager'),
+        )
+    thrash_proc = MonitorThrasher(ctx,
+        manager, config,
+        logger=log.getChild('mon_thrasher'))
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining mon_thrasher')
+        thrash_proc.do_join()
+        mons = _get_mons(ctx)
+        manager.wait_for_mon_quorum_size(len(mons))
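
The key invariant in this file is max_killable(): with maintain_quorum enabled the thrasher never kills a majority of the monitors, which is also why task() insists on at least 3 monitors. A standalone rendition of that arithmetic (same formula as in the diff above, cast to int for printing):

    import math

    def max_killable(num_mons, maintain_quorum=True):
        # Leave a strict majority alive, e.g. 3 mons -> 1, 5 -> 2, 7 -> 3.
        if maintain_quorum:
            return int(max(math.ceil(num_mons / 2.0) - 1, 0))
        return num_mons

    for m in (3, 4, 5, 7):
        print(m, max_killable(m))  # 3 -> 1, 4 -> 1, 5 -> 2, 7 -> 3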