]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa: Adding a new class for the daemonwatchdog to monitor
authorChris Harris <harriscr@uk.ibm.com>
Mon, 21 Jul 2025 12:25:34 +0000 (13:25 +0100)
committerJon <jonathan.bailey1@ibm.com>
Fri, 3 Oct 2025 13:31:28 +0000 (14:31 +0100)
Adds a new WatchedProcess class for the daemon watchdog to monitor during testing, and stop in the event of a bark.
The DaemonWatchdog is also updated to monitor the WatchedProcesses.

Signed-off-by: Chris Harris <harriscr@uk.ibm.com>
qa/tasks/ceph.py
qa/tasks/daemonwatchdog.py
qa/tasks/rados.py
qa/tasks/watched_process.py [new file with mode: 0644]

index 819154c98db87307881c0f1e3781925e12ce184e..ea860a135f9911c6f40188cdf4a7afee0889dc21 100644 (file)
@@ -528,7 +528,8 @@ def cephfs_setup(ctx, config):
 @contextlib.contextmanager
 def watchdog_setup(ctx, config):
     ctx.ceph[config['cluster']].thrashers = []
-    ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config, ctx.ceph[config['cluster']].thrashers)
+    ctx.ceph[config["cluster"]].watched_processes = []
+    ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config)
     ctx.ceph[config['cluster']].watchdog.start()
     yield
 
index 234a26e10ea24df5cb7476c810d09866626174af..7003a6b798598ea4950e979a160b002babb4437c 100644 (file)
@@ -8,14 +8,38 @@ from gevent.event import Event
 
 log = logging.getLogger(__name__)
 
+class BarkError(Exception):
+    """
+    An exception for the watchdog bark. We can use this as an exception
+    higer up in the stack to give a reason why the test failed
+    """
+
+    def __init__(self, bark_reason):
+        self.message = f"The WatchDog barked due to {bark_reason}"
+        super().__init__(self.message)
+
 class DaemonWatchdog(Greenlet):
     """
     DaemonWatchdog::
 
-    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
-    not intentional), then the watchdog will unmount file systems and send
-    SIGTERM to all daemons. The duration of an extended failure is configurable
-    with watchdog_daemon_timeout.
+    This process monitors 3 classes of running processes for failures:
+    1. Ceph daemons e.g OSDs
+    2. Thrashers - These are test processes used to create errors during
+                   testing e.g. RBDMirrorThrasher in qa/tasks/rbd_mirror_thrash.py
+    3. Processes - Any WatchedProcess that is running as part of a test suite.
+                   These processses are typically things like I/O exercisers
+                   e.g. CephTestRados in qa/tasks/rados.py
+
+    If an extended failure is detected in a daemon process (i.e. not intentional), then
+    the watchdog will bark. It will also bark is an assert is raised during the running
+    of any monitored Thrashers or WatchedProcesses
+    
+    When the watchdog barks it will 
+     - unmount file systems and send SIGTERM to all daemons.
+     - stop all thrasher processes by calling their stop_and_join() method
+     - stop any watched processes by calling their stop() method
+
+    The duration of an extended failure is configurable with watchdog_daemon_timeout.
 
     ceph:
       watchdog:
@@ -26,7 +50,7 @@ class DaemonWatchdog(Greenlet):
                                               watchdog will bark.
     """
 
-    def __init__(self, ctx, config, thrashers):
+    def __init__(self, ctx, config):
         super(DaemonWatchdog, self).__init__()
         self.config = ctx.config.get('watchdog', {})
         self.ctx = ctx
@@ -35,7 +59,8 @@ class DaemonWatchdog(Greenlet):
         self.cluster = config.get('cluster', 'ceph')
         self.name = 'watchdog'
         self.stopping = Event()
-        self.thrashers = thrashers
+        self.thrashers = ctx.ceph[config["cluster"]].thrashers
+        self.watched_processes = ctx.ceph[config["cluster"]].watched_processes
 
     def _run(self):
         try:
@@ -53,7 +78,7 @@ class DaemonWatchdog(Greenlet):
     def stop(self):
         self.stopping.set()
 
-    def bark(self):
+    def bark(self, reason):
         self.log("BARK! unmounting mounts and killing all daemons")
         if hasattr(self.ctx, 'mounts'):
             for mount in self.ctx.mounts.values():
@@ -74,11 +99,21 @@ class DaemonWatchdog(Greenlet):
             except:
                 self.logger.exception("ignoring exception:")
 
+        for thrasher in self.thrashers:
+            self.log("Killing thrasher {name}".format(name=thrasher.name))
+            thrasher.stop_and_join()
+
+        for proc in self.watched_processes:
+            self.log("Killing remote process {process_id}".format(process_id=proc.id))
+            proc.set_exception(BarkError(reason))
+            proc.stop()
+
     def watch(self):
         self.log("watchdog starting")
         daemon_timeout = int(self.config.get('daemon_timeout', 300))
         daemon_restart = self.config.get('daemon_restart', False)
         daemon_failure_time = {}
+        bark_reason: str = ""
         while not self.stopping.is_set():
             bark = False
             now = time.time()
@@ -104,6 +139,7 @@ class DaemonWatchdog(Greenlet):
                 self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                 if delta > daemon_timeout:
                     bark = True
+                    bark_reason = f"Daemon {name} has failed"
                 if daemon_restart == 'normal' and daemon.proc.exitstatus == 0:
                     self.log(f"attempting to restart daemon {name}")
                     daemon.restart()
@@ -117,11 +153,17 @@ class DaemonWatchdog(Greenlet):
             for thrasher in self.thrashers:
                 if thrasher.exception is not None:
                     self.log("{name} failed".format(name=thrasher.name))
-                    thrasher.stop_and_join()
+                    bark_reason = f"Thrasher {name} threw exception {thrasher.exception}"
+                    bark = True
+
+            for proc in self.watched_processes:
+                if proc.exception is not None:
+                    self.log("Remote process %s failed" % proc.id)
+                    bark_reason = f"Remote process {proc.id} threw exception {proc.exception}"
                     bark = True
 
             if bark:
-                self.bark()
+                self.bark(bark_reason)
                 return
 
             sleep(5)
index 96bcc770511a1d51a15ee08089bfc64d2a8533b4..89e0b4dbcf78a974331e1491e8dda87c61908f6f 100644 (file)
@@ -1,16 +1,53 @@
 """
 Rados modle-based integration tests
 """
+
 import contextlib
 import logging
-import gevent
-from teuthology import misc as teuthology
+from typing import Any, Dict
 
+import gevent
 
+from teuthology import misc as teuthology
 from teuthology.orchestra import run
 
+from .watched_process import WatchedProcess
+
 log = logging.getLogger(__name__)
 
+
+class CephTestRados(WatchedProcess):
+    """
+    The WatchedProcess class for ceph_test_rados. This allows us to monitor
+    any ceph_test_rados processes for error, and to kill the remote processes when
+    the DaemonWatchdog barks.
+
+    It also raises the assert from the watchdog so that the failure reason shown in
+    the test result is the reason the watchdog barked.
+    """
+
+    def __init__(self, ctx: Dict[Any, Any], config: Dict[Any, Any], cluster: str, sub_processes: Dict[str, Any]):
+        super(CephTestRados, self).__init__()
+
+        self._ctx = ctx
+        self._config = config
+        self._cluster: str = cluster
+        self._sub_processes = sub_processes
+        self._name: str = f"ceph-test-rados-{self._cluster}"
+
+    @property
+    def id(self) -> str:
+        return self._name
+
+    def stop(self) -> None:
+        debug: str = f"Stopping {self._name}"
+        if self._exception:
+            debug += f" due to exception {self._exception}"
+        log.debug(debug)
+        for test_id, proc in self._sub_processes.items():
+            log.info("Stopping instance %s", test_id)
+            proc.stdin.close()
+
 @contextlib.contextmanager
 def task(ctx, config):
     """
@@ -33,10 +70,10 @@ def task(ctx, config):
           fast_read: enable ec_pool's fast_read
           min_size: set the min_size of created pool
           pool_snaps: use pool snapshots instead of selfmanaged snapshots
-         write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
-                                 This mean data don't access in the near future.
-                                 Let osd backend don't keep data in cache.
-         pct_update_delay: delay before primary propogates pct on write pause,
+          write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
+                                  This mean data don't access in the near future.
+                                  Let osd backend don't keep data in cache.
+          pct_update_delay: delay before primary propogates pct on write pause,
                             defaults to 5s if balance_reads is set
 
     For example::
@@ -67,7 +104,7 @@ def task(ctx, config):
               m: 1
               crush-failure-domain: osd
             pool_snaps: true
-           write_fadvise_dontneed: true
+            write_fadvise_dontneed: true
             runs: 10
         - interactive:
 
@@ -146,6 +183,8 @@ def task(ctx, config):
         'adjust-ulimits',
         'ceph-coverage',
         '{tdir}/archive/coverage'.format(tdir=testdir),
+        'daemon-helper',
+        'kill', 
         'ceph_test_rados']
     if config.get('ec_pool', False):
         args.extend(['--no-omap'])
@@ -232,7 +271,7 @@ def task(ctx, config):
         args.extend([
             '--op', op, str(weight)
         ])
-                
+        
 
     def thread():
         """Thread spawned by gevent"""
@@ -253,6 +292,8 @@ def task(ctx, config):
             profile_name = None
             crush_name = None
 
+        cluster = config.get("cluster", "ceph")
+
         for i in range(int(config.get('runs', '1'))):
             log.info("starting run %s out of %s", str(i), config.get('runs', '1'))
             tests = {}
@@ -272,7 +313,7 @@ def task(ctx, config):
                         erasure_code_profile_name=profile_name,
                         erasure_code_crush_rule_name=crush_name,
                         erasure_code_use_overwrites=
-                          config.get('erasure_code_use_overwrites', False)
+                            config.get('erasure_code_use_overwrites', False),
                     )
                     created_pools.append(pool)
                     if config.get('fast_read', False):
@@ -281,8 +322,8 @@ def task(ctx, config):
                     if pct_update_delay:
                         manager.raw_cluster_cmd(
                             'osd', 'pool', 'set', pool,
-                            'pct_update_delay', str(pct_update_delay));
-                    min_size = config.get('min_size', None);
+                            'pct_update_delay', str(pct_update_delay))
+                    min_size = config.get('min_size', None)
                     if min_size is not None:
                         manager.raw_cluster_cmd(
                             'osd', 'pool', 'set', pool, 'min_size', str(min_size))
@@ -296,17 +337,28 @@ def task(ctx, config):
                     wait=False
                     )
                 tests[id_] = proc
+
+            watched_process: CephTestRados = CephTestRados(ctx, config, cluster, tests)
+            ctx.ceph[cluster].watched_processes.append(watched_process)
+            try:
+                run.wait(tests.values())
+            except Exception as e:
+                watched_process.set_exception(e)
+
             run.wait(tests.values())
             wait_for_all_active_clean_pgs = config.get("wait_for_all_active_clean_pgs", False)
             # usually set when we do min_size testing.
-            if  wait_for_all_active_clean_pgs:
+            if wait_for_all_active_clean_pgs:
                 # Make sure we finish the test first before deleting the pool.
                 # Mainly used for test_pool_min_size
                 manager.wait_for_clean()
                 manager.wait_for_all_osds_up(timeout=1800)
 
+            if watched_process.exception:
+                raise watched_process.exception
+
             for pool in created_pools:
-                manager.wait_snap_trimming_complete(pool);
+                manager.wait_snap_trimming_complete(pool)
                 manager.remove_pool(pool)
 
     running = gevent.spawn(thread)
diff --git a/qa/tasks/watched_process.py b/qa/tasks/watched_process.py
new file mode 100644 (file)
index 0000000..d468cec
--- /dev/null
@@ -0,0 +1,45 @@
+"""
+WatchedProcess process base class.
+
+This can be applied to an object that we want the DaemonWatchdog to monitor
+for failure, and bark when it sees an error
+"""
+
+from abc import ABCMeta, abstractmethod
+from typing import Optional
+
+from gevent.greenlet import Greenlet
+
+
+class WatchedProcess(Greenlet, metaclass=ABCMeta):
+    """
+    WatchedProcess:
+
+    The abstract base class for a process that the DaemonWatchdog can monitor
+    for errors. It is based on the ThrasherGreenlet class in qa/tasks/thrasher.py
+    """
+    def __init__(self) -> None:
+        self._exception: Optional[BaseException] = None
+
+    @property
+    def exception(self) -> Optional[BaseException]:
+        return self._exception
+
+    @property
+    @abstractmethod
+    def id(self) -> str:
+        """
+        Return a string identifier for this process
+        """
+
+    def set_exception(self, e: Exception) -> None:
+        """
+        Set the exception for this process
+        """
+        self._exception = e
+
+    @abstractmethod
+    def stop(self) -> None:
+        """
+        Stop the remote process running
+        """