From 85c24bda7f67768dcbd6a75a7ac92ea38b88e1e1 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Fri, 8 Jul 2011 11:37:20 -0700 Subject: [PATCH] Add teuthology-schedule and teuthology-worker. schedule puts jobs in a beanstalk queue, worker takes them out and runs them. --- requirements.txt | 1 + setup.py | 3 ++ teuthology/queue.py | 111 ++++++++++++++++++++++++++++++++++++++++++++ teuthology/run.py | 52 +++++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 teuthology/queue.py diff --git a/requirements.txt b/requirements.txt index bd811828a5f82..0b442b2cff359 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ PyYAML bunch >=1.0.0 argparse >=1.2.1 httplib2 +beanstalkc >=0.2.0 diff --git a/setup.py b/setup.py index 51fd63e50fc8f..3082de82c0d79 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ setup( 'bunch >=1.0.0', 'argparse >=1.2.1', 'httplib2', + 'beanstalkc >=0.2.0', ], entry_points={ @@ -27,7 +28,9 @@ setup( 'teuthology-nuke = teuthology.run:nuke', 'teuthology-suite = teuthology.suite:main', 'teuthology-ls = teuthology.suite:ls', + 'teuthology-worker = teuthology.queue:worker', 'teuthology-lock = teuthology.lock:main', + 'teuthology-schedule = teuthology.run:schedule', ], }, diff --git a/teuthology/queue.py b/teuthology/queue.py new file mode 100644 index 0000000000000..9793bac7e7662 --- /dev/null +++ b/teuthology/queue.py @@ -0,0 +1,111 @@ +import argparse +import logging +import os +import subprocess +import sys +import tempfile +import yaml + +import beanstalkc + +from teuthology import safepath + +log = logging.getLogger(__name__) + +def connect(ctx): + host = ctx.teuthology_config['queue_host'] + port = ctx.teuthology_config['queue_port'] + return beanstalkc.Connection(host=host, port=port) + +def worker(): + parser = argparse.ArgumentParser(description=""" +Grab jobs from a beanstalk queue and run the teuthology tests they +describe. One job is run at a time. +""") + parser.add_argument( + '-v', '--verbose', + action='store_true', default=None, + help='be more verbose', + ) + parser.add_argument( + '--archive-dir', + metavar='DIR', + help='path under which to archive results', + required=True, + ) + + ctx = parser.parse_args() + + loglevel = logging.INFO + if ctx.verbose: + loglevel = logging.DEBUG + + logging.basicConfig( + level=loglevel, + ) + + if not os.path.isdir(ctx.archive_dir): + sys.exit("{prog}: archive directory must exist: {path}".format( + prog=os.path.basename(sys.argv[0]), + path=ctx.archive_dir, + )) + + from teuthology.misc import read_config + read_config(ctx) + + beanstalk = connect(ctx) + beanstalk.watch('teuthology') + beanstalk.ignore('default') + + while True: + job = beanstalk.reserve(timeout=60) + if job is None: + log.debug('no jobs to run in 60 seconds') + continue + + # bury the job so it won't be re-run if it fails + job.bury() + run_job(job, ctx.archive_dir) + +def run_job(job, archive_dir): + log.info('Running job %d', job.jid) + log.debug('Config is: %s', job.body) + job_config = yaml.safe_load(job.body) + + safe_archive = safepath.munge(job_config['name']) + safepath.makedirs(archive_dir, safe_archive) + archive_path = os.path.join(archive_dir, safe_archive, str(job.jid)) + + arg = [ + os.path.join(os.path.dirname(sys.argv[0]), 'teuthology'), + ] + + if job_config['verbose']: + arg.append('-v') + + arg.extend([ + '--lock', + '--block', + '--owner', job_config['owner'], + '--archive', archive_path, + ]) + if job_config['description'] is not None: + arg.extend(['--description', job_config['description']]) + arg.append('--') + + tmp_fp, tmp_path = tempfile.mkstemp() + try: + os.write(tmp_fp, yaml.safe_dump(job_config['config'])) + arg.append(tmp_path) + subprocess.check_call( + args=arg, + close_fds=True, + ) + except subprocess.CalledProcessError as e: + log.exception(e) + else: + log.info('Success!') + job.delete() + finally: + os.close(tmp_fp) + os.unlink(tmp_path) diff --git a/teuthology/run.py b/teuthology/run.py index f4dcad22d3e44..21a17cf113380 100644 --- a/teuthology/run.py +++ b/teuthology/run.py @@ -196,3 +196,55 @@ def nuke(): ]) log.info('Done.') + +def schedule(): + parser = argparse.ArgumentParser(description='Schedule ceph integration tests') + parser.add_argument( + 'config', + metavar='CONFFILE', + nargs='+', + type=config_file, + action=MergeConfig, + default={}, + help='config file to read', + ) + parser.add_argument( + '--name', + required=True, + help='job name', + ) + parser.add_argument( + '--description', + help='job description', + ) + parser.add_argument( + '--owner', + help='job owner', + ) + parser.add_argument( + '-v', '--verbose', + action='store_true', + default=False, + help='be more verbose', + ) + + ctx = parser.parse_args() + + from teuthology.misc import read_config, get_user + if ctx.owner is None: + ctx.owner = get_user() + read_config(ctx) + + import teuthology.queue + beanstalk = teuthology.queue.connect(ctx) + + beanstalk.use('teuthology') + job = yaml.safe_dump(dict( + config=ctx.config, + name=ctx.name, + description=ctx.description, + owner=ctx.owner, + verbose=ctx.verbose, + )) + jid = beanstalk.put(job, ttr=60*60*24) + print 'Job scheduled with ID {jid}'.format(jid=jid) -- 2.39.5