From 28247fde1b09e1a7ec83620e1b8b56133b456353 Mon Sep 17 00:00:00 2001 From: deepssin Date: Tue, 11 Nov 2025 13:16:20 +0000 Subject: [PATCH] Fix dispatcher crash on gevent LoopExit exceptions Handle gevent LoopExit exceptions gracefully to prevent dispatcher crashes. Add exception handling in main loop and lock_machines() call, with loop exit counter (max 10) to prevent infinite restarts. Isolate child processes using start_new_session=True so job supervisors continue running independently if dispatcher encounters exceptions. Signed-off-by: deepssin --- teuthology/dispatcher/__init__.py | 267 ++++++++++++++++++++---------- 1 file changed, 175 insertions(+), 92 deletions(-) diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 59f8ae327..ede7f364c 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -8,6 +8,12 @@ import yaml from typing import Dict, List +try: + from gevent.exceptions import LoopExit +except ImportError: + # gevent might not be available in some environments + LoopExit = Exception + from teuthology import ( # non-modules setup_log_file, @@ -99,105 +105,182 @@ def main(args): keep_running = True job_procs = set() worst_returncode = 0 - while keep_running: - # Check to see if we have a teuthology-results process hanging around - # and if so, read its return code so that it can exit. - if result_proc is not None and result_proc.poll() is not None: - log.debug("teuthology-results exited with code: %s", - result_proc.returncode) - result_proc = None - - if sentinel(restart_file_path): - restart() - elif sentinel(stop_file_path): - stop() - - load_config() - for proc in list(job_procs): - rc = proc.poll() - if rc is not None: - worst_returncode = max([worst_returncode, rc]) - job_procs.remove(proc) - job = connection.reserve(timeout=60) - if job is None: - if args.exit_on_empty_queue and not job_procs: - log.info("Queue is empty and no supervisor processes running; exiting!") - break - continue - - # bury the job so it won't be re-run if it fails - job.bury() - job_id = job.jid - log.info('Reserved job %d', job_id) - log.info('Config is: %s', job.body) - job_config = yaml.safe_load(job.body) - job_config['job_id'] = str(job_id) - - if job_config.get('stop_worker'): - keep_running = False + loop_exit_count = 0 + max_loop_exits = 10 # Prevent infinite restart loops + while keep_running: try: - job_config, teuth_bin_path = prep_job( - job_config, - log_file_path, - archive_dir, + # Check to see if we have a teuthology-results process hanging around + # and if so, read its return code so that it can exit. + if result_proc is not None and result_proc.poll() is not None: + log.debug("teuthology-results exited with code: %s", + result_proc.returncode) + result_proc = None + + if sentinel(restart_file_path): + restart() + elif sentinel(stop_file_path): + stop() + + load_config() + for proc in list(job_procs): + rc = proc.poll() + if rc is not None: + worst_returncode = max([worst_returncode, rc]) + job_procs.remove(proc) + job = connection.reserve(timeout=60) + if job is None: + if args.exit_on_empty_queue and not job_procs: + log.info("Queue is empty and no supervisor processes running; exiting!") + break + continue + + # bury the job so it won't be re-run if it fails + job.bury() + job_id = job.jid + log.info('Reserved job %d', job_id) + log.info('Config is: %s', job.body) + job_config = yaml.safe_load(job.body) + job_config['job_id'] = str(job_id) + + if job_config.get('stop_worker'): + keep_running = False + + try: + job_config, teuth_bin_path = prep_job( + job_config, + log_file_path, + archive_dir, + ) + except SkipJob: + continue + + # lock machines but do not reimage them + if 'roles' in job_config: + try: + job_config = lock_machines(job_config) + except LoopExit as e: + log.critical( + "Caught gevent LoopExit exception during lock_machines for job %s. " + "This is likely due to gevent/urllib3 blocking issues. " + "Marking job as dead.", + job_id + ) + log.exception("LoopExit exception details:") + report.try_push_job_info( + job_config, + dict( + status='dead', + failure_reason='gevent LoopExit during machine locking: {}'.format(str(e)) + ) + ) + # Skip this job and continue with the next one + continue + except Exception as e: + log.exception("Unexpected exception during lock_machines for job %s", job_id) + report.try_push_job_info( + job_config, + dict( + status='dead', + failure_reason='Exception during machine locking: {}'.format(str(e)) + ) + ) + continue + + run_args = [ + os.path.join(teuth_bin_path, 'teuthology-supervisor'), + '-v', + '--bin-path', teuth_bin_path, + '--archive-dir', archive_dir, + ] + + # Create run archive directory if not already created and + # job's archive directory + create_job_archive(job_config['name'], + job_config['archive_path'], + archive_dir) + job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml') + + # Write initial job config in job archive dir + with open(job_config_path, 'w') as f: + yaml.safe_dump(job_config, f, default_flow_style=False) + + run_args.extend(["--job-config", job_config_path]) + + try: + # Use start_new_session=True to ensure child processes are isolated + # from the dispatcher's process group. This prevents accidental + # termination if the dispatcher crashes or receives signals. + job_proc = subprocess.Popen( + run_args, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, # Isolate child process from parent + ) + job_procs.add(job_proc) + log.info('Job supervisor PID: %s', job_proc.pid) + except Exception: + error_message = "Saw error while trying to spawn supervisor." + log.exception(error_message) + if 'targets' in job_config: + node_names = job_config["targets"].keys() + lock_ops.unlock_safe( + node_names, + job_config["owner"], + job_config["name"], + job_config["job_id"] + ) + report.try_push_job_info(job_config, dict( + status='fail', + failure_reason=error_message)) + + # This try/except block is to keep the worker from dying when + # beanstalkc throws a SocketError + try: + job.delete() + except Exception: + log.exception("Saw exception while trying to delete job") + + # Successful iteration - reset loop exit counter if it was set + if loop_exit_count > 0: + log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.") + loop_exit_count = 0 + + except LoopExit: + loop_exit_count += 1 + log.critical( + "CRITICAL: Caught gevent LoopExit exception in dispatcher main loop " + "(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. " + "The dispatcher will attempt to continue, but child processes should " + "be isolated and unaffected.", + loop_exit_count, + max_loop_exits ) - except SkipJob: - continue + log.exception("LoopExit exception details:") - # lock machines but do not reimage them - if 'roles' in job_config: - job_config = lock_machines(job_config) - - run_args = [ - os.path.join(teuth_bin_path, 'teuthology-supervisor'), - '-v', - '--bin-path', teuth_bin_path, - '--archive-dir', archive_dir, - ] - - # Create run archive directory if not already created and - # job's archive directory - create_job_archive(job_config['name'], - job_config['archive_path'], - archive_dir) - job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml') - - # Write initial job config in job archive dir - with open(job_config_path, 'w') as f: - yaml.safe_dump(job_config, f, default_flow_style=False) + if loop_exit_count >= max_loop_exits: + log.critical( + "Maximum LoopExit exceptions (%d) reached. " + "Dispatcher is exiting to prevent infinite restart loop.", + max_loop_exits + ) + # Ensure all tracked job processes are noted as still running + # They should continue independently due to start_new_session=True + log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs)) + break - run_args.extend(["--job-config", job_config_path]) + # Continue to next iteration to attempt recovery + continue - try: - job_proc = subprocess.Popen( - run_args, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, + except Exception as e: + log.critical( + "CRITICAL: Uncaught exception in dispatcher main loop: %s", + type(e).__name__ ) - job_procs.add(job_proc) - log.info('Job supervisor PID: %s', job_proc.pid) - except Exception: - error_message = "Saw error while trying to spawn supervisor." - log.exception(error_message) - if 'targets' in job_config: - node_names = job_config["targets"].keys() - lock_ops.unlock_safe( - node_names, - job_config["owner"], - job_config["name"], - job_config["job_id"] - ) - report.try_push_job_info(job_config, dict( - status='fail', - failure_reason=error_message)) - - # This try/except block is to keep the worker from dying when - # beanstalkc throws a SocketError - try: - job.delete() - except Exception: - log.exception("Saw exception while trying to delete job") + log.exception("Exception details:") + # Continue the loop to avoid crashing the dispatcher completely + # Child processes should be isolated via start_new_session=True + continue return worst_returncode -- 2.47.3