Adds a new WatchedProcess class that the daemon watchdog can monitor during testing and stop in the event of a bark.
The DaemonWatchdog is also updated to monitor any registered WatchedProcesses.
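
As an illustrative sketch (not part of this change; names such as MyExerciser and
remote_procs are placeholders), a test task is expected to register its exerciser
and let the watchdog propagate the failure reason:

    exerciser = MyExerciser(ctx, config, cluster, remote_procs)  # a WatchedProcess subclass
    ctx.ceph[cluster].watched_processes.append(exerciser)
    try:
        run.wait(remote_procs.values())      # run the remote workload
    except Exception as e:
        exerciser.set_exception(e)           # the watchdog's watch() loop barks on this
    if exerciser.exception:
        raise exerciser.exception            # may be the BarkError set by DaemonWatchdog.bark()

The concrete version of this pattern for ceph_test_rados is in qa/tasks/rados.py below.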
Signed-off-by: Chris Harris <harriscr@uk.ibm.com>
@contextlib.contextmanager
def watchdog_setup(ctx, config):
ctx.ceph[config['cluster']].thrashers = []
- ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config, ctx.ceph[config['cluster']].thrashers)
+ ctx.ceph[config["cluster"]].watched_processes = []
+ ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config)
ctx.ceph[config['cluster']].watchdog.start()
yield
log = logging.getLogger(__name__)
+class BarkError(Exception):
+ """
+ An exception for the watchdog bark. We can use this as an exception
+ higher up in the stack to give the reason why the test failed.
+ """
+
+ def __init__(self, bark_reason):
+ self.message = f"The watchdog barked due to: {bark_reason}"
+ super().__init__(self.message)
+
class DaemonWatchdog(Greenlet):
"""
DaemonWatchdog::
- Watch Ceph daemons for failures. If an extended failure is detected (i.e.
- not intentional), then the watchdog will unmount file systems and send
- SIGTERM to all daemons. The duration of an extended failure is configurable
- with watchdog_daemon_timeout.
+ This process monitors three classes of running processes for failures:
+ 1. Ceph daemons, e.g. OSDs
+ 2. Thrashers - test processes used to inject errors during testing,
+ e.g. RBDMirrorThrasher in qa/tasks/rbd_mirror_thrash.py
+ 3. Processes - any WatchedProcess that is running as part of a test suite.
+ These processes are typically I/O exercisers,
+ e.g. CephTestRados in qa/tasks/rados.py
+
+ If an extended failure is detected in a daemon process (i.e. not intentional), then
+ the watchdog will bark. It will also bark if an assertion is raised while any
+ monitored Thrashers or WatchedProcesses are running.
+
+ When the watchdog barks it will:
+ - unmount file systems and send SIGTERM to all daemons
+ - stop all thrasher processes by calling their stop_and_join() method
+ - stop any watched processes by calling their stop() method
+
+ The duration of an extended failure is configurable with watchdog_daemon_timeout.
ceph:
watchdog:
watchdog will bark.
"""
- def __init__(self, ctx, config, thrashers):
+ def __init__(self, ctx, config):
super(DaemonWatchdog, self).__init__()
self.config = ctx.config.get('watchdog', {})
self.ctx = ctx
self.cluster = config.get('cluster', 'ceph')
self.name = 'watchdog'
self.stopping = Event()
- self.thrashers = thrashers
+ self.thrashers = ctx.ceph[config["cluster"]].thrashers
+ self.watched_processes = ctx.ceph[config["cluster"]].watched_processes
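+ # These lists start out empty (created in watchdog_setup() above) and are
+ # populated by the individual thrasher and exerciser tasks as they start.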
def _run(self):
try:
def stop(self):
self.stopping.set()
- def bark(self):
+ def bark(self, reason):
self.log("BARK! unmounting mounts and killing all daemons")
if hasattr(self.ctx, 'mounts'):
for mount in self.ctx.mounts.values():
except:
self.logger.exception("ignoring exception:")
+ for thrasher in self.thrashers:
+ self.log("Killing thrasher {name}".format(name=thrasher.name))
+ thrasher.stop_and_join()
+
+ for proc in self.watched_processes:
+ self.log("Killing remote process {process_id}".format(process_id=proc.id))
+ proc.set_exception(BarkError(reason))
+ proc.stop()
+
def watch(self):
self.log("watchdog starting")
daemon_timeout = int(self.config.get('daemon_timeout', 300))
daemon_restart = self.config.get('daemon_restart', False)
daemon_failure_time = {}
+ bark_reason: str = ""
while not self.stopping.is_set():
bark = False
now = time.time()
self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
if delta > daemon_timeout:
bark = True
+ bark_reason = f"Daemon {name} has failed"
if daemon_restart == 'normal' and daemon.proc.exitstatus == 0:
self.log(f"attempting to restart daemon {name}")
daemon.restart()
for thrasher in self.thrashers:
if thrasher.exception is not None:
self.log("{name} failed".format(name=thrasher.name))
- thrasher.stop_and_join()
+ bark_reason = f"Thrasher {thrasher.name} threw exception {thrasher.exception}"
+ bark = True
+
+ for proc in self.watched_processes:
+ if proc.exception is not None:
+ self.log("Remote process %s failed" % proc.id)
+ bark_reason = f"Remote process {proc.id} threw exception {proc.exception}"
bark = True
if bark:
- self.bark()
+ self.bark(bark_reason)
return
sleep(5)
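
For reference, the timeout the DaemonWatchdog docstring calls watchdog_daemon_timeout is
read from the 'watchdog' section of the job config; a minimal sketch of the lookups, using
the in-code defaults from watch() (assumes a teuthology ctx is in scope):

    watchdog_config = ctx.config.get('watchdog', {})
    daemon_timeout = int(watchdog_config.get('daemon_timeout', 300))  # seconds a daemon may be failed before a bark
    daemon_restart = watchdog_config.get('daemon_restart', False)     # 'normal' => restart daemons that exited cleanly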
"""
Rados model-based integration tests
"""
+
import contextlib
import logging
-import gevent
-from teuthology import misc as teuthology
+from typing import Any, Dict
+import gevent
+from teuthology import misc as teuthology
from teuthology.orchestra import run
+from .watched_process import WatchedProcess
+
log = logging.getLogger(__name__)
+
+class CephTestRados(WatchedProcess):
+ """
+ The WatchedProcess class for ceph_test_rados. This allows us to monitor
+ any ceph_test_rados processes for errors, and to kill the remote processes
+ when the DaemonWatchdog barks.
+
+ The exception set by the watchdog is re-raised by the task, so the failure
+ reason shown in the test result is the reason the watchdog barked.
+ """
+
+ def __init__(self, ctx: Dict[Any, Any], config: Dict[Any, Any], cluster: str, sub_processes: Dict[str, Any]):
+ super(CephTestRados, self).__init__()
+
+ self._ctx = ctx
+ self._config = config
+ self._cluster: str = cluster
+ self._sub_processes = sub_processes
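+ # sub_processes maps each ceph_test_rados instance id to its remote process;
+ # stop() closes each stdin so daemon-helper terminates the remote command.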
+ self._name: str = f"ceph-test-rados-{self._cluster}"
+
+ @property
+ def id(self) -> str:
+ return self._name
+
+ def stop(self) -> None:
+ debug: str = f"Stopping {self._name}"
+ if self._exception:
+ debug += f" due to exception {self._exception}"
+ log.debug(debug)
+ for test_id, proc in self._sub_processes.items():
+ log.info("Stopping instance %s", test_id)
+ proc.stdin.close()
+
@contextlib.contextmanager
def task(ctx, config):
"""
fast_read: enable ec_pool's fast_read
min_size: set the min_size of created pool
pool_snaps: use pool snapshots instead of selfmanaged snapshots
- write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
- This mean data don't access in the near future.
- Let osd backend don't keep data in cache.
- pct_update_delay: delay before primary propogates pct on write pause,
+ write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
+ This means the data will not be accessed in the near future,
+ so the OSD backend need not keep it in cache.
+ pct_update_delay: delay before the primary propagates pct on write pause,
defaults to 5s if balance_reads is set
For example::
m: 1
crush-failure-domain: osd
pool_snaps: true
- write_fadvise_dontneed: true
+ write_fadvise_dontneed: true
runs: 10
- interactive:
'adjust-ulimits',
'ceph-coverage',
'{tdir}/archive/coverage'.format(tdir=testdir),
+ 'daemon-helper',
+ 'kill',
'ceph_test_rados']
if config.get('ec_pool', False):
args.extend(['--no-omap'])
args.extend([
'--op', op, str(weight)
])
-
+
def thread():
"""Thread spawned by gevent"""
profile_name = None
crush_name = None
+ cluster = config.get("cluster", "ceph")
+
for i in range(int(config.get('runs', '1'))):
log.info("starting run %s out of %s", str(i), config.get('runs', '1'))
tests = {}
erasure_code_profile_name=profile_name,
erasure_code_crush_rule_name=crush_name,
erasure_code_use_overwrites=
- config.get('erasure_code_use_overwrites', False)
+ config.get('erasure_code_use_overwrites', False),
)
created_pools.append(pool)
if config.get('fast_read', False):
if pct_update_delay:
manager.raw_cluster_cmd(
'osd', 'pool', 'set', pool,
- 'pct_update_delay', str(pct_update_delay));
- min_size = config.get('min_size', None);
+ 'pct_update_delay', str(pct_update_delay))
+ min_size = config.get('min_size', None)
if min_size is not None:
manager.raw_cluster_cmd(
'osd', 'pool', 'set', pool, 'min_size', str(min_size))
wait=False
)
tests[id_] = proc
+
+ watched_process: CephTestRados = CephTestRados(ctx, config, cluster, tests)
+ ctx.ceph[cluster].watched_processes.append(watched_process)
+ try:
+ run.wait(tests.values())
+ except Exception as e:
+ watched_process.set_exception(e)
+
- run.wait(tests.values())
wait_for_all_active_clean_pgs = config.get("wait_for_all_active_clean_pgs", False)
# usually set when we do min_size testing.
- if wait_for_all_active_clean_pgs:
+ if wait_for_all_active_clean_pgs:
# Make sure we finish the test first before deleting the pool.
# Mainly used for test_pool_min_size
manager.wait_for_clean()
manager.wait_for_all_osds_up(timeout=1800)
+ if watched_process.exception:
+ raise watched_process.exception
+
for pool in created_pools:
- manager.wait_snap_trimming_complete(pool);
+ manager.wait_snap_trimming_complete(pool)
manager.remove_pool(pool)
running = gevent.spawn(thread)
--- /dev/null
+"""
+WatchedProcess base class.
+
+Subclass this for any object that we want the DaemonWatchdog to monitor
+for failures, barking when it sees an error.
+"""
+
+from abc import ABCMeta, abstractmethod
+from typing import Optional
+
+from gevent.greenlet import Greenlet
+
+
+class WatchedProcess(Greenlet, metaclass=ABCMeta):
+ """
+ WatchedProcess:
+
+ The abstract base class for a process that the DaemonWatchdog can monitor
+ for errors. It is based on the ThrasherGreenlet class in qa/tasks/thrasher.py
+ """
+ def __init__(self) -> None:
+ super(WatchedProcess, self).__init__()
+ self._exception: Optional[BaseException] = None
+
+ @property
+ def exception(self) -> Optional[BaseException]:
+ return self._exception
+
+ @property
+ @abstractmethod
+ def id(self) -> str:
+ """
+ Return a string identifier for this process
+ """
+
+ def set_exception(self, e: Exception) -> None:
+ """
+ Set the exception for this process
+ """
+ self._exception = e
+
+ @abstractmethod
+ def stop(self) -> None:
+ """
+ Stop the running remote process
+ """