From 583d85c4b3e4ba7ecc4081d55881029c23506c22 Mon Sep 17 00:00:00 2001 From: Shraddha Agrawal Date: Tue, 11 Aug 2020 19:57:06 +0530 Subject: [PATCH] send config file path instead of file descriptor This commit saves job config in its archive dir and sends its path instead of file descriptor of a temporary file in the dispatcher and the supervisor. Signed-off-by: Shraddha Agrawal --- scripts/dispatcher.py | 4 +- teuthology/dispatcher/__init__.py | 31 +++++++--- teuthology/dispatcher/supervisor.py | 94 ++++++++++++----------------- 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 395225e111..bdba50ef1d 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -1,6 +1,6 @@ """ usage: teuthology-dispatcher --help - teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --config-fd FD --archive-dir ARC_DIR + teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --config-file COFNFIG --archive-dir ARC_DIR teuthology-dispatcher [-v] --archive-dir ARC_DIR --log-dir LOG_DIR --tube TUBE Start a dispatcher for the specified tube. Grab jobs from a beanstalk @@ -20,7 +20,7 @@ standard arguments: --archive-dir ARC_DIR path to archive results in --supervisor run dispactcher in job supervisor mode --bin-path BIN_PATH teuthology bin path - --config-fd FD file descriptor of job's config file + --config-file CONFIG file descriptor of job's config file """ import docopt diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 0e5dfd0e6b..3a9443b6e5 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -3,7 +3,6 @@ import os import subprocess import sys import yaml -import tempfile from datetime import datetime @@ -14,6 +13,7 @@ from teuthology.repo_utils import fetch_qa_suite, fetch_teuthology from teuthology.task.internal.lock_machines import lock_machines_helper from teuthology.dispatcher import supervisor from teuthology.worker import prep_job +from teuthology import safepath log = logging.getLogger(__name__) start_time = datetime.utcnow() @@ -134,14 +134,20 @@ def main(args): '--archive-dir', archive_dir, ] - with tempfile.NamedTemporaryFile(prefix='teuthology-dispatcher.', - suffix='.tmp', mode='w+t') as tmp: - yaml.safe_dump(data=job_config, stream=tmp) - tmp.flush() - run_args.extend(["--config-fd", str(tmp.fileno())]) - job_proc = subprocess.Popen(run_args, pass_fds=[tmp.fileno()]) + # Create run archive directory if not already created and + # job's archive directory + create_job_archive(job_config['name'], + job_config['archive_path'], + archive_dir) + job_config_path = os.path.join(job_config['archive_path'], 'orig.config.yaml') - log.info('Job subprocess PID: %s', job_proc.pid) + # Write initial job config in job archive dir + with open(job_config_path, 'w') as f: + yaml.safe_dump(job_config, f, default_flow_style=False) + + run_args.extend(["--config-file", job_config_path]) + job_proc = subprocess.Popen(run_args) + log.info('Job supervisor PID: %s', job_proc.pid) # This try/except block is to keep the worker from dying when # beanstalkc throws a SocketError @@ -157,3 +163,12 @@ def lock_machines(job_config): job_config['machine_type']], reimage=False) job_config = fake_ctx.config return job_config + + +def create_job_archive(job_name, job_archive_path, archive_dir): + log.info('Creating job\'s archive dir %s', job_archive_path) + safe_archive = safepath.munge(job_name) + run_archive = os.path.join(archive_dir, safe_archive) + if not os.path.exists(run_archive): + safepath.makedirs('/', run_archive) + safepath.makedirs('/', job_archive_path) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 52a424c248..c680daa056 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -1,12 +1,9 @@ import logging import os import subprocess -import tempfile import time import yaml -from datetime import datetime - from teuthology import report from teuthology import safepath from teuthology.config import config as teuth_config @@ -21,9 +18,6 @@ import teuthology from teuthology.nuke import nuke log = logging.getLogger(__name__) -start_time = datetime.utcnow() -restart_file_path = '/tmp/teuthology-restart-workers' -stop_file_path = '/tmp/teuthology-stop-workers' def main(args): @@ -31,10 +25,9 @@ def main(args): verbose = args["--verbose"] archive_dir = args["--archive-dir"] teuth_bin_path = args["--bin-path"] - config_fd = int(args["--config-fd"]) + config_file_path = args["--config-file"] - with open(config_fd, 'r') as config_file: - config_file.seek(0) + with open(config_file_path, 'r') as config_file: job_config = yaml.safe_load(config_file) loglevel = logging.INFO @@ -42,18 +35,16 @@ def main(args): loglevel = logging.DEBUG log.setLevel(loglevel) - suite_dir = os.path.join(archive_dir, job_config['name']) - if (not os.path.exists(suite_dir)): - os.mkdir(suite_dir) - log_file_path = os.path.join(suite_dir, 'supervisor.{job_id}'.format( + log_file_path = os.path.join(job_config['archive_path'], 'supervisor.{job_id}'.format( job_id=job_config['job_id'])) setup_log_file(log_file_path) - install_except_hook() # reimage target machines before running the job if 'targets' in job_config: reimage_machines(job_config) + with open(config_file_path, 'w') as f: + yaml.safe_dump(job_config, f, default_flow_style=False) try: run_job( @@ -76,7 +67,6 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): log.warning("Unable to delete job %s, exception occurred: %s", job_config['job_id'], e) suite_archive_dir = os.path.join(archive_dir, safe_archive) - safepath.makedirs('/', suite_archive_dir) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), '--archive-dir', suite_archive_dir, @@ -100,13 +90,14 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): # dies (e.g. because of a restart) result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp) log.info("teuthology-results PID: %s", result_proc.pid) + # Remove unnecessary logs for first and last jobs in run + for f in os.listdir(job_config['archive_path']): + os.remove(os.path.join(job_config['archive_path'], f)) + os.rmdir(job_config['archive_path']) return - log.info('Creating archive dir %s', job_config['archive_path']) - safepath.makedirs('/', job_config['archive_path']) log.info('Running job %s', job_config['job_id']) - suite_path = job_config['suite_path'] arg = [ os.path.join(teuth_bin_path, 'teuthology'), ] @@ -131,42 +122,34 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): ]) if job_config['description'] is not None: arg.extend(['--description', job_config['description']]) - arg.append('--') - - with tempfile.NamedTemporaryFile(prefix='teuthology-worker.', - suffix='.tmp', mode='w+t') as tmp: - yaml.safe_dump(data=job_config, stream=tmp) - tmp.flush() - arg.append(tmp.name) - env = os.environ.copy() - python_path = env.get('PYTHONPATH', '') - python_path = ':'.join([suite_path, python_path]).strip(':') - env['PYTHONPATH'] = python_path - log.debug("Running: %s" % ' '.join(arg)) - p = subprocess.Popen(args=arg, env=env) - log.info("Job archive: %s", job_config['archive_path']) - log.info("Job PID: %s", str(p.pid)) - - if teuth_config.results_server: - log.info("Running with watchdog") - try: - run_with_watchdog(p, job_config) - except Exception: - log.exception("run_with_watchdog had an unhandled exception") - raise - else: - log.info("Running without watchdog") - # This sleep() is to give the child time to start up and create the - # archive dir. - time.sleep(5) - symlink_worker_log(job_config['worker_log'], - job_config['archive_path']) - p.wait() - - if p.returncode != 0: - log.error('Child exited with code %d', p.returncode) - else: - log.info('Success!') + job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') + arg.extend(['--', job_archive]) + + log.debug("Running: %s" % ' '.join(arg)) + p = subprocess.Popen(args=arg) + log.info("Job archive: %s", job_config['archive_path']) + log.info("Job PID: %s", str(p.pid)) + + if teuth_config.results_server: + log.info("Running with watchdog") + try: + run_with_watchdog(p, job_config) + except Exception: + log.exception("run_with_watchdog had an unhandled exception") + raise + else: + log.info("Running without watchdog") + # This sleep() is to give the child time to start up and create the + # archive dir. + time.sleep(5) + symlink_worker_log(job_config['worker_log'], + job_config['archive_path']) + p.wait() + + if p.returncode != 0: + log.error('Child exited with code %d', p.returncode) + else: + log.info('Success!') if 'targets' in job_config: unlock_targets(job_config) @@ -192,7 +175,8 @@ def unlock_targets(job_config): log.info('Unlocking machines...') fake_ctx = create_fake_context(job_config) for machine in job_info['targets'].keys(): - teuthology.lock.ops.unlock_one(fake_ctx, machine, job_info['owner'], job_info['archive_path']) + teuthology.lock.ops.unlock_one(fake_ctx, machine, job_info['owner'], + job_info['archive_path']) if job_status != 'pass' and job_config.get('nuke-on-error', False): fake_ctx = create_fake_context(job_config) nuke(fake_ctx, True) -- 2.39.5