from typing import Dict, List
+try:
+    from gevent.exceptions import LoopExit
+except ImportError:
+    # No gevent: bind a never-raised placeholder so `except LoopExit` cannot swallow unrelated exceptions
+    LoopExit = type("LoopExit", (Exception,), {})
+
from teuthology import (
# non-modules
setup_log_file,
keep_running = True
job_procs = set()
worst_returncode = 0
- while keep_running:
- # Check to see if we have a teuthology-results process hanging around
- # and if so, read its return code so that it can exit.
- if result_proc is not None and result_proc.poll() is not None:
- log.debug("teuthology-results exited with code: %s",
- result_proc.returncode)
- result_proc = None
-
- if sentinel(restart_file_path):
- restart()
- elif sentinel(stop_file_path):
- stop()
-
- load_config()
- for proc in list(job_procs):
- rc = proc.poll()
- if rc is not None:
- worst_returncode = max([worst_returncode, rc])
- job_procs.remove(proc)
- job = connection.reserve(timeout=60)
- if job is None:
- if args.exit_on_empty_queue and not job_procs:
- log.info("Queue is empty and no supervisor processes running; exiting!")
- break
- continue
-
- # bury the job so it won't be re-run if it fails
- job.bury()
- job_id = job.jid
- log.info('Reserved job %d', job_id)
- log.info('Config is: %s', job.body)
- job_config = yaml.safe_load(job.body)
- job_config['job_id'] = str(job_id)
-
- if job_config.get('stop_worker'):
- keep_running = False
+ loop_exit_count = 0
+ max_loop_exits = 10 # Prevent infinite restart loops
+ while keep_running:
try:
- job_config, teuth_bin_path = prep_job(
- job_config,
- log_file_path,
- archive_dir,
+ # Check to see if we have a teuthology-results process hanging around
+ # and if so, read its return code so that it can exit.
+ if result_proc is not None and result_proc.poll() is not None:
+ log.debug("teuthology-results exited with code: %s",
+ result_proc.returncode)
+ result_proc = None
+
+ if sentinel(restart_file_path):
+ restart()
+ elif sentinel(stop_file_path):
+ stop()
+
+ load_config()
+ for proc in list(job_procs):
+ rc = proc.poll()
+ if rc is not None:
+ worst_returncode = max([worst_returncode, rc])
+ job_procs.remove(proc)
+ job = connection.reserve(timeout=60)
+ if job is None:
+ if args.exit_on_empty_queue and not job_procs:
+ log.info("Queue is empty and no supervisor processes running; exiting!")
+ break
+ continue
+
+ # bury the job so it won't be re-run if it fails
+ job.bury()
+ job_id = job.jid
+ log.info('Reserved job %d', job_id)
+ log.info('Config is: %s', job.body)
+ job_config = yaml.safe_load(job.body)
+ job_config['job_id'] = str(job_id)
+
+ if job_config.get('stop_worker'):
+ keep_running = False
+
+ try:
+ job_config, teuth_bin_path = prep_job(
+ job_config,
+ log_file_path,
+ archive_dir,
+ )
+ except SkipJob:
+ continue
+
+ # lock machines but do not reimage them
+ if 'roles' in job_config:
+ try:
+ job_config = lock_machines(job_config)
+ except LoopExit as e:
+ log.critical(
+ "Caught gevent LoopExit exception during lock_machines for job %s. "
+ "This is likely due to gevent/urllib3 blocking issues. "
+ "Marking job as dead.",
+ job_id
+ )
+ log.exception("LoopExit exception details:")
+ report.try_push_job_info(
+ job_config,
+ dict(
+ status='dead',
+ failure_reason='gevent LoopExit during machine locking: {}'.format(str(e))
+ )
+ )
+ # Skip this job and continue with the next one
+ continue
+ except Exception as e:
+ log.exception("Unexpected exception during lock_machines for job %s", job_id)
+ report.try_push_job_info(
+ job_config,
+ dict(
+ status='dead',
+ failure_reason='Exception during machine locking: {}'.format(str(e))
+ )
+ )
+ continue
+
+ run_args = [
+ os.path.join(teuth_bin_path, 'teuthology-supervisor'),
+ '-v',
+ '--bin-path', teuth_bin_path,
+ '--archive-dir', archive_dir,
+ ]
+
+ # Create run archive directory if not already created and
+ # job's archive directory
+ create_job_archive(job_config['name'],
+ job_config['archive_path'],
+ archive_dir)
+ job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
+
+ # Write initial job config in job archive dir
+ with open(job_config_path, 'w') as f:
+ yaml.safe_dump(job_config, f, default_flow_style=False)
+
+ run_args.extend(["--job-config", job_config_path])
+
+ try:
+ # Use start_new_session=True to ensure child processes are isolated
+ # from the dispatcher's process group. This prevents accidental
+ # termination if the dispatcher crashes or receives signals.
+ job_proc = subprocess.Popen(
+ run_args,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ start_new_session=True, # Isolate child process from parent
+ )
+ job_procs.add(job_proc)
+ log.info('Job supervisor PID: %s', job_proc.pid)
+ except Exception:
+ error_message = "Saw error while trying to spawn supervisor."
+ log.exception(error_message)
+ if 'targets' in job_config:
+ node_names = job_config["targets"].keys()
+ lock_ops.unlock_safe(
+ node_names,
+ job_config["owner"],
+ job_config["name"],
+ job_config["job_id"]
+ )
+ report.try_push_job_info(job_config, dict(
+ status='fail',
+ failure_reason=error_message))
+
+ # This try/except block is to keep the worker from dying when
+ # beanstalkc throws a SocketError
+ try:
+ job.delete()
+ except Exception:
+ log.exception("Saw exception while trying to delete job")
+
+ # Successful iteration - reset loop exit counter if it was set
+ if loop_exit_count > 0:
+ log.info("Successfully completed iteration after LoopExit exception(s). Resetting counter.")
+ loop_exit_count = 0
+
+ except LoopExit:
+ loop_exit_count += 1
+ log.critical(
+ "CRITICAL: Caught gevent LoopExit exception in dispatcher main loop "
+ "(count: %d/%d). This is likely due to gevent/urllib3 blocking issues. "
+ "The dispatcher will attempt to continue, but child processes should "
+ "be isolated and unaffected.",
+ loop_exit_count,
+ max_loop_exits
)
- except SkipJob:
- continue
+ log.exception("LoopExit exception details:")
- # lock machines but do not reimage them
- if 'roles' in job_config:
- job_config = lock_machines(job_config)
-
- run_args = [
- os.path.join(teuth_bin_path, 'teuthology-supervisor'),
- '-v',
- '--bin-path', teuth_bin_path,
- '--archive-dir', archive_dir,
- ]
-
- # Create run archive directory if not already created and
- # job's archive directory
- create_job_archive(job_config['name'],
- job_config['archive_path'],
- archive_dir)
- job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml')
-
- # Write initial job config in job archive dir
- with open(job_config_path, 'w') as f:
- yaml.safe_dump(job_config, f, default_flow_style=False)
+ if loop_exit_count >= max_loop_exits:
+ log.critical(
+ "Maximum LoopExit exceptions (%d) reached. "
+ "Dispatcher is exiting to prevent infinite restart loop.",
+ max_loop_exits
+ )
+ # Ensure all tracked job processes are noted as still running
+ # They should continue independently due to start_new_session=True
+ log.info("Dispatched %d job supervisor processes that should continue running independently", len(job_procs))
+ break
- run_args.extend(["--job-config", job_config_path])
+ # Continue to next iteration to attempt recovery
+ continue
- try:
- job_proc = subprocess.Popen(
- run_args,
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL,
+ except Exception as e:
+ log.critical(
+ "CRITICAL: Uncaught exception in dispatcher main loop: %s",
+ type(e).__name__
)
- job_procs.add(job_proc)
- log.info('Job supervisor PID: %s', job_proc.pid)
- except Exception:
- error_message = "Saw error while trying to spawn supervisor."
- log.exception(error_message)
- if 'targets' in job_config:
- node_names = job_config["targets"].keys()
- lock_ops.unlock_safe(
- node_names,
- job_config["owner"],
- job_config["name"],
- job_config["job_id"]
- )
- report.try_push_job_info(job_config, dict(
- status='fail',
- failure_reason=error_message))
-
- # This try/except block is to keep the worker from dying when
- # beanstalkc throws a SocketError
- try:
- job.delete()
- except Exception:
- log.exception("Saw exception while trying to delete job")
+ log.exception("Exception details:")
+ # Continue the loop to avoid crashing the dispatcher completely
+ # Child processes should be isolated via start_new_session=True
+ continue
return worst_returncode