From: Shraddha Agrawal
Date: Sun, 9 Aug 2020 02:22:08 +0000 (+0530)
Subject: add teuthology-dispatcher
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=798557499dabb28c7e5cf53000ef60031be5be23;p=teuthology.git

add teuthology-dispatcher

This commit does the following:

1. Adds a new command, teuthology-dispatcher: it watches a queue, takes
   jobs from it, locks the required nodes without reimaging them, and
   runs each job as a subprocess by invoking teuthology-dispatcher in
   supervisor mode. Supervisor mode reimages the target machines in the
   config and invokes the teuthology command to run the job.
2. Refactors task/internal/lock_machines.py: this enables locking
   machines in the dispatcher while following DRY.
3. Refactors the reimaging logic in lock/ops.py: this enables reimaging
   machines in the dispatcher's supervisor mode while following DRY.
4. Adds an argument, reimage, to lock_many in lock/ops.py: this enables
   optional reimaging of machines depending on its value. It defaults to
   True, and the dispatcher uses reimage=False to lock machines without
   reimaging them.

Signed-off-by: Shraddha Agrawal
---
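A sketch of how the two modes fit together; the tube name, paths, and fd
number below are illustrative, not part of the patch:

    # dispatcher mode: watch a beanstalk tube, spawning one supervisor
    # subprocess per job
    teuthology-dispatcher --tube mytube --log-dir /tmp/logs \
        --archive-dir /tmp/archive

    # supervisor mode: normally spawned by the dispatcher itself, with the
    # job config passed as an inherited file descriptor
    teuthology-dispatcher --supervisor -v --bin-path /path/to/bin \
        --config-fd 5 --archive-dir /tmp/archive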
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
new file mode 100644
index 000000000..395225e11
--- /dev/null
+++ b/scripts/dispatcher.py
@@ -0,0 +1,33 @@
+"""
+usage: teuthology-dispatcher --help
+       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --config-fd FD --archive-dir ARC_DIR
+       teuthology-dispatcher [-v] --archive-dir ARC_DIR --log-dir LOG_DIR --tube TUBE
+
+Start a dispatcher for the specified tube. Grab jobs from a beanstalk
+queue and run the teuthology tests they describe as subprocesses. The
+subprocess invoked is a teuthology-dispatcher command run in supervisor
+mode.
+
+Supervisor mode: Supervise the job run described by its config. Reimage
+the target machines and invoke the teuthology command. Unlock the target
+machines at the end of the run.
+
+standard arguments:
+  -h, --help             show this help message and exit
+  -v, --verbose          be more verbose
+  -t, --tube TUBE        which beanstalk tube to read jobs from
+  -l, --log-dir LOG_DIR  path in which to store logs
+  --archive-dir ARC_DIR  path to archive results in
+  --supervisor           run dispatcher in job supervisor mode
+  --bin-path BIN_PATH    teuthology bin path
+  --config-fd FD         file descriptor of job's config file
+"""
+
+import docopt
+
+import teuthology.dispatcher
+
+
+def main():
+    args = docopt.docopt(__doc__)
+    teuthology.dispatcher.main(args)
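For readers unfamiliar with docopt: the module docstring above doubles as
the parser specification, and docopt.docopt() returns a plain dict keyed
by option name, which teuthology.dispatcher.main() indexes directly. A
minimal standalone sketch (argv values illustrative):

    import docopt

    usage = """
    usage: prog [-v] --tube TUBE
    """

    args = docopt.docopt(usage, argv=['-v', '--tube', 'mytube'])
    print(args)  # {'--tube': 'mytube', '-v': True}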
diff --git a/setup.py b/setup.py
index 119732e86..7c64ba2b6 100644
--- a/setup.py
+++ b/setup.py
@@ -131,6 +131,7 @@ setup(
             'teuthology-describe = scripts.describe:main',
             'teuthology-reimage = scripts.reimage:main',
             'teuthology-wait = scripts.wait:main',
+            'teuthology-dispatcher = scripts.dispatcher:main',
         ],
     },
 
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
new file mode 100644
index 000000000..0e5dfd0e6
--- /dev/null
+++ b/teuthology/dispatcher/__init__.py
@@ -0,0 +1,159 @@
+import logging
+import os
+import subprocess
+import sys
+import yaml
+import tempfile
+
+from datetime import datetime
+
+from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
+from teuthology.config import config as teuth_config
+from teuthology.repo_utils import fetch_qa_suite, fetch_teuthology
+from teuthology.task.internal.lock_machines import lock_machines_helper
+from teuthology.dispatcher import supervisor
+from teuthology.worker import prep_job
+
+log = logging.getLogger(__name__)
+start_time = datetime.utcnow()
+restart_file_path = '/tmp/teuthology-restart-dispatcher'
+stop_file_path = '/tmp/teuthology-stop-dispatcher'
+
+
+def sentinel(path):
+    if not os.path.exists(path):
+        return False
+    file_mtime = datetime.utcfromtimestamp(os.path.getmtime(path))
+    return file_mtime > start_time
+
+
+def restart():
+    log.info('Restarting...')
+    args = sys.argv[:]
+    args.insert(0, sys.executable)
+    os.execv(sys.executable, args)
+
+
+def stop():
+    log.info('Stopping...')
+    sys.exit(0)
+
+
+def load_config(archive_dir=None):
+    teuth_config.load()
+    if archive_dir is not None:
+        if not os.path.isdir(archive_dir):
+            sys.exit("{prog}: archive directory must exist: {path}".format(
+                prog=os.path.basename(sys.argv[0]),
+                path=archive_dir,
+            ))
+        else:
+            teuth_config.archive_base = archive_dir
+
+
+def main(args):
+    # run the dispatcher in job supervisor mode if --supervisor is passed
+    if args["--supervisor"]:
+        return supervisor.main(args)
+
+    verbose = args["--verbose"]
+    tube = args["--tube"]
+    log_dir = args["--log-dir"]
+    archive_dir = args["--archive-dir"]
+
+    # set up logging for the dispatcher in {log_dir}
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+    log_file_path = os.path.join(log_dir, 'dispatcher.{tube}.{pid}'.format(
+        pid=os.getpid(), tube=tube))
+    setup_log_file(log_file_path)
+    install_except_hook()
+
+    load_config(archive_dir=archive_dir)
+
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, tube)
+    result_proc = None
+
+    if teuth_config.teuthology_path is None:
+        fetch_teuthology('master')
+    fetch_qa_suite('master')
+
+    keep_running = True
+    while keep_running:
+        # Check to see if we have a teuthology-results process hanging around
+        # and if so, read its return code so that it can exit.
+        if result_proc is not None and result_proc.poll() is not None:
+            log.debug("teuthology-results exited with code: %s",
+                      result_proc.returncode)
+            result_proc = None
+
+        if sentinel(restart_file_path):
+            restart()
+        elif sentinel(stop_file_path):
+            stop()
+
+        load_config()
+
+        job = connection.reserve(timeout=60)
+        if job is None:
+            continue
+
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_id = job.jid
+        log.info('Reserved job %d', job_id)
+        log.info('Config is: %s', job.body)
+        job_config = yaml.safe_load(job.body)
+        job_config['job_id'] = str(job_id)
+
+        if job_config.get('stop_worker'):
+            keep_running = False
+
+        job_config, teuth_bin_path = prep_job(
+            job_config,
+            log_file_path,
+            archive_dir,
+        )
+
+        # lock machines but do not reimage them
+        if 'roles' in job_config:
+            job_config = lock_machines(job_config)
+
+        run_args = [
+            os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
+            '--supervisor',
+            '-v',
+            '--bin-path', teuth_bin_path,
+            '--archive-dir', archive_dir,
+        ]
+
+        with tempfile.NamedTemporaryFile(prefix='teuthology-dispatcher.',
+                                         suffix='.tmp', mode='w+t') as tmp:
+            yaml.safe_dump(data=job_config, stream=tmp)
+            tmp.flush()
+            run_args.extend(["--config-fd", str(tmp.fileno())])
+            job_proc = subprocess.Popen(run_args, pass_fds=[tmp.fileno()])
+
+        log.info('Job subprocess PID: %s', job_proc.pid)
+
+        # This try/except block is to keep the dispatcher from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
+
+def lock_machines(job_config):
+    fake_ctx = supervisor.create_fake_context(job_config, block=True)
+    lock_machines_helper(fake_ctx, [len(job_config['roles']),
+                         job_config['machine_type']], reimage=False)
+    job_config = fake_ctx.config
+    return job_config
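The dispatcher above hands the job config to the supervisor through an
inherited file descriptor (--config-fd) rather than a path: the
NamedTemporaryFile is unlinked as soon as the with-block exits, but the
child keeps the underlying descriptor alive via pass_fds. A self-contained
sketch of that mechanism (not part of the patch):

    import subprocess
    import sys
    import tempfile

    with tempfile.NamedTemporaryFile(mode='w+t') as tmp:
        tmp.write('machine_type: smithi\n')
        tmp.flush()
        # The child opens the inherited fd directly; seek(0) is needed
        # because the file offset is shared with the parent, which just
        # finished writing.
        child = subprocess.Popen(
            [sys.executable, '-c',
             'import sys; f = open(int(sys.argv[1])); f.seek(0); '
             'print(f.read())',
             str(tmp.fileno())],
            pass_fds=[tmp.fileno()],
        )
        child.wait()  # prints: machine_type: smithi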
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
new file mode 100644
index 000000000..b16314cb6
--- /dev/null
+++ b/teuthology/dispatcher/supervisor.py
@@ -0,0 +1,202 @@
+import logging
+import os
+import subprocess
+import tempfile
+import time
+import yaml
+
+from datetime import datetime
+
+from teuthology import report
+from teuthology import safepath
+from teuthology.config import config as teuth_config
+from teuthology.exceptions import SkipJob
+from teuthology import setup_log_file, install_except_hook
+from teuthology.lock.ops import reimage_many
+from teuthology.misc import get_user
+from teuthology.config import FakeNamespace
+from teuthology.worker import run_with_watchdog, symlink_worker_log
+
+log = logging.getLogger(__name__)
+start_time = datetime.utcnow()
+restart_file_path = '/tmp/teuthology-restart-workers'
+stop_file_path = '/tmp/teuthology-stop-workers'
+
+
+def main(args):
+    verbose = args["--verbose"]
+    archive_dir = args["--archive-dir"]
+    teuth_bin_path = args["--bin-path"]
+    config_fd = int(args["--config-fd"])
+
+    with open(config_fd, 'r') as config_file:
+        config_file.seek(0)
+        job_config = yaml.safe_load(config_file)
+
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+
+    suite_dir = os.path.join(archive_dir, job_config['name'])
+    if not os.path.exists(suite_dir):
+        os.mkdir(suite_dir)
+    log_file_path = os.path.join(suite_dir, 'worker.{job_id}'.format(
+        job_id=job_config['job_id']))
+    setup_log_file(log_file_path)
+
+    install_except_hook()
+
+    # reimage target machines before running the job
+    if 'targets' in job_config:
+        reimage_machines(job_config)
+
+    try:
+        run_job(
+            job_config,
+            teuth_bin_path,
+            archive_dir,
+            verbose
+        )
+    except SkipJob:
+        return
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+    safe_archive = safepath.munge(job_config['name'])
+    if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'],
+                                       job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
+        suite_archive_dir = os.path.join(archive_dir, safe_archive)
+        safepath.makedirs('/', suite_archive_dir)
+        args = [
+            os.path.join(teuth_bin_path, 'teuthology-results'),
+            '--archive-dir', suite_archive_dir,
+            '--name', job_config['name'],
+        ]
+        if job_config.get('first_in_suite'):
+            log.info('Generating memo for %s', job_config['name'])
+            if job_config.get('seed'):
+                args.extend(['--seed', job_config['seed']])
+            if job_config.get('subset'):
+                args.extend(['--subset', job_config['subset']])
+        else:
+            log.info('Generating results for %s', job_config['name'])
+            timeout = job_config.get('results_timeout',
+                                     teuth_config.results_timeout)
+            args.extend(['--timeout', str(timeout)])
+            if job_config.get('email'):
+                args.extend(['--email', job_config['email']])
+        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+        # make sure that it will continue to run if this worker process
+        # dies (e.g. because of a restart)
+        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+        log.info("teuthology-results PID: %s", result_proc.pid)
+        return
+
+    log.info('Creating archive dir %s', job_config['archive_path'])
+    safepath.makedirs('/', job_config['archive_path'])
+    log.info('Running job %s', job_config['job_id'])
+
+    suite_path = job_config['suite_path']
+    arg = [
+        os.path.join(teuth_bin_path, 'teuthology'),
+    ]
+    # The following is for compatibility with older schedulers, from before
+    # we started merging the contents of job_config['config'] into
+    # job_config itself.
+    if 'config' in job_config:
+        inner_config = job_config.pop('config')
+        if not isinstance(inner_config, dict):
+            log.warning("run_job: job_config['config'] isn't a dict, it's a %s",
+                        str(type(inner_config)))
+        else:
+            job_config.update(inner_config)
+
+    if verbose or job_config['verbose']:
+        arg.append('-v')
+
+    arg.extend([
+        '--owner', job_config['owner'],
+        '--archive', job_config['archive_path'],
+        '--name', job_config['name'],
+    ])
+    if job_config['description'] is not None:
+        arg.extend(['--description', job_config['description']])
+    arg.append('--')
+
+    with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
+                                     suffix='.tmp', mode='w+t') as tmp:
+        yaml.safe_dump(data=job_config, stream=tmp)
+        tmp.flush()
+        arg.append(tmp.name)
+        env = os.environ.copy()
+        python_path = env.get('PYTHONPATH', '')
+        python_path = ':'.join([suite_path, python_path]).strip(':')
+        env['PYTHONPATH'] = python_path
+        log.debug("Running: %s" % ' '.join(arg))
+        p = subprocess.Popen(args=arg, env=env)
+        log.info("Job archive: %s", job_config['archive_path'])
+        log.info("Job PID: %s", str(p.pid))
+
+        if teuth_config.results_server:
+            log.info("Running with watchdog")
+            try:
+                run_with_watchdog(p, job_config)
+            except Exception:
+                log.exception("run_with_watchdog had an unhandled exception")
+                raise
+        else:
+            log.info("Running without watchdog")
+            # This sleep() is to give the child time to start up and create
+            # the archive dir.
+            time.sleep(5)
+            symlink_worker_log(job_config['worker_log'],
+                               job_config['archive_path'])
+            p.wait()
+
+        if p.returncode != 0:
+            log.error('Child exited with code %d', p.returncode)
+        else:
+            log.info('Success!')
+
+
+def reimage_machines(job_config):
+    # Reimage the targets specified in the job config
+    # and update their keys in the config after reimaging
+    ctx = create_fake_context(job_config)
+    # change the status during the reimaging process
+    report.try_push_job_info(ctx.config, dict(status='waiting'))
+    targets = job_config['targets']
+    reimaged = reimage_many(ctx, targets, job_config['machine_type'])
+    ctx.config['targets'] = reimaged
+    # change the status back to running after the reimaging process
+    report.try_push_job_info(ctx.config, dict(status='running'))
+
+
+def create_fake_context(job_config, block=False):
+    if job_config['owner'] is None:
+        job_config['owner'] = get_user()
+
+    os_version = job_config.get('os_version')
+
+    ctx_args = {
+        'config': job_config,
+        'block': block,
+        'owner': job_config['owner'],
+        'archive': job_config['archive_path'],
+        'machine_type': job_config['machine_type'],
+        'os_type': job_config['os_type'],
+        'os_version': os_version,
+    }
+
+    fake_ctx = FakeNamespace(ctx_args)
+    return fake_ctx
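One process-control detail in run_job above deserves a note: the
teuthology-results helper is started with preexec_fn=os.setpgrp so that a
dispatcher restart cannot take it down with the rest of the process group.
A minimal POSIX-only illustration (the sleep command is just a stand-in):

    import os
    import subprocess

    # setpgrp runs in the child right before exec, making the child the
    # leader of a fresh process group; signals delivered to the parent's
    # group no longer reach it.
    p = subprocess.Popen(['sleep', '60'], preexec_fn=os.setpgrp)
    assert os.getpgid(p.pid) == p.pid         # child leads its own group
    assert os.getpgid(p.pid) != os.getpgrp()  # ...distinct from ours
    p.terminate()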
diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py
index 247ccfe66..2d538758d 100644
--- a/teuthology/lock/ops.py
+++ b/teuthology/lock/ops.py
@@ -52,7 +52,7 @@ def lock_many_openstack(ctx, num, machine_type, user=None, description=None,
 
 
 def lock_many(ctx, num, machine_type, user=None, description=None,
-              os_type=None, os_version=None, arch=None):
+              os_type=None, os_version=None, arch=None, reimage=True):
     if user is None:
         user = misc.get_user()
 
@@ -128,24 +128,8 @@ def lock_many(ctx, num, machine_type, user=None, description=None,
             ok_machs = do_update_keys(list(ok_machs.keys()))[1]
             update_nodes(ok_machs)
             return ok_machs
-        elif machine_type in reimage_types:
-            reimaged = dict()
-            console_log_conf = dict(
-                logfile_name='{shortname}_reimage.log',
-                remotes=[teuthology.orchestra.remote.Remote(machine)
-                         for machine in machines],
-            )
-            with console_log.task(
-                    ctx, console_log_conf):
-                update_nodes(reimaged, True)
-                with teuthology.parallel.parallel() as p:
-                    for machine in machines:
-                        p.spawn(teuthology.provision.reimage, ctx,
-                                machine, machine_type)
-                        reimaged[machine] = machines[machine]
-            reimaged = do_update_keys(list(reimaged.keys()))[1]
-            update_nodes(reimaged)
-            return reimaged
+        elif reimage and machine_type in reimage_types:
+            return reimage_many(ctx, machines, machine_type)
         return machines
     elif response.status_code == 503:
         log.error('Insufficient nodes available to lock %d %s nodes.',
@@ -297,3 +281,31 @@ def push_new_keys(keys_dict, reference):
             log.error('failed to update %s!', hostname)
             ret = 1
     return ret
+
+
+def reimage(ctx, machines, machine_type):
+    reimaged = dict()
+    with teuthology.parallel.parallel() as p:
+        for machine in machines:
+            log.info("Start node '%s' reimaging", machine)
+            update_nodes([machine], True)
+            p.spawn(teuthology.provision.reimage, ctx,
+                    machine, machine_type)
+            reimaged[machine] = machines[machine]
+    log.info("Reimaging of nodes %s is complete", list(reimaged))
+    return reimaged
+
+
+def reimage_many(ctx, machines, machine_type):
+    # Set up the console log, reimage the machines, and update their keys
+    console_log_conf = dict(
+        logfile_name='{shortname}_reimage.log',
+        remotes=[teuthology.orchestra.remote.Remote(machine)
+                 for machine in machines],
+    )
+    with console_log.task(ctx, console_log_conf):
+        reimaged = reimage(ctx, machines, machine_type)
+        reimaged = do_update_keys(list(reimaged.keys()))[1]
+        update_nodes(reimaged)
+    return reimaged
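The refactor above separates the parallel per-node work (reimage) from the
bookkeeping wrapper (reimage_many), and lock_many now skips both when
reimage=False. The spawn-then-join shape of reimage, rendered with only
the standard library as a stand-in for teuthology.parallel (illustrative,
not the patch's code):

    from concurrent.futures import ThreadPoolExecutor

    def reimage_one(machine):
        # stand-in for teuthology.provision.reimage
        return machine.upper()

    machines = ['smithi001', 'smithi002']

    # spawn everything, then join when the with-block exits, mirroring
    # teuthology.parallel.parallel()
    with ThreadPoolExecutor() as pool:
        futures = {m: pool.submit(reimage_one, m) for m in machines}
    reimaged = {m: f.result() for m, f in futures.items()}
    print(reimaged)  # {'smithi001': 'SMITHI001', 'smithi002': 'SMITHI002'}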
diff --git a/teuthology/task/internal/lock_machines.py b/teuthology/task/internal/lock_machines.py
index bfab80f79..5d61f017c 100644
--- a/teuthology/task/internal/lock_machines.py
+++ b/teuthology/task/internal/lock_machines.py
@@ -23,6 +23,14 @@ def lock_machines(ctx, config):
     new machines. This is not called if the one has teuthology-locked
     machines and placed those keys in the Targets section of a yaml file.
     """
+    lock_machines_helper(ctx, config)
+    try:
+        yield
+    finally:
+        unlock_machines(ctx)
+
+
+def lock_machines_helper(ctx, config, reimage=True):
     # It's OK for os_type and os_version to be None here. If we're trying
     # to lock a bare metal machine, we'll take whatever is available. If
     # we want a vps, defaults will be provided by misc.get_distro and
@@ -75,10 +83,11 @@ def lock_machines(ctx, config):
         try:
             newly_locked = teuthology.lock.ops.lock_many(ctx, requested,
                 machine_type, ctx.owner, ctx.archive, os_type,
-                os_version, arch)
+                os_version, arch, reimage=reimage)
         except Exception:
             # Lock failures should map to the 'dead' status instead of 'fail'
-            set_status(ctx.summary, 'dead')
+            if 'summary' in ctx:
+                set_status(ctx.summary, 'dead')
             raise
         all_locked.update(newly_locked)
         log.info(
@@ -144,16 +153,16 @@ def lock_machines(ctx, config):
             )
             log.warn('Could not lock enough machines, waiting...')
             time.sleep(10)
-    try:
-        yield
-    finally:
-        # If both unlock_on_failure and nuke-on-error are set, don't unlock now
-        # because we're just going to nuke (and unlock) later.
-        unlock_on_failure = (
+
+
+def unlock_machines(ctx):
+    # If both unlock_on_failure and nuke-on-error are set, don't unlock now
+    # because we're just going to nuke (and unlock) later.
+    unlock_on_failure = (
         ctx.config.get('unlock_on_failure', False)
         and not ctx.config.get('nuke-on-error', False)
     )
-        if get_status(ctx.summary) == 'pass' or unlock_on_failure:
-            log.info('Unlocking machines...')
-            for machine in ctx.config['targets'].keys():
-                teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
+    if get_status(ctx.summary) == 'pass' or unlock_on_failure:
+        log.info('Unlocking machines...')
+        for machine in ctx.config['targets'].keys():
+            teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
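The lock_machines change follows a pattern worth spelling out: a
@contextmanager task is split into a plain setup helper plus an explicit
teardown, so non-task callers such as the dispatcher can run just the
setup. The shape of that refactor, reduced to a runnable toy (names
illustrative):

    from contextlib import contextmanager

    def acquire(ctx, reimage=True):   # body that used to precede `yield`
        ctx['locked'] = True

    def release(ctx):                 # body that used to follow `yield`
        ctx['locked'] = False

    @contextmanager
    def lock_task(ctx):               # the original task, now a thin wrapper
        acquire(ctx)
        try:
            yield
        finally:
            release(ctx)

    ctx = {}
    with lock_task(ctx):
        assert ctx['locked']
    assert not ctx['locked']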