import ceph_manager
import random
import time
-import gevent
+from gevent.greenlet import Greenlet
+from gevent.event import Event
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
-class MDSThrasher:
+class MDSThrasher(Greenlet):
"""
MDSThrasher::
max_revive_delay: [default: 10] maximum number of seconds to delay before
bringing back a thrashed MDS
-
+
thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
during replay. Value should be between 0.0 and 1.0
"""
def __init__(self, ctx, manager, config, logger, failure_group, weight):
+ super(MDSThrasher, self).__init__()
+
self.ctx = ctx
self.manager = manager
- self.manager.wait_for_clean()
+ assert self.manager.is_clean()
- self.stopping = False
+ self.stopping = Event()
self.logger = logger
self.config = config
self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
- self.thread = gevent.spawn(self.do_thrash)
-
self.failure_group = failure_group
self.weight = weight
+ def _run(self):
+ try:
+ self.do_thrash()
+ except:
+ # Log exceptions here so we get the full backtrace (it's lost
+ # by the time someone does a .get() on this greenlet)
+ self.logger.exception("Exception in do_thrash:")
+ raise
+
def log(self, x):
"""Write data to logger assigned to this MDThrasher"""
self.logger.info(x)
- def do_join(self):
- """Thread finished"""
- self.stopping = True
- self.thread.get()
+ def stop(self):
+ self.stopping.set()
def do_thrash(self):
"""
"""
self.log('starting mds_do_thrash for failure group: ' + ', '.join(
['mds.{_id}'.format(_id=_f) for _f in self.failure_group]))
- while not self.stopping:
+ while not self.stopping.is_set():
delay = self.max_thrash_delay
if self.randomize:
delay = random.randrange(0.0, self.max_thrash_delay)
if delay > 0.0:
self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
- time.sleep(delay)
+ self.stopping.wait(delay)
+ if self.stopping.is_set():
+ continue
skip = random.randrange(0.0, 1.0)
if self.weight < 1.0 and skip > self.weight:
self.manager.kill_mds_by_rank(active_rank)
# wait for mon to report killed mds as crashed
- status = {}
last_laggy_since = None
itercount = 0
while True:
log.info('Assigning mds rank {r} to failure group {g}'.format(r=r, g=active))
failure_groups[active].append(r)
+ manager.wait_for_clean()
for (active, standbys) in failure_groups.iteritems():
-
weight = 1.0
if 'thrash_weights' in config:
weight = int(config['thrash_weights'].get('mds.{_id}'.format(_id=active), '0.0'))
failure_group = [active]
failure_group.extend(standbys)
- thrashers[active] = MDSThrasher(
+
+ thrasher = MDSThrasher(
ctx, manager, config,
logger=log.getChild('mds_thrasher.failure_group.[{a}, {sbs}]'.format(
a=active,
),
failure_group=failure_group,
weight=weight)
+ thrasher.start()
+ thrashers[active] = thrasher
+
# if thrash_weights isn't specified and we've reached max_thrash,
# we're done
if not 'thrash_weights' in config and len(thrashers) == max_thrashers:
log.info('joining mds_thrashers')
for t in thrashers:
log.info('join thrasher for failure group [{fg}]'.format(fg=', '.join(failure_group)))
- thrashers[t].do_join()
+ thrashers[t].stop()
+ thrashers[t].join()
log.info('done joining')