git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
task/mds_thrasher: New task for thrashing the mds
author Sam Lang <sam.lang@inktank.com>
Wed, 9 Jan 2013 22:02:42 +0000 (16:02 -0600)
committer Sam Lang <sam.lang@inktank.com>
Fri, 18 Jan 2013 21:48:52 +0000 (15:48 -0600)
Signed-off-by: Sam Lang <sam.lang@inktank.com>
teuthology/misc.py
teuthology/task/ceph.py
teuthology/task/ceph_manager.py
teuthology/task/mds_thrash.py [new file with mode: 0644]

index 0de60dbfe91e53bc3f4fd7d47ccebc84922c342f..ab188b7c94edcd7d1acffe258262077ab30883e8 100644 (file)
@@ -128,11 +128,13 @@ def skeleton_config(roles, ips):
         conf.setdefault(role, {})
         conf[role]['mon addr'] = addr
     # set up standby mds's
-    for roles in roles:
-        for role in roles:
-            if role.startswith('mds.') and role.endswith('-s'):
+    for roles_subset in roles:
+        for role in roles_subset:
+            if role.startswith('mds.'):
                 conf.setdefault(role, {})
-                conf[role]['mds standby for name'] = role[:-2]
+                if role.find('-s-') != -1:
+                    standby_mds = role[role.find('-s-')+3:]
+                    conf[role]['mds standby for name'] = standby_mds
     return conf
 
 def roles_of_type(roles_for_host, type_):
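
A minimal sketch of the role-name convention parsed above, assuming standby roles are named mds.<id>-s-<followed id> (e.g. mds.b-s-a declares a standby daemon b-s-a that follows mds.a); the helper and the role names below are illustrative:

    def standby_target(role):
        """Return the mds id a standby role follows, or None for non-standby roles."""
        if not role.startswith('mds.'):
            return None
        marker = role.find('-s-')
        if marker == -1:
            return None
        return role[marker + 3:]

    assert standby_target('mds.b-s-a') == 'a'   # -> 'mds standby for name = a'
    assert standby_target('mds.a') is None      # plain mds, no standby entry
    assert standby_target('osd.0') is None      # non-mds roles are ignored
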
index 2a0b0fd6383b54eef159ffbbf5abe522402f1c53..c84513119cd319428de81868dd6a7c1d788f3d2b 100644 (file)
@@ -41,12 +41,16 @@ class DaemonState(object):
         self.proc = None
         self.log.info('Stopped')
 
-    def restart(self):
+    def restart(self, *args, **kwargs):
         self.log.info('Restarting')
         if self.proc is not None:
             self.log.debug('stopping old one...')
             self.stop()
-        self.proc = self.remote.run(*self.command_args, **self.command_kwargs)
+        cmd_args = list(self.command_args)
+        cmd_args.extend(args)
+        cmd_kwargs = self.command_kwargs.copy()
+        cmd_kwargs.update(kwargs)
+        self.proc = self.remote.run(*cmd_args, **cmd_kwargs)
         self.log.info('Started')
 
     def running(self):
@@ -841,7 +845,7 @@ def run_daemon(ctx, config, type_):
         for id_ in teuthology.roles_of_type(roles_for_host, type_):
             name = '%s.%s' % (type_, id_)
 
-            if not id_.endswith('-s'):
+            if not (id_.endswith('-s')) and (id_.find('-s-') == -1):
                 num_active += 1
 
             run_cmd = [
@@ -870,6 +874,7 @@ def run_daemon(ctx, config, type_):
                 run_cmd.extend([ 'env', 'CPUPROFILE=%s' % profile_path ])
 
             run_cmd.extend(run_cmd_tail)
+
             ctx.daemons.add_daemon(remote, type_, id_,
                                    args=run_cmd,
                                    logger=log.getChild(name),
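
A hedged sketch of the argument merging restart() now performs, using a stand-in class (the real DaemonState runs the merged command on a remote host via self.remote.run); the class and call below are illustrative:

    class FakeDaemonState(object):
        def __init__(self, command_args, command_kwargs):
            self.command_args = command_args
            self.command_kwargs = command_kwargs

        def merged_command(self, *args, **kwargs):
            # extra positional args are appended to the stored daemon command;
            # copying keeps repeated restarts from accumulating extra kwargs
            cmd_args = list(self.command_args)
            cmd_args.extend(args)
            cmd_kwargs = self.command_kwargs.copy()
            cmd_kwargs.update(kwargs)
            return cmd_args, cmd_kwargs

    d = FakeDaemonState(['ceph-mds', '-i', 'a'], {'wait': False})
    args, kwargs = d.merged_command('--hot-standby', '0')
    assert args == ['ceph-mds', '-i', 'a', '--hot-standby', '0']
    assert kwargs == {'wait': False}

This is what lets CephManager.revive_mds below append flags such as --hot-standby to the original mds command line.
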
index 51c2f2febc9e761e6dd92913ecc640f733de9109..e0bff247b3eac1bcf7befcbfebc58e23424c7cd8 100644 (file)
@@ -611,3 +611,48 @@ class CephManager:
       if debug:
         self.log('health:\n{h}'.format(h=out))
       return json.loads(out)
+
+    ## metadata servers
+
+    def kill_mds(self, mds):
+        self.ctx.daemons.get_daemon('mds', mds).stop()
+
+    def kill_mds_by_rank(self, rank):
+        status = self.get_mds_status_by_rank(rank)
+        self.ctx.daemons.get_daemon('mds', status['name']).stop()
+
+    def revive_mds(self, mds, standby_for_rank=None):
+        args = []
+        if standby_for_rank is not None:
+          args.extend(['--hot-standby', standby_for_rank])
+        self.ctx.daemons.get_daemon('mds', mds).restart(*args)
+
+    def revive_mds_by_rank(self, rank, standby_for_rank=None):
+        args = []
+        if standby_for_rank is not None:
+          args.extend(['--hot-standby', standby_for_rank])
+        status = self.get_mds_status_by_rank(rank)
+        self.ctx.daemons.get_daemon('mds', status['name']).restart(*args)
+
+    def get_mds_status(self, mds):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        # find the map entry for this mds by name
+        for info in j['info'].itervalues():
+          if info['name'] == mds:
+              return info
+        return None
+
+    def get_mds_status_by_rank(self, rank):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        # find the map entry holding this rank
+        for info in j['info'].itervalues():
+          if info['rank'] == rank:
+              return info
+        return None
+
+    def get_mds_status_all(self):
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        return j
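
A hedged usage sketch of the new mds helpers, assuming a CephManager instance named manager has already been constructed (mds_thrash.py below builds one the same way as other tasks); the function name, rank, and sleep are illustrative:

    import time

    def bounce_rank(manager, rank=0):
        """Kill the mds holding 'rank', wait, then revive it as a hot standby."""
        victim = manager.get_mds_status_by_rank(rank)   # mds map entry, or None
        if victim is None:
            return
        manager.kill_mds_by_rank(rank)
        time.sleep(10)                                  # let the mon notice and fail over
        manager.revive_mds(victim['name'], standby_for_rank=rank)
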
diff --git a/teuthology/task/mds_thrash.py b/teuthology/task/mds_thrash.py
new file mode 100644 (file)
index 0000000..d143b4f
--- /dev/null
@@ -0,0 +1,309 @@
+import logging
+import contextlib
+import ceph_manager
+import random
+import time
+import gevent
+from teuthology import misc as teuthology
+
+log = logging.getLogger(__name__)
+
+class MDSThrasher:
+  """
+  MDSThrasher::
+
+  The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
+
+  The config is optional.  Many of the config parameters are a maximum value
+  to use when selecting a random value from a range.  To always use the maximum
+  value, set randomize to false.  The config is a dict containing some or all of:
+
+  seed: [no default] seed the random number generator
+
+  randomize: [default: true] enables randomization of the delays; when false,
+    the maximum values are always used
+
+  max_thrash: [default: 1] the maximum number of MDSs that will be thrashed at
+    any given time.
+
+  max_thrash_delay: [default: 30] maximum number of seconds to delay before
+    thrashing again.
+
+  max_revive_delay: [default: 10] maximum number of seconds to delay before
+    bringing back a thrashed MDS
+
+  thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+    during replay.  Value should be between 0.0 and 1.0
+
+  max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
+    the replay state before thrashing
+
+  thrash_weights: allows specific MDSs to be thrashed more or less frequently.  This
+    option overrides anything specified by max_thrash.  It is a dict containing
+    mds.x: weight pairs, for example {mds.a: 0.7, mds.b: 0.3, mds.c: 0.0}.  Each weight
+    is a value from 0.0 to 1.0.  Any MDS not specified is automatically given a
+    weight of 0.0.  By default the thrasher delays for up to max_thrash_delay,
+    thrashes, waits for the MDS to recover, and iterates.  If a weight is specified
+    for an MDS, each iteration additionally draws a random value in [0, 1] and
+    thrashes only if the draw does not exceed that MDS's weight.
+
+  Examples::
+
+
+    The following example sets the likelihood that mds.a will be thrashed
+    to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also sets the
+    likelihood that an MDS will be thrashed in replay to 40%.
+    Thrash weights do not have to sum to 1.
+
+    tasks:
+    - ceph:
+    - mds_thrash:
+        thrash_weights:
+          mds.a: 0.8
+          mds.b: 0.2
+        thrash_in_replay: 0.4
+    - ceph-fuse:
+    - workunit:
+        clients:
+          all: [suites/fsx.sh]
+
+    The following example disables randomization, and uses the max delay values:
+
+    tasks:
+    - ceph:
+    - mds_thrash:
+        randomize: false
+        max_thrash_delay: 10
+        max_revive_delay: 1
+        max_replay_thrash_delay: 4
+
+  """
+  def __init__(self, ctx, manager, config, logger, failure_group, weight):
+    self.ctx = ctx
+    self.manager = manager
+    self.manager.wait_for_clean()
+
+    self.stopping = False
+    self.logger = logger
+    self.config = config
+
+    self.randomize = bool(self.config.get('randomize', True))
+    self.max_thrash_delay = float(self.config.get('max_thrash_delay', 30.0))
+    self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
+    assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(v=self.thrash_in_replay)
+
+    self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
+
+    self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
+
+    self.failure_group = failure_group
+    self.weight = weight
+
+    self.thread = gevent.spawn(self.do_thrash)
+
+  def log(self, x):
+    self.logger.info(x)
+
+  def do_join(self):
+    self.stopping = True
+    self.thread.get()
+
+  def do_thrash(self):
+    self.log('starting mds_do_thrash for failure group: ' + ', '.join(['mds.{_id}'.format(_id=_f) for _f in self.failure_group]))
+    while not self.stopping:
+      delay = self.max_thrash_delay
+      if self.randomize:
+          delay = random.uniform(0, self.max_thrash_delay)
+
+      if delay > 0.0:
+        self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
+        time.sleep(delay)
+
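+      # with thrash_weights, a uniform draw in [0, 1) decides whether this
+      # iteration thrashes this failure group at all (higher weight, more often)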
+      skip = random.uniform(0.0, 1.0)
+      if self.weight < 1.0 and skip > self.weight:
+          self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=self.weight))
+          continue
+
+      # find the active mds in the failure group
+      statuses = [self.manager.get_mds_status(m) for m in self.failure_group]
+      actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
+      assert len(actives) == 1, 'Can only have one active in a failure group'
+
+      active_mds = actives[0]['name']
+      active_rank = actives[0]['rank']
+
+      self.log('kill mds.{id} (rank={r})'.format(id=active_mds, r=active_rank))
+      self.manager.kill_mds_by_rank(active_rank)
+
+      # wait for mon to report killed mds as crashed
+      status = {}
+      last_laggy_since = None
+      while True:
+        failed = self.manager.get_mds_status_all()['failed']
+        status = self.manager.get_mds_status(active_mds)
+        if not status:
+            break
+        if 'laggy_since' in status:
+            last_laggy_since = status['laggy_since']
+            break
+        if any([(f == active_mds) for f in failed]):
+            break
+        self.log('waiting till mds map indicates mds.{_id} is laggy/crashed, in failed state, or mds.{_id} is removed from mdsmap'.format(_id=active_mds))
+        time.sleep(2)
+      if last_laggy_since:
+        self.log('mds.{_id} reported laggy/crashed since: {since}'.format(_id=active_mds, since=last_laggy_since))
+      else:
+        self.log('mds.{_id} down, removed from mdsmap'.format(_id=active_mds))
+
+      # wait for a standby mds to takeover and become active
+      takeover_mds = None
+      takeover_rank = None
+      while True:
+        statuses = [self.manager.get_mds_status(m) for m in self.failure_group]
+        actives = filter(lambda s: s and s['state'] == 'up:active', statuses)
+        if len(actives) > 0:
+            assert len(actives) == 1, 'Can only have one active in failure group'
+            takeover_mds = actives[0]['name']
+            takeover_rank = actives[0]['rank']
+            break
+        time.sleep(2)
+
+      self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
+
+      # wait for a while before restarting old active to become new
+      # standby
+      delay = self.max_revive_delay
+      if self.randomize:
+          delay = random.uniform(0, self.max_revive_delay)
+
+      self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+        delay=delay,id=active_mds))
+      time.sleep(delay)
+
+      self.log('reviving mds.{id}'.format(id=active_mds))
+      self.manager.revive_mds(active_mds, standby_for_rank=takeover_rank)
+
+      status = {}
+      while True:
+        status = self.manager.get_mds_status(active_mds)
+        if status and (status['state'] == 'up:standby' or status['state'] == 'up:standby-replay'):
+            break
+        self.log('waiting till mds map indicates mds.{_id} is in standby or standby-replay'.format(_id=active_mds))
+        time.sleep(2)
+      self.log('mds.{_id} reported in {state} state'.format(_id=active_mds, state=status['state']))
+
+      # don't do replay thrashing right now
+      continue
+      # this might race with replay -> active transition...
+      if status['state'] == 'up:replay' and random.uniform(0.0, 1.0) < self.thrash_in_replay:
+
+        delay = self.max_replay_thrash_delay
+        if self.randomize:
+          delay = random.uniform(0, self.max_replay_thrash_delay)
+        time.sleep(delay)
+        self.log('kill replaying mds.{id}'.format(id=active_mds))
+        self.manager.kill_mds(active_mds)
+
+        delay = self.max_revive_delay
+        if self.randomize:
+            delay = random.uniform(0, self.max_revive_delay)
+
+        self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+            delay=delay, id=active_mds))
+        time.sleep(delay)
+
+        self.log('revive mds.{id}'.format(id=active_mds))
+        self.manager.revive_mds(active_mds)
+
+@contextlib.contextmanager
+def task(ctx, config):
+  """
+  Stress test the mds by thrashing while another task/workunit
+  is running.
+
+  Please refer to MDSThrasher class for further information on the
+  available options.
+  """
+  if config is None:
+    config = {}
+  assert isinstance(config, dict), \
+      'mds_thrash task only accepts a dict for configuration'
+  mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+  assert len(mdslist) > 1, \
+      'mds_thrash task requires at least 2 metadata servers'
+
+  # choose random seed
+  seed = None
+  if 'seed' in config:
+      seed = int(config['seed'])
+  else:
+      seed = int(time.time())
+  log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
+  random.seed(seed)
+
+  max_thrashers = config.get('max_thrash', 1)
+  managers = {}
+  thrashers = {}
+
+  (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
+  manager = ceph_manager.CephManager(
+    first, ctx=ctx, logger=log.getChild('ceph_manager'),
+    )
+
+  log.info('Wait for all MDSs to reach steady state...')
+  # make sure everyone is in active, standby, or standby-replay
+  statuses = None
+  statuses_by_rank = None
+  while True:
+      statuses = {m : manager.get_mds_status(m) for m in mdslist}
+      statuses_by_rank = {s['rank'] : s for (_,s) in statuses.iteritems()}
+      ready = filter(lambda (_,s): s['state'] == 'up:active'
+                        or s['state'] == 'up:standby'
+                        or s['state'] == 'up:standby-replay',
+                     statuses.items())
+      if len(ready) == len(statuses):
+          break
+      time.sleep(2)
+  log.info('Ready to start thrashing')
+
+  # setup failure groups
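+  # a failure group is one active mds plus all standbys whose
+  # 'standby_for_name' points at it; each group gets its own thrasher greenlet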
+  failure_groups = {}
+  actives = {s['name'] : s for (_,s) in statuses.iteritems() if s['state'] == 'up:active'}
+  log.info('Actives is: {d}'.format(d=actives))
+  log.info('Statuses is: {d}'.format(d=statuses_by_rank))
+  for active in actives:
+      for (r,s) in statuses.iteritems():
+          if s['standby_for_name'] == active:
+              if not active in failure_groups:
+                  failure_groups[active] = []
+              log.info('Assigning mds {r} to failure group {g}'.format(r=r, g=active))
+              failure_groups[active].append(r)
+
+  for (active,standbys) in failure_groups.iteritems():
+
+      weight = 1.0
+      if 'thrash_weights' in config:
+          weight = float(config['thrash_weights'].get('mds.{_id}'.format(_id=active), 0.0))
+
+      failure_group = [active]
+      failure_group.extend(standbys)
+      thrashers[active] = MDSThrasher(
+          ctx, manager, config,
+          logger=log.getChild('mds_thrasher.failure_group.[{a}, {sbs}]'.format(
+                                    a=active,
+                                    sbs=', '.join(standbys)
+                                    )
+                                ),
+          failure_group=failure_group,
+          weight=weight)
+      # if thrash_weights isn't specified and we've reached max_thrash,
+      # we're done
+      if not 'thrash_weights' in config and len(thrashers) == max_thrashers:
+          break
+
+  try:
+    log.debug('Yielding')
+    yield
+  finally:
+    log.info('joining mds_thrashers')
+    for t in thrashers:
+      log.info('join thrasher for failure group [{fg}]'.format(
+          fg=', '.join(thrashers[t].failure_group)))
+      thrashers[t].do_join()
+    log.info('done joining')
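
For reference, a minimal standalone sketch of the failure-group derivation task() performs above, assuming mds statuses carrying the fields the task reads (name, state, standby_for_name); the helper and sample entries are illustrative:

    def build_failure_groups(statuses):
        """Map each active mds to the standbys that follow it; task() itself
        prepends the active mds to form the full failure group."""
        groups = {}
        for status in statuses.itervalues():
            if status['state'] != 'up:active':
                continue
            groups[status['name']] = [
                s['name'] for s in statuses.itervalues()
                if s['standby_for_name'] == status['name']
            ]
        return groups

    statuses = {
        'a':     {'name': 'a',     'state': 'up:active',  'standby_for_name': ''},
        'b-s-a': {'name': 'b-s-a', 'state': 'up:standby', 'standby_for_name': 'a'},
    }
    assert build_failure_groups(statuses) == {'a': ['b-s-a']}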