add teuthology-dispatcher
author Shraddha Agrawal <shraddha.agrawal000@gmail.com>
Sun, 9 Aug 2020 02:22:08 +0000 (07:52 +0530)
committer Josh Durgin <jdurgin@redhat.com>
Sat, 16 Jan 2021 21:36:26 +0000 (16:36 -0500)
This commit does the following:

1. adds a new command, teuthology-dispatcher: it watches a beanstalk
queue and takes jobs from it, locks the required nodes without
reimaging them, and runs each job as a subprocess by invoking
teuthology-dispatcher in supervisor mode. Supervisor mode reimages
the target machines named in the config and invokes the teuthology
command to run the job.

2. refactors task/internal/lock_machines.py: this lets the dispatcher
lock machines while staying DRY.

3. refactors the reimaging logic in lock/ops.py: this lets the
dispatcher's supervisor mode reimage machines while staying DRY.

4. adds a reimage argument to lock_many in lock/ops.py: reimaging is
now optional and defaults to True. The dispatcher uses reimage=False
to lock machines without reimaging them; see the sketch below.

Signed-off-by: Shraddha Agrawal <shraddha.agrawal000@gmail.com>
scripts/dispatcher.py [new file with mode: 0644]
setup.py
teuthology/dispatcher/__init__.py [new file with mode: 0644]
teuthology/dispatcher/supervisor.py [new file with mode: 0644]
teuthology/lock/ops.py
teuthology/task/internal/lock_machines.py

diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
new file mode 100644 (file)
index 0000000..395225e
--- /dev/null
+++ b/scripts/dispatcher.py
@@ -0,0 +1,33 @@
+"""
+usage: teuthology-dispatcher --help
+       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --config-fd FD --archive-dir ARC_DIR
+       teuthology-dispatcher [-v] --archive-dir ARC_DIR --log-dir LOG_DIR --tube TUBE
+
+Start a dispatcher for the specified tube. Grab jobs from a beanstalk
+queue and run the teuthology tests they describe as subprocesses. The
+subprocess invoked is a teuthology-dispatcher command run in supervisor
+mode.
+
+Supervisor mode: Supervise the job run described by its config. Reimage
+target machines and invoke teuthology command. Unlock the target machines
+at the end of the run.
+
+standard arguments:
+  -h, --help                     show this help message and exit
+  -v, --verbose                  be more verbose
+  -t, --tube TUBE                which beanstalk tube to read jobs from
+  -l, --log-dir LOG_DIR          path in which to store logs
+  --archive-dir ARC_DIR          path to archive results in
+  --supervisor                   run dispatcher in job supervisor mode
+  --bin-path BIN_PATH            teuthology bin path
+  --config-fd FD                 file descriptor of job's config file
+"""
+
+import docopt
+
+import teuthology.dispatcher
+
+
+def main():
+    args = docopt.docopt(__doc__)
+    teuthology.dispatcher.main(args)
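
As an aside (not teuthology code), a small sketch of the dictionary that
docopt builds from this docstring and hands to teuthology.dispatcher.main();
the paths and tube name here are made up:

    import docopt

    import scripts.dispatcher

    argv = ['--archive-dir', '/home/teuthworker/archive',
            '--log-dir', '/home/teuthworker/archive/worker_logs',
            '--tube', 'smithi']
    args = docopt.docopt(scripts.dispatcher.__doc__, argv=argv)
    # args is a plain dict keyed by option name, roughly:
    # {'--archive-dir': '/home/teuthworker/archive', '--bin-path': None,
    #  '--config-fd': None, '--help': False, '--supervisor': False,
    #  '--log-dir': '/home/teuthworker/archive/worker_logs',
    #  '--tube': 'smithi', '--verbose': False}
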
diff --git a/setup.py b/setup.py
index 119732e869d9e08d940907ff0eb448a4759fd680..7c64ba2b62208c4d86823b8d59fd6b204a9f62fd 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -131,6 +131,7 @@ setup(
             'teuthology-describe = scripts.describe:main',
             'teuthology-reimage = scripts.reimage:main',
             'teuthology-wait = scripts.wait:main',
+            'teuthology-dispatcher = scripts.dispatcher:main',
             ],
         },
 
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
new file mode 100644 (file)
index 0000000..0e5dfd0
--- /dev/null
+++ b/teuthology/dispatcher/__init__.py
@@ -0,0 +1,159 @@
+import logging
+import os
+import subprocess
+import sys
+import yaml
+import tempfile
+
+from datetime import datetime
+
+from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
+from teuthology.config import config as teuth_config
+from teuthology.repo_utils import fetch_qa_suite, fetch_teuthology
+from teuthology.task.internal.lock_machines import lock_machines_helper
+from teuthology.dispatcher import supervisor
+from teuthology.worker import prep_job
+
+log = logging.getLogger(__name__)
+start_time = datetime.utcnow()
+restart_file_path = '/tmp/teuthology-restart-dispatcher'
+stop_file_path = '/tmp/teuthology-stop-dispatcher'
+
+
+def sentinel(path):
+    if not os.path.exists(path):
+        return False
+    file_mtime = datetime.utcfromtimestamp(os.path.getmtime(path))
+    if file_mtime > start_time:
+        return True
+    else:
+        return False
+
+
+def restart():
+    log.info('Restarting...')
+    args = sys.argv[:]
+    args.insert(0, sys.executable)
+    os.execv(sys.executable, args)
+
+
+def stop():
+    log.info('Stopping...')
+    sys.exit(0)
+
+
+def load_config(archive_dir=None):
+    teuth_config.load()
+    if archive_dir is not None:
+        if not os.path.isdir(archive_dir):
+            sys.exit("{prog}: archive directory must exist: {path}".format(
+                prog=os.path.basename(sys.argv[0]),
+                path=archive_dir,
+            ))
+        else:
+            teuth_config.archive_base = archive_dir
+
+
+def main(args):
+    # run dispatcher in job supervisor mode if --supervisor passed
+    if args["--supervisor"]:
+        return supervisor.main(args)
+
+    verbose = args["--verbose"]
+    tube = args["--tube"]
+    log_dir = args["--log-dir"]
+    archive_dir = args["--archive-dir"]
+
+    # set up logging for the dispatcher in {log_dir}
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+    log_file_path = os.path.join(log_dir, 'dispatcher.{tube}.{pid}'.format(
+        pid=os.getpid(), tube=tube))
+    setup_log_file(log_file_path)
+    install_except_hook()
+
+    load_config(archive_dir=archive_dir)
+
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, tube)
+    result_proc = None
+
+    if teuth_config.teuthology_path is None:
+        fetch_teuthology('master')
+    fetch_qa_suite('master')
+
+    keep_running = True
+    while keep_running:
+        # Check to see if we have a teuthology-results process hanging around
+        # and if so, read its return code so that it can exit.
+        if result_proc is not None and result_proc.poll() is not None:
+            log.debug("teuthology-results exited with code: %s",
+                      result_proc.returncode)
+            result_proc = None
+
+        if sentinel(restart_file_path):
+            restart()
+        elif sentinel(stop_file_path):
+            stop()
+
+        load_config()
+
+        job = connection.reserve(timeout=60)
+        if job is None:
+            continue
+
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_id = job.jid
+        log.info('Reserved job %d', job_id)
+        log.info('Config is: %s', job.body)
+        job_config = yaml.safe_load(job.body)
+        job_config['job_id'] = str(job_id)
+
+        if job_config.get('stop_worker'):
+            keep_running = False
+
+        job_config, teuth_bin_path = prep_job(
+            job_config,
+            log_file_path,
+            archive_dir,
+        )
+
+        # lock machines but do not reimage them
+        if 'roles' in job_config:
+            job_config = lock_machines(job_config)
+
+        run_args = [
+            os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
+            '--supervisor',
+            '-v',
+            '--bin-path', teuth_bin_path,
+            '--archive-dir', archive_dir,
+        ]
+
+        with tempfile.NamedTemporaryFile(prefix='teuthology-dispatcher.',
+                                         suffix='.tmp', mode='w+t') as tmp:
+            yaml.safe_dump(data=job_config, stream=tmp)
+            tmp.flush()
+            run_args.extend(["--config-fd", str(tmp.fileno())])
+            job_proc = subprocess.Popen(run_args, pass_fds=[tmp.fileno()])
+
+        log.info('Job subprocess PID: %s', job_proc.pid)
+
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
+
+def lock_machines(job_config):
+    fake_ctx = supervisor.create_fake_context(job_config, block=True)
+    lock_machines_helper(fake_ctx, [len(job_config['roles']),
+                         job_config['machine_type']], reimage=False)
+    job_config = fake_ctx.config
+    return job_config
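
The dispatcher hands the job config to its supervisor child over an
inherited file descriptor rather than a file path. A stripped-down,
standalone sketch of that handoff (not teuthology code; the payload is
made up):

    import subprocess
    import sys
    import tempfile

    import yaml

    job_config = {'job_id': '1', 'name': 'example-run'}  # hypothetical payload

    with tempfile.NamedTemporaryFile(prefix='demo.', suffix='.tmp',
                                     mode='w+t') as tmp:
        yaml.safe_dump(data=job_config, stream=tmp)
        tmp.flush()
        # pass_fds keeps tmp's descriptor open in the child; the child
        # re-opens it by number and rewinds, as supervisor.main() does
        # with the value of --config-fd.
        child = subprocess.Popen(
            [sys.executable, '-c',
             'import sys, yaml\n'
             'with open(int(sys.argv[1])) as f:\n'
             '    f.seek(0)\n'
             '    print(yaml.safe_load(f))',
             str(tmp.fileno())],
            pass_fds=[tmp.fileno()])
        child.wait()
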
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
new file mode 100644 (file)
index 0000000..b16314c
--- /dev/null
+++ b/teuthology/dispatcher/supervisor.py
@@ -0,0 +1,202 @@
+import logging
+import os
+import subprocess
+import tempfile
+import time
+import yaml
+
+from datetime import datetime
+
+from teuthology import report
+from teuthology import safepath
+from teuthology.config import config as teuth_config
+from teuthology.exceptions import SkipJob
+from teuthology import setup_log_file, install_except_hook
+from teuthology.lock.ops import reimage_many
+from teuthology.misc import get_user
+from teuthology.config import FakeNamespace
+from teuthology.worker import run_with_watchdog, symlink_worker_log
+
+log = logging.getLogger(__name__)
+start_time = datetime.utcnow()
+restart_file_path = '/tmp/teuthology-restart-workers'
+stop_file_path = '/tmp/teuthology-stop-workers'
+
+
+def main(args):
+
+    verbose = args["--verbose"]
+    archive_dir = args["--archive-dir"]
+    teuth_bin_path = args["--bin-path"]
+    config_fd = int(args["--config-fd"])
+
+    with open(config_fd, 'r') as config_file:
+        config_file.seek(0)
+        job_config = yaml.safe_load(config_file)
+
+    loglevel = logging.INFO
+    if verbose:
+        loglevel = logging.DEBUG
+    log.setLevel(loglevel)
+
+    suite_dir = os.path.join(archive_dir, job_config['name'])
+    if (not os.path.exists(suite_dir)):
+        os.mkdir(suite_dir)
+    log_file_path = os.path.join(suite_dir, 'worker.{job_id}'.format(
+                                 job_id=job_config['job_id']))
+    setup_log_file(log_file_path)
+
+    install_except_hook()
+
+    # reimage target machines before running the job
+    if 'targets' in job_config:
+        reimage_machines(job_config)
+
+    try:
+        run_job(
+            job_config,
+            teuth_bin_path,
+            archive_dir,
+            verbose
+        )
+    except SkipJob:
+        return
+
+
+def run_job(job_config, teuth_bin_path, archive_dir, verbose):
+    safe_archive = safepath.munge(job_config['name'])
+    if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
+        suite_archive_dir = os.path.join(archive_dir, safe_archive)
+        safepath.makedirs('/', suite_archive_dir)
+        args = [
+            os.path.join(teuth_bin_path, 'teuthology-results'),
+            '--archive-dir', suite_archive_dir,
+            '--name', job_config['name'],
+        ]
+        if job_config.get('first_in_suite'):
+            log.info('Generating memo for %s', job_config['name'])
+            if job_config.get('seed'):
+                args.extend(['--seed', job_config['seed']])
+            if job_config.get('subset'):
+                args.extend(['--subset', job_config['subset']])
+        else:
+            log.info('Generating results for %s', job_config['name'])
+            timeout = job_config.get('results_timeout',
+                                     teuth_config.results_timeout)
+            args.extend(['--timeout', str(timeout)])
+            if job_config.get('email'):
+                args.extend(['--email', job_config['email']])
+        # Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
+        # make sure that it will continue to run if this worker process
+        # dies (e.g. because of a restart)
+        result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
+        log.info("teuthology-results PID: %s", result_proc.pid)
+        return
+
+    log.info('Creating archive dir %s', job_config['archive_path'])
+    safepath.makedirs('/', job_config['archive_path'])
+    log.info('Running job %s', job_config['job_id'])
+
+    suite_path = job_config['suite_path']
+    arg = [
+        os.path.join(teuth_bin_path, 'teuthology'),
+    ]
+    # The following is for compatibility with older schedulers, from before we
+    # started merging the contents of job_config['config'] into job_config
+    # itself.
+    if 'config' in job_config:
+        inner_config = job_config.pop('config')
+        if not isinstance(inner_config, dict):
+            log.warn("run_job: job_config['config'] isn't a dict, it's a %s",
+                     str(type(inner_config)))
+        else:
+            job_config.update(inner_config)
+
+    if verbose or job_config['verbose']:
+        arg.append('-v')
+
+    arg.extend([
+        '--owner', job_config['owner'],
+        '--archive', job_config['archive_path'],
+        '--name', job_config['name'],
+    ])
+    if job_config['description'] is not None:
+        arg.extend(['--description', job_config['description']])
+    arg.append('--')
+
+    with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
+                                     suffix='.tmp', mode='w+t') as tmp:
+        yaml.safe_dump(data=job_config, stream=tmp)
+        tmp.flush()
+        arg.append(tmp.name)
+        env = os.environ.copy()
+        python_path = env.get('PYTHONPATH', '')
+        python_path = ':'.join([suite_path, python_path]).strip(':')
+        env['PYTHONPATH'] = python_path
+        log.debug("Running: %s" % ' '.join(arg))
+        p = subprocess.Popen(args=arg, env=env)
+        log.info("Job archive: %s", job_config['archive_path'])
+        log.info("Job PID: %s", str(p.pid))
+
+        if teuth_config.results_server:
+            log.info("Running with watchdog")
+            try:
+                run_with_watchdog(p, job_config)
+            except Exception:
+                log.exception("run_with_watchdog had an unhandled exception")
+                raise
+        else:
+            log.info("Running without watchdog")
+            # This sleep() is to give the child time to start up and create the
+            # archive dir.
+            time.sleep(5)
+            symlink_worker_log(job_config['worker_log'],
+                               job_config['archive_path'])
+            p.wait()
+
+        if p.returncode != 0:
+            log.error('Child exited with code %d', p.returncode)
+        else:
+            log.info('Success!')
+
+
+def reimage_machines(job_config):
+    # Reimage the targets specified in job config
+    # and update their keys in config after reimaging
+    ctx = create_fake_context(job_config)
+    # change the status during the reimaging process
+    report.try_push_job_info(ctx.config, dict(status='waiting'))
+    targets = job_config['targets']
+    reimaged = reimage_many(ctx, targets, job_config['machine_type'])
+    ctx.config['targets'] = reimaged
+    # push the job status again once the reimaging process completes
+    report.try_push_job_info(ctx.config, dict(status='waiting'))
+
+
+def create_fake_context(job_config, block=False):
+    if job_config['owner'] is None:
+        job_config['owner'] = get_user()
+
+    if 'os_version' in job_config:
+        os_version = job_config['os_version']
+    else:
+        os_version = None
+
+    ctx_args = {
+        'config': job_config,
+        'block': block,
+        'owner': job_config['owner'],
+        'archive': job_config['archive_path'],
+        'machine_type': job_config['machine_type'],
+        'os_type': job_config['os_type'],
+        'os_version': os_version,
+    }
+
+    fake_ctx = FakeNamespace(ctx_args)
+    return fake_ctx
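
For reference, a hedged sketch (with made-up job values) of the context
that create_fake_context() produces and that reimage_machines() and the
dispatcher's lock_machines() consume:

    from teuthology.dispatcher.supervisor import create_fake_context

    # Hypothetical subset of a job config; the real dict is the beanstalk
    # job body after prep_job().
    job_config = {
        'owner': None,          # create_fake_context() falls back to get_user()
        'archive_path': '/archive/example-run/1',
        'machine_type': 'smithi',
        'os_type': 'ubuntu',
        'os_version': '18.04',
    }
    ctx = create_fake_context(job_config, block=True)
    print(ctx.owner, ctx.config['machine_type'])
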
diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py
index 247ccfe667e206deb990d9400bfcbfc947c3a9f0..2d538758dfaced955178fe0789d6cbf342c36ef0 100644 (file)
--- a/teuthology/lock/ops.py
+++ b/teuthology/lock/ops.py
@@ -52,7 +52,7 @@ def lock_many_openstack(ctx, num, machine_type, user=None, description=None,
 
 
 def lock_many(ctx, num, machine_type, user=None, description=None,
-              os_type=None, os_version=None, arch=None):
+              os_type=None, os_version=None, arch=None, reimage=True):
     if user is None:
         user = misc.get_user()
 
@@ -128,24 +128,8 @@ def lock_many(ctx, num, machine_type, user=None, description=None,
                     ok_machs = do_update_keys(list(ok_machs.keys()))[1]
                 update_nodes(ok_machs)
                 return ok_machs
-            elif machine_type in reimage_types:
-                reimaged = dict()
-                console_log_conf = dict(
-                    logfile_name='{shortname}_reimage.log',
-                    remotes=[teuthology.orchestra.remote.Remote(machine)
-                             for machine in machines],
-                )
-                with console_log.task(
-                        ctx, console_log_conf):
-                    update_nodes(reimaged, True)
-                    with teuthology.parallel.parallel() as p:
-                        for machine in machines:
-                            p.spawn(teuthology.provision.reimage, ctx,
-                                    machine, machine_type)
-                            reimaged[machine] = machines[machine]
-                reimaged = do_update_keys(list(reimaged.keys()))[1]
-                update_nodes(reimaged)
-                return reimaged
+            elif reimage and machine_type in reimage_types:
+                return reimage_many(ctx, machines, machine_type)
             return machines
         elif response.status_code == 503:
             log.error('Insufficient nodes available to lock %d %s nodes.',
@@ -297,3 +281,31 @@ def push_new_keys(keys_dict, reference):
                 log.error('failed to update %s!', hostname)
                 ret = 1
     return ret
+
+
+def reimage(ctx, machines, machine_type):
+    reimaged = dict()
+    with teuthology.parallel.parallel() as p:
+        for machine in machines:
+            log.info("Start node '%s' reimaging", machine)
+            update_nodes([machine], True)
+            p.spawn(teuthology.provision.reimage, ctx,
+                    machine, machine_type)
+            reimaged[machine] = machines[machine]
+            log.info("Node '%s' reimaging is complete", machine)
+    return reimaged
+
+
+def reimage_many(ctx, machines, machine_type):
+    # Setup log file, reimage machines and update their keys
+    reimaged = dict()
+    console_log_conf = dict(
+        logfile_name='{shortname}_reimage.log',
+        remotes=[teuthology.orchestra.remote.Remote(machine)
+                 for machine in machines],
+    )
+    with console_log.task(ctx, console_log_conf):
+        reimaged = reimage(ctx, machines, machine_type)
+    reimaged = do_update_keys(list(reimaged.keys()))[1]
+    update_nodes(reimaged)
+    return reimaged
diff --git a/teuthology/task/internal/lock_machines.py b/teuthology/task/internal/lock_machines.py
index bfab80f79ef7fa6a5310505f76dcbfe233f78d57..5d61f017c45bbf9c790eacf9f9308609f2423578 100644 (file)
--- a/teuthology/task/internal/lock_machines.py
+++ b/teuthology/task/internal/lock_machines.py
@@ -23,6 +23,14 @@ def lock_machines(ctx, config):
     new machines.  This is not called if the one has teuthology-locked
     machines and placed those keys in the Targets section of a yaml file.
     """
+    lock_machines_helper(ctx, config)
+    try:
+        yield
+    finally:
+        unlock_machines(ctx)
+
+
+def lock_machines_helper(ctx, config, reimage=True):
     # It's OK for os_type and os_version to be None here.  If we're trying
     # to lock a bare metal machine, we'll take whatever is available.  If
     # we want a vps, defaults will be provided by misc.get_distro and
@@ -75,10 +83,11 @@ def lock_machines(ctx, config):
         try:
             newly_locked = teuthology.lock.ops.lock_many(ctx, requested, machine_type,
                                                          ctx.owner, ctx.archive, os_type,
-                                                         os_version, arch)
+                                                         os_version, arch, reimage=reimage)
         except Exception:
             # Lock failures should map to the 'dead' status instead of 'fail'
-            set_status(ctx.summary, 'dead')
+            if 'summary' in ctx:
+                set_status(ctx.summary, 'dead')
             raise
         all_locked.update(newly_locked)
         log.info(
@@ -144,16 +153,16 @@ def lock_machines(ctx, config):
         )
         log.warn('Could not lock enough machines, waiting...')
         time.sleep(10)
-    try:
-        yield
-    finally:
-        # If both unlock_on_failure and nuke-on-error are set, don't unlock now
-        # because we're just going to nuke (and unlock) later.
-        unlock_on_failure = (
+
+
+def unlock_machines(ctx):
+    # If both unlock_on_failure and nuke-on-error are set, don't unlock now
+    # because we're just going to nuke (and unlock) later.
+    unlock_on_failure = (
             ctx.config.get('unlock_on_failure', False)
             and not ctx.config.get('nuke-on-error', False)
         )
-        if get_status(ctx.summary) == 'pass' or unlock_on_failure:
-            log.info('Unlocking machines...')
-            for machine in ctx.config['targets'].keys():
-                teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
+    if get_status(ctx.summary) == 'pass' or unlock_on_failure:
+        log.info('Unlocking machines...')
+        for machine in ctx.config['targets'].keys():
+            teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)