import ceph_manager
import itertools
import random
+import signal
import time
from gevent import sleep
log = logging.getLogger(__name__)
+class DaemonWatchdog(Greenlet):
+ """
+ DaemonWatchdog::
+
+ Watch Ceph daemons for failures. If an extended failure is detected (i.e.
+ not intentional), then the watchdog will unmount file systems and send
+ SIGTERM to all daemons. The duration of an extended failure is configurable
+ with watchdog_daemon_timeout.
+
+ watchdog_daemon_timeout [default: 300]: number of seconds a daemon
+ is allowed to be failed before the watchdog will bark.
+ """
+
+ def __init__(self, ctx, manager, config, thrashers):
+ Greenlet.__init__(self)
+ self.ctx = ctx
+ self.config = config
+ self.e = None
+ self.logger = log.getChild('daemon_watchdog')
+ self.manager = manager
+ self.name = 'watchdog'
+ self.stopping = Event()
+ self.thrashers = thrashers
+
+ def _run(self):
+ try:
+ self.watch()
+ except Exception as e:
+ # See _run exception comment for MDSThrasher
+ self.e = e
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+
+ def log(self, x):
+ """Write data to logger"""
+ self.logger.info(x)
+
+ def stop(self):
+ self.stopping.set()
+
+ def bark(self):
+ self.log("BARK! unmounting mounts and killing all daemons")
+ for mount in self.ctx.mounts.values():
+ try:
+ mount.umount_wait(force=True)
+ except:
+ self.logger.exception("ignoring exception:")
+ daemons = []
+ daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
+ daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
+ for daemon in daemons:
+ try:
+ daemon.signal(signal.SIGTERM)
+ except:
+ self.logger.exception("ignoring exception:")
+
+ def watch(self):
+ self.log("watchdog starting")
+ daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
+ daemon_failure_time = {}
+ while not self.stopping.is_set():
+ bark = False
+ now = time.time()
+
+ mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
+ mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
+ clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
+
+ #for daemon in mons:
+ # self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+ #for daemon in mdss:
+ # self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+
+ daemon_failures = []
+ daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
+ daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
+ for daemon in daemon_failures:
+ name = daemon.role + '.' + daemon.id_
+ dt = daemon_failure_time.setdefault(name, (daemon, now))
+ assert dt[0] is daemon
+ delta = now-dt[1]
+ self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
+ if delta > daemon_timeout:
+ bark = True
+
+ # If a daemon is no longer failed, remove it from tracking:
+ for name in daemon_failure_time.keys():
+ if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
+ self.log("daemon {name} has been restored".format(name=name))
+ del daemon_failure_time[name]
+
+ for thrasher in self.thrashers:
+ if thrasher.e is not None:
+ self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
+ bark = True
+
+ if bark:
+ self.bark()
+ return
+
+ sleep(5)
+
+ self.log("watchdog finished")
class MDSThrasher(Greenlet):
"""
"""
- def __init__(self, ctx, manager, config, logger, fs, max_mds):
- super(MDSThrasher, self).__init__()
+ def __init__(self, ctx, manager, config, fs, max_mds):
+ Greenlet.__init__(self)
- self.ctx = ctx
- self.manager = manager
- assert self.manager.is_clean()
self.config = config
- self.logger = logger
+ self.ctx = ctx
+ self.e = None
+ self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
self.fs = fs
+ self.manager = manager
self.max_mds = max_mds
-
+ self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
self.stopping = Event()
self.randomize = bool(self.config.get('randomize', True))
def _run(self):
try:
self.do_thrash()
- except:
- # Log exceptions here so we get the full backtrace (it's lost
- # by the time someone does a .get() on this greenlet)
- self.logger.exception("Exception in do_thrash:")
- raise
+ except Exception as e:
+ # Log exceptions here so we get the full backtrace (gevent loses them).
+ # Also allow succesful completion as gevent exception handling is a broken mess:
+ #
+ # 2017-02-03T14:34:01.259 CRITICAL:root: File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
+ # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
+ # self.print_exception(context, type, value, tb)
+ # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
+ # traceback.print_exception(type, value, tb, file=errstream)
+ # File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
+ # _print(file, 'Traceback (most recent call last):')
+ # File "/usr/lib/python2.7/traceback.py", line 13, in _print
+ # file.write(str+terminator)
+ # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
+ self.e = e
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
def log(self, x):
"""Write data to logger assigned to this MDThrasher"""
self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
else:
if rank is not None:
- if len(actives) >= max_mds:
- # no replacement can occur!
- return status
try:
info = status.get_rank(self.fs.id, rank)
if info['gid'] != gid and "up:active" == info['state']:
return status
except:
pass # no rank present
+ if len(actives) >= max_mds:
+ # no replacement can occur!
+ self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
+ return status
else:
if len(actives) >= max_mds:
self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
if itercount > 300/2: # 5 minutes
raise RuntimeError('timeout waiting for cluster to stabilize')
elif itercount % 5 == 0:
- self.log('mds map: {status}'.format(status=self.fs.status()))
+ self.log('mds map: {status}'.format(status=status))
+ else:
+ self.log('no change')
sleep(2)
def do_thrash(self):
status = mds_cluster.status()
log.info('Ready to start thrashing')
+ thrashers = []
+
+ watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
+ watchdog.start()
+
manager.wait_for_clean()
- thrashers = {}
+ assert manager.is_clean()
for fs in status.get_filesystems():
- name = fs['mdsmap']['fs_name']
- log.info('Running thrasher against FS {f}'.format(f = name))
- thrasher = MDSThrasher(
- ctx, manager, config,
- log.getChild('fs.[{f}]'.format(f = name)),
- Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds']
- )
+ thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
thrasher.start()
- thrashers[name] = thrasher
+ thrashers.append(thrasher)
try:
log.debug('Yielding')
yield
finally:
log.info('joining mds_thrashers')
- for name in thrashers:
- log.info('join thrasher mds_thrasher.fs.[{f}]'.format(f=name))
- thrashers[name].stop()
- thrashers[name].get() # Raise any exception from _run()
- thrashers[name].join()
+ for thrasher in thrashers:
+ thrasher.stop()
+ if thrasher.e:
+ raise RuntimeError('error during thrashing')
+ thrasher.join()
log.info('done joining')
+
+ watchdog.stop()
+ watchdog.join()