--- /dev/null
+"""
+usage: teuthology-dispatcher --help
+ teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --config-fd FD --archive-dir ARC_DIR
+ teuthology-dispatcher [-v] --archive-dir ARC_DIR --log-dir LOG_DIR --tube TUBE
+
+Start a dispatcher for the specified tube. Grab jobs from a beanstalk
+queue and run the teuthology tests they describe as subprocesses. The
+subprocess invoked is a teuthology-dispatcher command run in supervisor
+mode.
+
+Supervisor mode: Supervise the job run described by its config. Reimage
+target machines and invoke teuthology command. Unlock the target machines
+at the end of the run.
+
+standard arguments:
+ -h, --help show this help message and exit
+ -v, --verbose be more verbose
+ -t, --tube TUBE which beanstalk tube to read jobs from
+ -l, --log-dir LOG_DIR path in which to store logs
+ --archive-dir ARC_DIR path to archive results in
+ --supervisor run dispatcher in job supervisor mode
+ --bin-path BIN_PATH teuthology bin path
+ --config-fd FD file descriptor of job's config file
+"""
+
+import docopt
+
+import teuthology.dispatcher
+
+
+def main():
+    """Console-script entry point.
+
+    Parses the CLI per the module docstring with docopt and hands the
+    parsed arguments to teuthology.dispatcher.main (which re-enters
+    supervisor mode itself when --supervisor was passed).
+    """
+    args = docopt.docopt(__doc__)
+    teuthology.dispatcher.main(args)
'teuthology-describe = scripts.describe:main',
'teuthology-reimage = scripts.reimage:main',
'teuthology-wait = scripts.wait:main',
+ 'teuthology-dispatcher = scripts.dispatcher:main',
],
},
--- /dev/null
+import logging
+import os
+import subprocess
+import sys
+import yaml
+import tempfile
+
+from datetime import datetime
+
+from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
+from teuthology.config import config as teuth_config
+from teuthology.repo_utils import fetch_qa_suite, fetch_teuthology
+from teuthology.task.internal.lock_machines import lock_machines_helper
+from teuthology.dispatcher import supervisor
+from teuthology.worker import prep_job
+
+log = logging.getLogger(__name__)
+# Dispatcher start time; sentinel files only count if touched after this.
+start_time = datetime.utcnow()
+# Touch these files to ask a running dispatcher to restart or stop at the
+# top of its next main-loop iteration (checked via sentinel()).
+restart_file_path = '/tmp/teuthology-restart-dispatcher'
+stop_file_path = '/tmp/teuthology-stop-dispatcher'
+
+
+def sentinel(path):
+    """Return True if `path` exists and was modified after this process
+    started, i.e. a restart/stop has been requested via the sentinel file.
+    """
+    if not os.path.exists(path):
+        return False
+    file_mtime = datetime.utcfromtimestamp(os.path.getmtime(path))
+    # Return the comparison directly instead of an if/else on a boolean.
+    return file_mtime > start_time
+
+
+def restart():
+    """Replace the current dispatcher process with a fresh copy of itself,
+    preserving the original command line."""
+    log.info('Restarting...')
+    command = [sys.executable, *sys.argv]
+    os.execv(sys.executable, command)
+
+
+def stop():
+    """Log and terminate the dispatcher cleanly (exit status 0)."""
+    log.info('Stopping...')
+    sys.exit(0)
+
+
+def load_config(archive_dir=None):
+    """(Re)load the teuthology configuration.
+
+    When `archive_dir` is given, it becomes the archive base; the process
+    exits with an error message if that directory does not exist.
+    """
+    teuth_config.load()
+    if archive_dir is None:
+        return
+    if os.path.isdir(archive_dir):
+        teuth_config.archive_base = archive_dir
+    else:
+        sys.exit("{prog}: archive directory must exist: {path}".format(
+            prog=os.path.basename(sys.argv[0]),
+            path=archive_dir,
+        ))
+
+
+def main(args):
+    """Dispatcher main loop.
+
+    Reserves jobs from the configured beanstalk tube, prepares each one,
+    locks (but does not reimage) its machines, and spawns a
+    ``teuthology-dispatcher --supervisor`` subprocess to actually run it.
+    Loops until a job carries ``stop_worker`` or a stop/restart sentinel
+    file is touched.
+    """
+    # run dispatcher in job supervisor mode if --supervisor passed
+    if args["--supervisor"]:
+        return supervisor.main(args)
+
+    verbose = args["--verbose"]
+    tube = args["--tube"]
+    log_dir = args["--log-dir"]
+    archive_dir = args["--archive-dir"]
+
+    # set up logging for the dispatcher in {log_dir}
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+    log_file_path = os.path.join(log_dir, 'dispatcher.{tube}.{pid}'.format(
+        pid=os.getpid(), tube=tube))
+    setup_log_file(log_file_path)
+    install_except_hook()
+
+    load_config(archive_dir=archive_dir)
+
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, tube)
+    result_proc = None
+
+    # make sure a teuthology checkout and qa suite are available locally
+    if teuth_config.teuthology_path is None:
+        fetch_teuthology('master')
+    fetch_qa_suite('master')
+
+    keep_running = True
+    while keep_running:
+        # Check to see if we have a teuthology-results process hanging around
+        # and if so, read its return code so that it can exit.
+        if result_proc is not None and result_proc.poll() is not None:
+            log.debug("teuthology-results exited with code: %s",
+                      result_proc.returncode)
+            result_proc = None
+
+        if sentinel(restart_file_path):
+            restart()
+        elif sentinel(stop_file_path):
+            stop()
+
+        # re-read config each iteration so changes take effect without restart
+        load_config()
+
+        # reserve() blocks for up to 60s; on timeout loop back around so the
+        # sentinel files are re-checked regularly
+        job = connection.reserve(timeout=60)
+        if job is None:
+            continue
+
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_id = job.jid
+        log.info('Reserved job %d', job_id)
+        log.info('Config is: %s', job.body)
+        job_config = yaml.safe_load(job.body)
+        job_config['job_id'] = str(job_id)
+
+        if job_config.get('stop_worker'):
+            keep_running = False
+
+        job_config, teuth_bin_path = prep_job(
+            job_config,
+            log_file_path,
+            archive_dir,
+        )
+
+        # lock machines but do not reimage them
+        if 'roles' in job_config:
+            job_config = lock_machines(job_config)
+
+        run_args = [
+            os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
+            '--supervisor',
+            '-v',
+            '--bin-path', teuth_bin_path,
+            '--archive-dir', archive_dir,
+        ]
+
+        # Hand the job config to the supervisor via an inherited file
+        # descriptor (pass_fds); the child seeks to 0 and re-reads it.
+        with tempfile.NamedTemporaryFile(prefix='teuthology-dispatcher.',
+                                         suffix='.tmp', mode='w+t') as tmp:
+            yaml.safe_dump(data=job_config, stream=tmp)
+            tmp.flush()
+            run_args.extend(["--config-fd", str(tmp.fileno())])
+            job_proc = subprocess.Popen(run_args, pass_fds=[tmp.fileno()])
+
+        log.info('Job subprocess PID: %s', job_proc.pid)
+
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
+
+def lock_machines(job_config):
+    """Lock (without reimaging) the machines a job's roles require and
+    return the job config updated with the locked targets.
+    """
+    # block=True — presumably makes the helper wait for enough free
+    # machines; confirm against lock_machines_helper.
+    fake_ctx = supervisor.create_fake_context(job_config, block=True)
+    lock_machines_helper(fake_ctx, [len(job_config['roles']),
+                        job_config['machine_type']], reimage=False)
+    # the helper records the locked targets on the fake context's config
+    job_config = fake_ctx.config
+    return job_config
--- /dev/null
+import logging
+import os
+import subprocess
+import tempfile
+import time
+import yaml
+
+from datetime import datetime
+
+from teuthology import report
+from teuthology import safepath
+from teuthology.config import config as teuth_config
+from teuthology.exceptions import SkipJob
+from teuthology import setup_log_file, install_except_hook
+from teuthology.lock.ops import reimage_many
+from teuthology.misc import get_user
+from teuthology.config import FakeNamespace
+from teuthology.worker import run_with_watchdog, symlink_worker_log
+
+log = logging.getLogger(__name__)
+# Time at which this supervisor process started.
+start_time = datetime.utcnow()
+# NOTE(review): these worker sentinel paths are not referenced anywhere in
+# this module's visible code — confirm they are still needed here.
+restart_file_path = '/tmp/teuthology-restart-workers'
+stop_file_path = '/tmp/teuthology-stop-workers'
+
+
+def main(args):
+    """Supervisor entry point: run a single job described by its config.
+
+    Reads the job config from the inherited file descriptor, sets up
+    per-job logging, reimages the job's targets if any, then runs the
+    job via run_job(). Returns early if the job should be skipped.
+    """
+
+    verbose = args["--verbose"]
+    archive_dir = args["--archive-dir"]
+    teuth_bin_path = args["--bin-path"]
+    config_fd = int(args["--config-fd"])
+
+    # The fd was inherited from the dispatcher; its offset is presumably
+    # at EOF after the parent wrote the config, so rewind before reading.
+    with open(config_fd, 'r') as config_file:
+        config_file.seek(0)
+        job_config = yaml.safe_load(config_file)
+
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+
+    # per-suite directory under the archive holds this job's worker log
+    suite_dir = os.path.join(archive_dir, job_config['name'])
+    if (not os.path.exists(suite_dir)):
+        os.mkdir(suite_dir)
+    log_file_path = os.path.join(suite_dir, 'worker.{job_id}'.format(
+        job_id=job_config['job_id']))
+    setup_log_file(log_file_path)
+
+    install_except_hook()
+
+    # reimage target machines before running the job
+    if 'targets' in job_config:
+        reimage_machines(job_config)
+
+    try:
+        run_job(
+            job_config,
+            teuth_bin_path,
+            archive_dir,
+            verbose
+        )
+    except SkipJob:
+        return
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+    """Run one teuthology job (or generate suite results) as a subprocess.
+
+    For first/last-in-suite marker jobs this spawns teuthology-results
+    and returns. Otherwise it builds a teuthology command line from the
+    job config, writes the config to a temp file, and runs the job —
+    with a watchdog when a results server is configured.
+    """
+    safe_archive = safepath.munge(job_config['name'])
+    if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
+        suite_archive_dir = os.path.join(archive_dir, safe_archive)
+        safepath.makedirs('/', suite_archive_dir)
+        args = [
+            os.path.join(teuth_bin_path, 'teuthology-results'),
+            '--archive-dir', suite_archive_dir,
+            '--name', job_config['name'],
+        ]
+        if job_config.get('first_in_suite'):
+            log.info('Generating memo for %s', job_config['name'])
+            if job_config.get('seed'):
+                args.extend(['--seed', job_config['seed']])
+            if job_config.get('subset'):
+                args.extend(['--subset', job_config['subset']])
+        else:
+            log.info('Generating results for %s', job_config['name'])
+            timeout = job_config.get('results_timeout',
+                                     teuth_config.results_timeout)
+            args.extend(['--timeout', str(timeout)])
+            if job_config.get('email'):
+                args.extend(['--email', job_config['email']])
+        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+        # make sure that it will continue to run if this worker process
+        # dies (e.g. because of a restart)
+        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+        log.info("teuthology-results PID: %s", result_proc.pid)
+        return
+
+    log.info('Creating archive dir %s', job_config['archive_path'])
+    safepath.makedirs('/', job_config['archive_path'])
+    log.info('Running job %s', job_config['job_id'])
+
+    suite_path = job_config['suite_path']
+    arg = [
+        os.path.join(teuth_bin_path, 'teuthology'),
+    ]
+    # The following is for compatibility with older schedulers, from before we
+    # started merging the contents of job_config['config'] into job_config
+    # itself.
+    if 'config' in job_config:
+        inner_config = job_config.pop('config')
+        if not isinstance(inner_config, dict):
+            # NOTE(review): Logger.warn is a deprecated alias of warning
+            log.warn("run_job: job_config['config'] isn't a dict, it's a %s",
+                     str(type(inner_config)))
+        else:
+            job_config.update(inner_config)
+
+    if verbose or job_config['verbose']:
+        arg.append('-v')
+
+    arg.extend([
+        '--owner', job_config['owner'],
+        '--archive', job_config['archive_path'],
+        '--name', job_config['name'],
+    ])
+    if job_config['description'] is not None:
+        arg.extend(['--description', job_config['description']])
+    # '--' separates teuthology options from the config file argument
+    arg.append('--')
+
+    with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
+                                     suffix='.tmp', mode='w+t') as tmp:
+        yaml.safe_dump(data=job_config, stream=tmp)
+        tmp.flush()
+        arg.append(tmp.name)
+        env = os.environ.copy()
+        python_path = env.get('PYTHONPATH', '')
+        # prepend the suite checkout so its tasks are importable
+        python_path = ':'.join([suite_path, python_path]).strip(':')
+        env['PYTHONPATH'] = python_path
+        log.debug("Running: %s" % ' '.join(arg))
+        p = subprocess.Popen(args=arg, env=env)
+        log.info("Job archive: %s", job_config['archive_path'])
+        log.info("Job PID: %s", str(p.pid))
+
+        if teuth_config.results_server:
+            log.info("Running with watchdog")
+            try:
+                run_with_watchdog(p, job_config)
+            except Exception:
+                log.exception("run_with_watchdog had an unhandled exception")
+                raise
+        else:
+            log.info("Running without watchdog")
+            # This sleep() is to give the child time to start up and create the
+            # archive dir.
+            time.sleep(5)
+            symlink_worker_log(job_config['worker_log'],
+                               job_config['archive_path'])
+            p.wait()
+
+        if p.returncode != 0:
+            log.error('Child exited with code %d', p.returncode)
+        else:
+            log.info('Success!')
+
+
+def reimage_machines(job_config):
+    """Reimage the targets specified in job_config and update their keys.
+
+    The job is reported as 'waiting' while the machines are being
+    reimaged and set back to 'running' once they are ready.
+    """
+    ctx = create_fake_context(job_config)
+    # change the status during the reimaging process
+    report.try_push_job_info(ctx.config, dict(status='waiting'))
+    targets = job_config['targets']
+    reimaged = reimage_many(ctx, targets, job_config['machine_type'])
+    ctx.config['targets'] = reimaged
+    # change the status to running after the reimaging process
+    # (was pushing 'waiting' a second time, contradicting this comment)
+    report.try_push_job_info(ctx.config, dict(status='running'))
+
+
+def create_fake_context(job_config, block=False):
+    """Build a FakeNamespace standing in for a teuthology run context.
+
+    Fills in the owner from the current user when the job config has
+    none; `os_version` is optional in the job config and defaults to
+    None when absent.
+    """
+    if job_config['owner'] is None:
+        job_config['owner'] = get_user()
+
+    ctx_args = {
+        'config': job_config,
+        'block': block,
+        'owner': job_config['owner'],
+        'archive': job_config['archive_path'],
+        'machine_type': job_config['machine_type'],
+        'os_type': job_config['os_type'],
+        # dict.get replaces the explicit membership check; same behavior.
+        'os_version': job_config.get('os_version'),
+    }
+
+    fake_ctx = FakeNamespace(ctx_args)
+    return fake_ctx
def lock_many(ctx, num, machine_type, user=None, description=None,
- os_type=None, os_version=None, arch=None):
+ os_type=None, os_version=None, arch=None, reimage=True):
if user is None:
user = misc.get_user()
ok_machs = do_update_keys(list(ok_machs.keys()))[1]
update_nodes(ok_machs)
return ok_machs
- elif machine_type in reimage_types:
- reimaged = dict()
- console_log_conf = dict(
- logfile_name='{shortname}_reimage.log',
- remotes=[teuthology.orchestra.remote.Remote(machine)
- for machine in machines],
- )
- with console_log.task(
- ctx, console_log_conf):
- update_nodes(reimaged, True)
- with teuthology.parallel.parallel() as p:
- for machine in machines:
- p.spawn(teuthology.provision.reimage, ctx,
- machine, machine_type)
- reimaged[machine] = machines[machine]
- reimaged = do_update_keys(list(reimaged.keys()))[1]
- update_nodes(reimaged)
- return reimaged
+ elif reimage and machine_type in reimage_types:
+ return reimage_many(ctx, machines, machine_type)
return machines
elif response.status_code == 503:
log.error('Insufficient nodes available to lock %d %s nodes.',
log.error('failed to update %s!', hostname)
ret = 1
return ret
+
+
+def reimage(ctx, machines, machine_type):
+    """Kick off reimaging of `machines` in parallel and return the dict
+    of machines submitted for reimaging.
+
+    The spawned tasks are joined when the parallel context exits.
+    """
+    reimaged = dict()
+    with teuthology.parallel.parallel() as p:
+        for machine in machines:
+            log.info("Start node '%s' reimaging", machine)
+            # mark the node as being reimaged before starting the task
+            update_nodes([machine], True)
+            p.spawn(teuthology.provision.reimage, ctx,
+                machine, machine_type)
+            reimaged[machine] = machines[machine]
+            # NOTE(review): this logs before the spawned task has actually
+            # finished — completion is only guaranteed at context exit.
+            log.info("Node '%s' reimaging is complete", machine)
+    return reimaged
+
+
+def reimage_many(ctx, machines, machine_type):
+    """Reimage `machines` with console logging and refresh their keys.
+
+    Returns the dict of reimaged machines after their host keys have
+    been updated.
+    """
+    reimaged = dict()
+    # capture each node's console output to {shortname}_reimage.log
+    console_log_conf = dict(
+        logfile_name='{shortname}_reimage.log',
+        remotes=[teuthology.orchestra.remote.Remote(machine)
+                 for machine in machines],
+    )
+    with console_log.task(ctx, console_log_conf):
+        reimaged = reimage(ctx, machines, machine_type)
+    # refresh host keys post-reimage and push the updated node info
+    reimaged = do_update_keys(list(reimaged.keys()))[1]
+    update_nodes(reimaged)
+    return reimaged
new machines. This is not called if the one has teuthology-locked
machines and placed those keys in the Targets section of a yaml file.
"""
+ lock_machines_helper(ctx, config)
+ try:
+ yield
+ finally:
+ unlock_machines(ctx)
+
+
+def lock_machines_helper(ctx, config, reimage=True):
# It's OK for os_type and os_version to be None here. If we're trying
# to lock a bare metal machine, we'll take whatever is available. If
# we want a vps, defaults will be provided by misc.get_distro and
try:
newly_locked = teuthology.lock.ops.lock_many(ctx, requested, machine_type,
ctx.owner, ctx.archive, os_type,
- os_version, arch)
+ os_version, arch, reimage=reimage)
except Exception:
# Lock failures should map to the 'dead' status instead of 'fail'
- set_status(ctx.summary, 'dead')
+ if 'summary' in ctx:
+ set_status(ctx.summary, 'dead')
raise
all_locked.update(newly_locked)
log.info(
)
log.warn('Could not lock enough machines, waiting...')
time.sleep(10)
- try:
- yield
- finally:
- # If both unlock_on_failure and nuke-on-error are set, don't unlock now
- # because we're just going to nuke (and unlock) later.
- unlock_on_failure = (
+
+
+def unlock_machines(ctx):
+    """Unlock this run's target machines if the run passed, or if
+    unlock_on_failure is set (and nuke-on-error is not)."""
+    # If both unlock_on_failure and nuke-on-error are set, don't unlock now
+    # because we're just going to nuke (and unlock) later.
+    unlock_on_failure = (
        ctx.config.get('unlock_on_failure', False)
        and not ctx.config.get('nuke-on-error', False)
    )
-    if get_status(ctx.summary) == 'pass' or unlock_on_failure:
-        log.info('Unlocking machines...')
-        for machine in ctx.config['targets'].keys():
-            teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
+    if get_status(ctx.summary) == 'pass' or unlock_on_failure:
+        log.info('Unlocking machines...')
+        for machine in ctx.config['targets'].keys():
+            teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)