From eb8108cc5a02f9698c997cb985a9c2bef39862ad Mon Sep 17 00:00:00 2001
From: Chris Harris
Date: Mon, 21 Jul 2025 13:25:34 +0100
Subject: [PATCH] qa: Adding a new class for the daemonwatchdog to monitor

Adds a new WatchedProcess class that the daemon watchdog can monitor
during testing and stop in the event of a bark. The DaemonWatchdog is
also updated to monitor the WatchedProcesses.

Signed-off-by: Chris Harris
---
 qa/tasks/ceph.py            |  3 +-
 qa/tasks/daemonwatchdog.py  | 60 +++++++++++++++++++++++-----
 qa/tasks/rados.py           | 78 ++++++++++++++++++++++++++++++-------
 qa/tasks/watched_process.py | 45 +++++++++++++++++++++
 4 files changed, 163 insertions(+), 23 deletions(-)
 create mode 100644 qa/tasks/watched_process.py

diff --git a/qa/tasks/ceph.py b/qa/tasks/ceph.py
index 819154c98db..ea860a135f9 100644
--- a/qa/tasks/ceph.py
+++ b/qa/tasks/ceph.py
@@ -528,7 +528,8 @@ def cephfs_setup(ctx, config):
 @contextlib.contextmanager
 def watchdog_setup(ctx, config):
     ctx.ceph[config['cluster']].thrashers = []
-    ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config, ctx.ceph[config['cluster']].thrashers)
+    ctx.ceph[config["cluster"]].watched_processes = []
+    ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config)
     ctx.ceph[config['cluster']].watchdog.start()
     yield
 
diff --git a/qa/tasks/daemonwatchdog.py b/qa/tasks/daemonwatchdog.py
index 234a26e10ea..7003a6b7985 100644
--- a/qa/tasks/daemonwatchdog.py
+++ b/qa/tasks/daemonwatchdog.py
@@ -8,14 +8,38 @@ from gevent.event import Event
 
 log = logging.getLogger(__name__)
 
+class BarkError(Exception):
+    """
+    An exception for the watchdog bark. We can use this as an exception
+    higher up in the stack to give a reason why the test failed.
+    """
+
+    def __init__(self, bark_reason):
+        self.message = f"The WatchDog barked due to {bark_reason}"
+        super().__init__(self.message)
+
 class DaemonWatchdog(Greenlet):
     """
     DaemonWatchdog::
 
-    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
-    not intentional), then the watchdog will unmount file systems and send
-    SIGTERM to all daemons. The duration of an extended failure is configurable
-    with watchdog_daemon_timeout.
+    This process monitors 3 classes of running processes for failures:
+    1. Ceph daemons, e.g. OSDs
+    2. Thrashers - These are test processes used to create errors during
+       testing, e.g. RBDMirrorThrasher in qa/tasks/rbd_mirror_thrash.py
+    3. Processes - Any WatchedProcess that is running as part of a test suite.
+       These processes are typically things like I/O exercisers,
+       e.g. CephTestRados in qa/tasks/rados.py
+
+    If an extended failure is detected in a daemon process (i.e. not intentional), then
+    the watchdog will bark. It will also bark if an assertion is raised during the running
+    of any monitored Thrashers or WatchedProcesses.
+
+    When the watchdog barks it will:
+    - unmount file systems and send SIGTERM to all daemons
+    - stop all thrasher processes by calling their stop_and_join() method
+    - stop any watched processes by calling their stop() method
+
+    The duration of an extended failure is configurable with watchdog_daemon_timeout.
 
     ceph:
       watchdog:
@@ -26,7 +50,7 @@ class DaemonWatchdog(Greenlet):
        watchdog will bark.
""" - def __init__(self, ctx, config, thrashers): + def __init__(self, ctx, config): super(DaemonWatchdog, self).__init__() self.config = ctx.config.get('watchdog', {}) self.ctx = ctx @@ -35,7 +59,8 @@ class DaemonWatchdog(Greenlet): self.cluster = config.get('cluster', 'ceph') self.name = 'watchdog' self.stopping = Event() - self.thrashers = thrashers + self.thrashers = ctx.ceph[config["cluster"]].thrashers + self.watched_processes = ctx.ceph[config["cluster"]].watched_processes def _run(self): try: @@ -53,7 +78,7 @@ class DaemonWatchdog(Greenlet): def stop(self): self.stopping.set() - def bark(self): + def bark(self, reason): self.log("BARK! unmounting mounts and killing all daemons") if hasattr(self.ctx, 'mounts'): for mount in self.ctx.mounts.values(): @@ -74,11 +99,21 @@ class DaemonWatchdog(Greenlet): except: self.logger.exception("ignoring exception:") + for thrasher in self.thrashers: + self.log("Killing thrasher {name}".format(name=thrasher.name)) + thrasher.stop_and_join() + + for proc in self.watched_processes: + self.log("Killing remote process {process_id}".format(process_id=proc.id)) + proc.set_exception(BarkError(reason)) + proc.stop() + def watch(self): self.log("watchdog starting") daemon_timeout = int(self.config.get('daemon_timeout', 300)) daemon_restart = self.config.get('daemon_restart', False) daemon_failure_time = {} + bark_reason: str = "" while not self.stopping.is_set(): bark = False now = time.time() @@ -104,6 +139,7 @@ class DaemonWatchdog(Greenlet): self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta)) if delta > daemon_timeout: bark = True + bark_reason = f"Daemon {name} has failed" if daemon_restart == 'normal' and daemon.proc.exitstatus == 0: self.log(f"attempting to restart daemon {name}") daemon.restart() @@ -117,11 +153,17 @@ class DaemonWatchdog(Greenlet): for thrasher in self.thrashers: if thrasher.exception is not None: self.log("{name} failed".format(name=thrasher.name)) - thrasher.stop_and_join() + bark_reason = f"Thrasher {name} threw exception {thrasher.exception}" + bark = True + + for proc in self.watched_processes: + if proc.exception is not None: + self.log("Remote process %s failed" % proc.id) + bark_reason = f"Remote process {proc.id} threw exception {proc.exception}" bark = True if bark: - self.bark() + self.bark(bark_reason) return sleep(5) diff --git a/qa/tasks/rados.py b/qa/tasks/rados.py index 96bcc770511..89e0b4dbcf7 100644 --- a/qa/tasks/rados.py +++ b/qa/tasks/rados.py @@ -1,16 +1,53 @@ """ Rados modle-based integration tests """ + import contextlib import logging -import gevent -from teuthology import misc as teuthology +from typing import Any, Dict +import gevent +from teuthology import misc as teuthology from teuthology.orchestra import run +from .watched_process import WatchedProcess + log = logging.getLogger(__name__) + +class CephTestRados(WatchedProcess): + """ + The WatchedProcess class for ceph_test_rados. This allows us to monitor + any ceph_test_rados processes for error, and to kill the remote processes when + the DaemonWatchdog barks. + + It also raises the assert from the watchdog so that the failure reason shown in + the test result is the reason the watchdog barked. 
+ """ + + def __init__(self, ctx: Dict[Any, Any], config: Dict[Any, Any], cluster: str, sub_processes: Dict[str, Any]): + super(CephTestRados, self).__init__() + + self._ctx = ctx + self._config = config + self._cluster: str = cluster + self._sub_processes = sub_processes + self._name: str = f"ceph-test-rados-{self._cluster}" + + @property + def id(self) -> str: + return self._name + + def stop(self) -> None: + debug: str = f"Stopping {self._name}" + if self._exception: + debug += f" due to exception {self._exception}" + log.debug(debug) + for test_id, proc in self._sub_processes.items(): + log.info("Stopping instance %s", test_id) + proc.stdin.close() + @contextlib.contextmanager def task(ctx, config): """ @@ -33,10 +70,10 @@ def task(ctx, config): fast_read: enable ec_pool's fast_read min_size: set the min_size of created pool pool_snaps: use pool snapshots instead of selfmanaged snapshots - write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED. - This mean data don't access in the near future. - Let osd backend don't keep data in cache. - pct_update_delay: delay before primary propogates pct on write pause, + write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED. + This mean data don't access in the near future. + Let osd backend don't keep data in cache. + pct_update_delay: delay before primary propogates pct on write pause, defaults to 5s if balance_reads is set For example:: @@ -67,7 +104,7 @@ def task(ctx, config): m: 1 crush-failure-domain: osd pool_snaps: true - write_fadvise_dontneed: true + write_fadvise_dontneed: true runs: 10 - interactive: @@ -146,6 +183,8 @@ def task(ctx, config): 'adjust-ulimits', 'ceph-coverage', '{tdir}/archive/coverage'.format(tdir=testdir), + 'daemon-helper', + 'kill', 'ceph_test_rados'] if config.get('ec_pool', False): args.extend(['--no-omap']) @@ -232,7 +271,7 @@ def task(ctx, config): args.extend([ '--op', op, str(weight) ]) - + def thread(): """Thread spawned by gevent""" @@ -253,6 +292,8 @@ def task(ctx, config): profile_name = None crush_name = None + cluster = config.get("cluster", "ceph") + for i in range(int(config.get('runs', '1'))): log.info("starting run %s out of %s", str(i), config.get('runs', '1')) tests = {} @@ -272,7 +313,7 @@ def task(ctx, config): erasure_code_profile_name=profile_name, erasure_code_crush_rule_name=crush_name, erasure_code_use_overwrites= - config.get('erasure_code_use_overwrites', False) + config.get('erasure_code_use_overwrites', False), ) created_pools.append(pool) if config.get('fast_read', False): @@ -281,8 +322,8 @@ def task(ctx, config): if pct_update_delay: manager.raw_cluster_cmd( 'osd', 'pool', 'set', pool, - 'pct_update_delay', str(pct_update_delay)); - min_size = config.get('min_size', None); + 'pct_update_delay', str(pct_update_delay)) + min_size = config.get('min_size', None) if min_size is not None: manager.raw_cluster_cmd( 'osd', 'pool', 'set', pool, 'min_size', str(min_size)) @@ -296,17 +337,28 @@ def task(ctx, config): wait=False ) tests[id_] = proc + + watched_process: CephTestRados = CephTestRados(ctx, config, cluster, tests) + ctx.ceph[cluster].watched_processes.append(watched_process) + try: + run.wait(tests.values()) + except Exception as e: + watched_process.set_exception(e) + run.wait(tests.values()) wait_for_all_active_clean_pgs = config.get("wait_for_all_active_clean_pgs", False) # usually set when we do min_size testing. 
-            if wait_for_all_active_clean_pgs:
+            if wait_for_all_active_clean_pgs:
                 # Make sure we finish the test first before deleting the pool.
                 # Mainly used for test_pool_min_size
                 manager.wait_for_clean()
                 manager.wait_for_all_osds_up(timeout=1800)
+            if watched_process.exception:
+                raise watched_process.exception
+
             for pool in created_pools:
-                manager.wait_snap_trimming_complete(pool);
+                manager.wait_snap_trimming_complete(pool)
                 manager.remove_pool(pool)
 
     running = gevent.spawn(thread)
diff --git a/qa/tasks/watched_process.py b/qa/tasks/watched_process.py
new file mode 100644
index 00000000000..d468cec18a4
--- /dev/null
+++ b/qa/tasks/watched_process.py
@@ -0,0 +1,45 @@
+"""
+WatchedProcess base class.
+
+This can be applied to an object that we want the DaemonWatchdog to monitor
+for failure, and bark when it sees an error.
+"""
+
+from abc import ABCMeta, abstractmethod
+from typing import Optional
+
+from gevent.greenlet import Greenlet
+
+
+class WatchedProcess(Greenlet, metaclass=ABCMeta):
+    """
+    WatchedProcess:
+
+    The abstract base class for a process that the DaemonWatchdog can monitor
+    for errors. It is based on the ThrasherGreenlet class in qa/tasks/thrasher.py.
+    """
+    def __init__(self) -> None:
+        self._exception: Optional[BaseException] = None
+
+    @property
+    def exception(self) -> Optional[BaseException]:
+        return self._exception
+
+    @property
+    @abstractmethod
+    def id(self) -> str:
+        """
+        Return a string identifier for this process.
+        """
+
+    def set_exception(self, e: Exception) -> None:
+        """
+        Set the exception for this process.
+        """
+        self._exception = e
+
+    @abstractmethod
+    def stop(self) -> None:
+        """
+        Stop the running remote process.
+        """
-- 
2.39.5
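
A minimal sketch, not part of the patch, showing how another I/O exerciser task could expose its remote processes to the DaemonWatchdog through the new interface; it assumes the module would sit alongside the other qa/tasks modules. Only WatchedProcess, its id/stop/set_exception members, and ctx.ceph[cluster].watched_processes come from this patch; the FioProcess class, the procs mapping, and the register() helper are hypothetical names used purely for illustration.

# Hypothetical sketch only -- not part of this patch.
import logging

from .watched_process import WatchedProcess

log = logging.getLogger(__name__)


class FioProcess(WatchedProcess):
    """Hypothetical wrapper around a dict of remote fio processes."""

    def __init__(self, cluster, procs):
        super(FioProcess, self).__init__()
        self._cluster = cluster
        # procs maps an instance id to a remote process started under
        # 'daemon-helper kill', so closing stdin terminates it.
        self._procs = procs

    @property
    def id(self):
        # Identifier the DaemonWatchdog logs when it barks.
        return f"fio-{self._cluster}"

    def stop(self):
        # Called by the DaemonWatchdog when it barks: stop every remote
        # instance, mirroring CephTestRados.stop().
        for proc_id, proc in self._procs.items():
            log.info("Stopping fio instance %s", proc_id)
            proc.stdin.close()


def register(ctx, cluster, procs):
    """Make the running remote processes visible to the DaemonWatchdog."""
    watched = FioProcess(cluster, procs)
    ctx.ceph[cluster].watched_processes.append(watched)
    return watched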