git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
Fix dispatcher crash on gevent LoopExit exceptions
author: deepssin <deepssin@redhat.com>
Tue, 11 Nov 2025 13:16:20 +0000 (13:16 +0000)
committer: David Galloway <david.galloway@ibm.com>
Mon, 12 Jan 2026 21:12:29 +0000 (16:12 -0500)
Handle gevent LoopExit exceptions gracefully to prevent dispatcher
crashes. Add exception handling in main loop and lock_machines() call,
with loop exit counter (max 10) to prevent infinite restarts. Isolate
child processes using start_new_session=True so job supervisors
continue running independently if dispatcher encounters exceptions.

Signed-off-by: deepssin <deepssin@redhat.com>
teuthology/dispatcher/__init__.py

index 59f8ae3279d8e27509e2322d6cc32c0ea8bf6815..ede7f364cf9df9f7803e1142d9abe175026b34d1 100644 (file)
@@ -8,6 +8,12 @@ import yaml
 
 from typing import Dict, List
 
+try:
+    from gevent.exceptions import LoopExit
+except ImportError:
+    # gevent might not be available in some environments
+    LoopExit = Exception
+
 from teuthology import (
     # non-modules
     setup_log_file,
@@ -99,105 +105,182 @@ def main(args):
     keep_running = True
     job_procs = set()
     worst_returncode = 0
-    while keep_running:
-        # Check to see if we have a teuthology-results process hanging around
-        # and if so, read its return code so that it can exit.
-        if result_proc is not None and result_proc.poll() is not None:
-            log.debug("teuthology-results exited with code: %s",
-                      result_proc.returncode)
-            result_proc = None
-
-        if sentinel(restart_file_path):
-            restart()
-        elif sentinel(stop_file_path):
-            stop()
-
-        load_config()
-        for proc in list(job_procs):
-            rc = proc.poll()
-            if rc is not None:
-                worst_returncode = max([worst_returncode, rc])
-                job_procs.remove(proc)
-        job = connection.reserve(timeout=60)
-        if job is None:
-            if args.exit_on_empty_queue and not job_procs:
-                log.info("Queue is empty and no supervisor processes running; exiting!")
-                break
-            continue
-
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
-
-        if job_config.get('stop_worker'):
-            keep_running = False
+    loop_exit_count = 0
+    max_loop_exits = 10  # Prevent infinite restart loops
 
+    while keep_running:
         try:
-            job_config, teuth_bin_path = prep_job(
-                job_config,
-                log_file_path,
-                archive_dir,
+            # Check to see if we have a teuthology-results process hanging around
+            # and if so, read its return code so that it can exit.
+            if result_proc is not None and result_proc.poll() is not None:
+                log.debug("teuthology-results exited with code: %s",
+                          result_proc.returncode)
+                result_proc = None
+
+            if sentinel(restart_file_path):
+                restart()
+            elif sentinel(stop_file_path):
+                stop()
+
+            load_config()
+            for proc in list(job_procs):
+                rc = proc.poll()
+                if rc is not None:
+                    worst_returncode = max([worst_returncode, rc])
+                    job_procs.remove(proc)
+            job = connection.reserve(timeout=60)
+            if job is None:
+                if args.exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
+                continue
+
+            # bury the job so it won't be re-run if it fails
+            job.bury()
+            job_id = job.jid
+            log.info('Reserved job %d', job_id)
+            log.info('Config is: %s', job.body)
+            job_config = yaml.safe_load(job.body)
+            job_config['job_id'] = str(job_id)
+
+            if job_config.get('stop_worker'):
+                keep_running = False
+
+            try:
+                job_config, teuth_bin_path = prep_job(
+                    job_config,
+                    log_file_path,
+                    archive_dir,
+                )
+            except SkipJob:
+                continue
+
+            # lock machines but do not reimage them
+            if 'roles' in job_config:
+                try:
+                    job_config = lock_machines(job_config)
+                except LoopExit as e:
+                    log.critical(
+                        "Caught gevent LoopExit exception during lock_machines for job %s. "
+                        "This is likely due to gevent/urllib3 blocking issues. "
+                        "Marking job as dead.",
+                        job_id
+                    )
+                    log.exception("LoopExit exception details:")
+                    report.try_push_job_info(
+                        job_config,
+                        dict(
+                            status='dead',
+                            failure_reason='gevent LoopExit during machine locking: {}'.format(str(e))
+                        )
+                    )
+                    # Skip this job and continue with the next one
+                    continue
+                except Exception as e:
+                    log.exception("Unexpected exception during lock_machines for job %s", job_id)
+                    report.try_push_job_info(
+                        job_config,
+                        dict(
+                            status='dead',
+                            failure_reason='Exception during machine locking: {}'.format(str(e))
+                        )
+                    )
+                    continue
+
+            run_args = [
+                os.path.join(teuth_bin_path, 'teuthology-supervisor'),
+                '-v',
+                '--bin-path', teuth_bin_path,
+                '--archive-dir', archive_dir,
+            ]
+
+            # Create run archive directory if not already created and
+            # job's archive directory
+            create_job_archive(job_config['name'],
+                               job_config['archive_path'],
+                               archive_dir)
+            job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
+
+            # Write initial job config in job archive dir
+            with open(job_config_path, 'w') as f:
+                yaml.safe_dump(job_config, f, default_flow_style=False)
+
+            run_args.extend(["--job-config", job_config_path])
+
+            try:
+                # Use start_new_session=True to ensure child processes are isolated
+                # from the dispatcher's process group. This prevents accidental
+                # termination if the dispatcher crashes or receives signals.
+                job_proc = subprocess.Popen(
+                    run_args,
+                    stdout=subprocess.DEVNULL,
+                    stderr=subprocess.DEVNULL,
+                    start_new_session=True,  # Isolate child process from parent
+                )
+                job_procs.add(job_proc)
+                log.info('Job supervisor PID: %s', job_proc.pid)
+            except Exception:
+                error_message = "Saw error while trying to spawn supervisor."
+                log.exception(error_message)
+                if 'targets' in job_config:
+                    node_names = job_config["targets"].keys()
+                    lock_ops.unlock_safe(
+                        node_names,
+                        job_config["owner"],
+                        job_config["name"],
+                        job_config["job_id"]
+                    )
+                report.try_push_job_info(job_config, dict(
+                    status='fail',
+                    failure_reason=error_message))
+
+            # This try/except block is to keep the worker from dying when
+            # beanstalkc throws a SocketError
+            try:
+                job.delete()
+            except Exception:
+                log.exception("Saw exception while trying to delete job")
+
+            # Successful iteration - reset loop exit counter if it was set
+            if loop_exit_count > 0:
+                log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.")
+                loop_exit_count = 0
+
+        except LoopExit:
+            loop_exit_count += 1
+            log.critical(
+                "CRITICAL: Caught gevent LoopExit exception in dispatcher main loop "
+                "(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. "
+                "The dispatcher will attempt to continue, but child processes should "
+                "be isolated and unaffected.",
+                loop_exit_count,
+                max_loop_exits
             )
-        except SkipJob:
-            continue
+            log.exception("LoopExit exception details:")
 
-        # lock machines but do not reimage them
-        if 'roles' in job_config:
-            job_config = lock_machines(job_config)
-
-        run_args = [
-            os.path.join(teuth_bin_path, 'teuthology-supervisor'),
-            '-v',
-            '--bin-path', teuth_bin_path,
-            '--archive-dir', archive_dir,
-        ]
-
-        # Create run archive directory if not already created and
-        # job's archive directory
-        create_job_archive(job_config['name'],
-                           job_config['archive_path'],
-                           archive_dir)
-        job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
-
-        # Write initial job config in job archive dir
-        with open(job_config_path, 'w') as f:
-            yaml.safe_dump(job_config, f, default_flow_style=False)
+            if loop_exit_count >= max_loop_exits:
+                log.critical(
+                    "Maximum LoopExit exceptions (%d) reached. "
+                    "Dispatcher is exiting to prevent infinite restart loop.",
+                    max_loop_exits
+                )
+                # Ensure all tracked job processes are noted as still running
+                # They should continue independently due to start_new_session=True
+                log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs))
+                break
 
-        run_args.extend(["--job-config", job_config_path])
+            # Continue to next iteration to attempt recovery
+            continue
 
-        try:
-            job_proc = subprocess.Popen(
-                run_args,
-                stdout=subprocess.DEVNULL,
-                stderr=subprocess.DEVNULL,
+        except Exception as e:
+            log.critical(
+                "CRITICAL: Uncaught exception in dispatcher main loop: %s",
+                type(e).__name__
             )
-            job_procs.add(job_proc)
-            log.info('Job supervisor PID: %s', job_proc.pid)
-        except Exception:
-            error_message = "Saw error while trying to spawn supervisor."
-            log.exception(error_message)
-            if 'targets' in job_config:
-                node_names = job_config["targets"].keys()
-                lock_ops.unlock_safe(
-                    node_names,
-                    job_config["owner"],
-                    job_config["name"],
-                    job_config["job_id"]
-                )
-            report.try_push_job_info(job_config, dict(
-                status='fail',
-                failure_reason=error_message))
-
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
+            log.exception("Exception details:")
+            # Continue the loop to avoid crashing the dispatcher completely
+            # Child processes should be isolated via start_new_session=True
+            continue
 
     return worst_returncode