From a2d0abd562a8c99d5cfb005750f45cfb010d079d Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 4 Oct 2021 19:24:41 +0530 Subject: [PATCH] Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles With the use of the --queue-backend argument the user can specify which backend(paddles/beanstalk) they would like to use for maintaining the teuthology Jobs queue. In order to avoid overlapping Job IDs, when a job is being scheduled in beanstalk it is also written to paddles which returns a unique ID. This is the ID teuthology will treat as the Job ID throughout the run of the job. To differentiate between the 2 queue backends, the teuthology-queue command has been split into teuthology-paddles-queue command and teuthology-beanstalk-queue command. Signed-off-by: Aishwarya Mathuria --- scripts/beanstalk_queue.py | 35 ++++ scripts/dispatcher.py | 5 +- scripts/{queue.py => paddles_queue.py} | 18 +-- teuthology/beanstalk.py | 214 +++++++++++++++++++++++++ teuthology/config.py | 1 + teuthology/dispatcher/__init__.py | 55 +++++-- teuthology/dispatcher/supervisor.py | 6 + teuthology/kill.py | 42 ++++- teuthology/orchestra/run.py | 1 - teuthology/report.py | 67 ++++---- teuthology/schedule.py | 55 +++++-- teuthology/test/test_dispatcher.py | 3 +- teuthology/test/test_worker.py | 129 +++++++++------ teuthology/worker.py | 30 ++-- 14 files changed, 528 insertions(+), 133 deletions(-) create mode 100644 scripts/beanstalk_queue.py rename scripts/{queue.py => paddles_queue.py} (69%) create mode 100644 teuthology/beanstalk.py diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py new file mode 100644 index 0000000000..88a8242847 --- /dev/null +++ b/scripts/beanstalk_queue.py @@ -0,0 +1,35 @@ +import docopt + +import teuthology.config +import teuthology.beanstalk + +doc = """ +usage: teuthology-beanstalk-queue -h + teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-beanstalk-queue [-r] -m MACHINE_TYPE + teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN + teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE] +List Jobs in queue. +If -D is passed, then jobs with PATTERN in the job name are deleted from the +queue. +Arguments: + -m, --machine_type MACHINE_TYPE [default: multi] + Which machine type queue to work on. +optional arguments: + -h, --help Show this help message and exit + -D, --delete PATTERN Delete Jobs with PATTERN in their name + -d, --description Show job descriptions + -r, --runs Only show run names + -f, --full Print the entire job config. Use with caution. + -s, --status Prints the status of the queue + -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 + will unpause. If -m is passed, pause that queue, + otherwise pause all queues. +""" + + +def main(): + + args = docopt.docopt(doc) + print(args) + teuthology.beanstalk.main(args) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 8a67587ed8..5e64b382d8 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -1,9 +1,9 @@ """ usage: teuthology-dispatcher --help teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR - teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE + teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND -Start a dispatcher for the specified machine type. Grab jobs from a paddles +Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is a teuthology-dispatcher command run in supervisor mode. @@ -22,6 +22,7 @@ standard arguments: --bin-path BIN_PATH teuthology bin path --job-config CONFIG file descriptor of job's config file --exit-on-empty-queue if the queue is empty, exit + --queue-backend BACKEND choose between paddles and beanstalk """ import docopt diff --git a/scripts/queue.py b/scripts/paddles_queue.py similarity index 69% rename from scripts/queue.py rename to scripts/paddles_queue.py index a07598a92f..3c69d772e6 100644 --- a/scripts/queue.py +++ b/scripts/paddles_queue.py @@ -4,14 +4,14 @@ import teuthology.config import teuthology.paddles_queue doc = """ -usage: teuthology-queue -h - teuthology-queue -s -m MACHINE_TYPE - teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-queue [-r] -m MACHINE_TYPE -U USER - teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-queue -u -m MACHINE_TYPE -U USER +usage: teuthology-paddles-queue -h + teuthology-paddles-queue -s -m MACHINE_TYPE + teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] + teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER + teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -u -m MACHINE_TYPE -U USER List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -36,7 +36,7 @@ optional arguments: -P, --priority PRIORITY Change priority of queued jobs -U, --user USER User who owns the jobs - -R, --run_name RUN_NAME + -R, --run-name RUN_NAME Used to change priority of all jobs in the run. """ diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py new file mode 100644 index 0000000000..a1165becca --- /dev/null +++ b/teuthology/beanstalk.py @@ -0,0 +1,214 @@ +import beanstalkc +import yaml +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology.config import config +from teuthology import report + +log = logging.getLogger(__name__) + + +def connect(): + host = config.queue_host + port = config.queue_port + if host is None or port is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.teuthology_yaml)) + return beanstalkc.Connection(host=host, port=port) + + +def watch_tube(connection, tube_name): + """ + Watch a given tube, potentially correcting to 'multi' if necessary. Returns + the tube_name that was actually used. + """ + if ',' in tube_name: + log.debug("Correcting tube name to 'multi'") + tube_name = 'multi' + connection.watch(tube_name) + connection.ignore('default') + return tube_name + + +def walk_jobs(connection, tube_name, processor, pattern=None): + """ + def callback(jobs_dict) + """ + log.info("Checking Beanstalk Queue...") + job_count = connection.stats_tube(tube_name)['current-jobs-ready'] + if job_count == 0: + log.info('No jobs in Beanstalk Queue') + return + + # Try to figure out a sane timeout based on how many jobs are in the queue + timeout = job_count / 2000.0 * 60 + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = connection.reserve(timeout=timeout) + if job is None or job.body is None: + continue + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + if pattern is not None and pattern not in job_name: + continue + processor.add_job(job_id, job_config, job) + end_progress() + processor.complete() + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + job_obj = self.jobs[job_id].get('job_obj') + if job_obj: + job_obj.delete() + report.try_delete_jobs(job_name, job_id) + + +def pause_tube(connection, tube, duration): + duration = int(duration) + if not tube: + tubes = sorted(connection.tubes()) + else: + tubes = [tube] + + prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" + templ = prefix + ": {tubes}" + log.info(templ.format(dur=duration, tubes=tubes)) + for tube in tubes: + connection.pause_tube(tube, duration) + + +def stats_tube(connection, tube): + stats = connection.stats_tube(tube) + result = dict( + name=tube, + count=stats['current-jobs-ready'], + paused=(stats['pause'] != 0), + ) + return result + + +def main(args): + machine_type = args['--machine_type'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + try: + connection = connect() + if machine_type and not pause_duration: + # watch_tube needs to be run before we inspect individual jobs; + # it is not needed for pausing tubes + watch_tube(connection, machine_type) + if status: + print(stats_tube(connection, machine_type)) + elif pause_duration: + pause_tube(connection, machine_type, pause_duration) + elif delete: + walk_jobs(connection, machine_type, + JobDeleter(delete)) + elif runs: + walk_jobs(connection, machine_type, + RunPrinter()) + else: + walk_jobs(connection, machine_type, + JobPrinter(show_desc=show_desc, full=full)) + except KeyboardInterrupt: + log.info("Interrupted.") + finally: + connection.close() diff --git a/teuthology/config.py b/teuthology/config.py index 6da6cdd7f1..b42ff48d1a 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -143,6 +143,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, + 'backend': 'paddles', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 603ec1ce72..f80818cf14 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -8,6 +8,7 @@ from datetime import datetime from time import sleep from teuthology import setup_log_file, install_except_hook +from teuthology import beanstalk from teuthology import report from teuthology.config import config as teuth_config from teuthology.exceptions import SkipJob @@ -57,6 +58,8 @@ def load_config(archive_dir=None): def clean_config(config): result = {} for key in config: + if key == 'status': + continue if config[key] is not None: result[key] = config[key] return result @@ -70,7 +73,11 @@ def main(args): machine_type = args["--machine-type"] log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] +<<<<<<< HEAD exit_on_empty_queue = args["--exit-on-empty-queue"] +======= + backend = args['--queue-backend'] +>>>>>>> 79c4d9bf... Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles if archive_dir is None: archive_dir = teuth_config.archive_base @@ -88,6 +95,10 @@ def main(args): load_config(archive_dir=archive_dir) + if backend == 'beanstalk': + connection = beanstalk.connect() + beanstalk.watch_tube(connection, machine_type) + result_proc = None if teuth_config.teuthology_path is None: @@ -111,19 +122,26 @@ def main(args): load_config() job_procs = set(filter(lambda p: p.poll() is None, job_procs)) - job = report.get_queued_job(machine_type) - if job is None: - if exit_on_empty_queue and not job_procs: - log.info("Queue is empty and no supervisor processes running; exiting!") - break - continue - job = clean_config(job) - report.try_push_job_info(job, dict(status='running')) - job_id = job.get('job_id') - log.info('Reserved job %s', job_id) - log.info('Config is: %s', job) - job_config = job - + if backend == 'beanstalk': + job = connection.reserve(timeout=60) + if job is None: + continue + job.bury() + job_config = yaml.safe_load(job.body) + job_id = job_config.get('job_id') + log.info('Reserved job %s', job_id) + log.info('Config is: %s', job.body) + else: + job = report.get_queued_job(machine_type) + if job is None: + continue + job = clean_config(job) + report.try_push_job_info(job, dict(status='running')) + job_id = job.get('job_id') + log.info('Reserved job %s', job_id) + log.info('Config is: %s', job) + job_config = job + if job_config.get('stop_worker'): keep_running = False @@ -174,6 +192,13 @@ def main(args): status='fail', failure_reason=error_message)) + # This try/except block is to keep the worker from dying when + # beanstalkc throws a SocketError + if backend == 'beanstalk': + try: + job.delete() + except Exception: + log.exception("Saw exception while trying to delete job") returncodes = set([0]) for proc in job_procs: @@ -201,7 +226,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir): def pause_queue(machine_type, paused, paused_by, pause_duration=None): - if paused == True: + if paused: report.pause_queue(machine_type, paused, paused_by, pause_duration) ''' If there is a pause duration specified @@ -211,5 +236,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None): sleep(int(pause_duration)) paused = False report.pause_queue(machine_type, paused, paused_by) - elif paused == False: + elif not paused: report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 459669861b..edb273a199 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -67,6 +67,12 @@ def main(args): def run_job(job_config, teuth_bin_path, archive_dir, verbose): safe_archive = safepath.munge(job_config['name']) if job_config.get('first_in_suite') or job_config.get('last_in_suite'): + if teuth_config.results_server: + try: + report.try_delete_jobs(job_config['name'], job_config['job_id']) + except Exception as e: + log.warning("Unable to delete job %s, exception occurred: %s", + job_config['job_id'], e) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), diff --git a/teuthology/kill.py b/teuthology/kill.py index 63a63ab4ae..7ebf560166 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -8,8 +8,9 @@ import tempfile import logging import getpass - +from teuthology import beanstalk from teuthology import report +from teuthology.config import config from teuthology import misc log = logging.getLogger(__name__) @@ -64,6 +65,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None, "you must also pass --machine-type") if not preserve_queue: + remove_beanstalk_jobs(run_name, machine_type) remove_paddles_jobs(run_name) kill_processes(run_name, run_info.get('pids')) if owner is not None: @@ -101,10 +103,13 @@ def find_run_info(serializer, run_name): job_info = {} job_num = 0 jobs = serializer.jobs_for_run(run_name) + job_total = len(jobs) for (job_id, job_dir) in jobs.items(): if not os.path.isdir(job_dir): continue job_num += 1 + if config.backend == 'beanstalk': + beanstalk.print_progress(job_num, job_total, 'Reading Job: ') job_info = serializer.job_info(run_name, job_id, simple=True) for key in job_info.keys(): if key in run_info_fields and key not in run_info: @@ -123,6 +128,41 @@ def remove_paddles_jobs(run_name): report.try_delete_jobs(run_name, job_ids) +def remove_beanstalk_jobs(run_name, tube_name): + qhost = config.queue_host + qport = config.queue_port + if qhost is None or qport is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.yaml_path)) + log.info("Checking Beanstalk Queue...") + beanstalk_conn = beanstalk.connect() + real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name) + + curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready'] + if curjobs != 0: + x = 1 + while x != curjobs: + x += 1 + job = beanstalk_conn.reserve(timeout=20) + if job is None: + continue + job_config = yaml.safe_load(job.body) + if run_name == job_config['name']: + job_id = job_config['job_id'] + msg = "Deleting job from queue. ID: " + \ + "{id} Name: {name} Desc: {desc}".format( + id=str(job_id), + name=job_config['name'], + desc=job_config['description'], + ) + log.info(msg) + job.delete() + else: + print("No jobs in Beanstalk Queue") + beanstalk_conn.close() + + def kill_processes(run_name, pids=None): if pids: to_kill = set(pids).intersection(psutil.pids()) diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py index f31dfd0d7f..6235b0d36e 100644 --- a/teuthology/orchestra/run.py +++ b/teuthology/orchestra/run.py @@ -182,7 +182,6 @@ class RemoteProcess(object): command=self.command, exitstatus=self.returncode, node=self.hostname, label=self.label ) - def _get_exitstatus(self): """ :returns: the remote command's exit status (return code). Note that diff --git a/teuthology/report.py b/teuthology/report.py index 5d8cc9c3c8..50b7f8966b 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -385,13 +385,13 @@ class ResultsReporter(object): if os.path.exists(self.last_run_file): os.remove(self.last_run_file) - def get_top_job(self, machine_type): + def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri, - machine_type=machine_type) + uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + queue=queue) inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed: + sleep=1, increment=inc, action=f'get job from {queue}') as proceed: while proceed(): response = self.session.get(uri) if response.status_code == 200: @@ -505,7 +505,7 @@ class ResultsReporter(object): response = self.session.delete(uri) response.raise_for_status() - def create_queue(self, machine_type): + def create_queue(self, queue): """ Create a queue on the results server @@ -514,19 +514,19 @@ class ResultsReporter(object): uri = "{base}/queue/".format( base=self.base_uri ) - queue_info = {'machine_type': machine_type} + queue_info = {'queue': queue} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'creating queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully created queue for {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully created queue {queue}".format( + queue=queue, )) return else: @@ -546,26 +546,26 @@ class ResultsReporter(object): response.raise_for_status() - def update_queue(self, machine_type, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'updating queue {queue}') as proceed: while proceed(): response = self.session.put(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully updated queue for {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully updated queue {queue}".format( + queue=queue, )) return else: @@ -580,23 +580,23 @@ class ResultsReporter(object): response.raise_for_status() - def queue_stats(self, machine_type): + def queue_stats(self, queue): uri = "{base}/queue/stats/".format( base=self.base_uri ) - queue_info = {'machine_type': machine_type} + queue_info = {'queue': queue} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully retrieved stats for queue {queue}".format( + queue=queue, )) return response.json() else: @@ -609,18 +609,18 @@ class ResultsReporter(object): )) response.raise_for_status() - def queued_jobs(self, machine_type, user, run_name): + def queued_jobs(self, queue, user, run_name): uri = "{base}/queue/queued_jobs/".format( base=self.base_uri ) + request_info = {'queue': queue} if run_name is not None: filter_field = run_name - request_info = {'machine_type': machine_type, - 'run_name': run_name} + uri += "?run_name=" + str(run_name) else: filter_field = user - request_info = {'machine_type': machine_type, - 'user': user} + uri += "?user=" + str(user) + request_json = json.dumps(request_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) @@ -645,11 +645,14 @@ class ResultsReporter(object): response.raise_for_status() -def create_machine_type_queue(machine_type): +def create_machine_type_queue(queue): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.create_queue(machine_type) + if ',' in queue: + queue = 'multi' + reporter.create_queue(queue) + return queue def get_user_jobs_queue(machine_type, user, run_name=None): @@ -709,12 +712,16 @@ def get_queued_job(machine_type): reporter = ResultsReporter() if not reporter.base_uri: return - if is_queue_paused(machine_type) == True: - log.info("Teuthology queue for machine type %s is currently paused", - machine_type) + if ',' in machine_type: + queue = 'multi' + else: + queue = machine_type + if is_queue_paused(queue) == True: + log.info("Teuthology queue %s is currently paused", + queue) return None else: - return reporter.get_top_job(machine_type) + return reporter.get_top_job(queue) def try_push_job_info(job_config, extra_info=None): diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 4ccf780845..81dd4d548f 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,6 +1,7 @@ import os import yaml +import teuthology.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -22,11 +23,6 @@ def main(args): if args[opt]: raise ValueError(msg_fmt.format(opt=opt)) - if args['--first-in-suite'] or args['--last-in-suite']: - report_status = False - else: - report_status = True - name = args['--name'] if not name or name.isdigit(): raise ValueError("Please use a more descriptive value for --name") @@ -34,13 +30,15 @@ def main(args): backend = args['--queue-backend'] if args['--dry-run']: print('---\n' + yaml.safe_dump(job_config)) - elif backend == 'paddles': - schedule_job(job_config, args['--num'], report_status) elif backend.startswith('@'): dump_job_to_file(backend.lstrip('@'), job_config, args['--num']) + elif backend == 'paddles': + paddles_schedule_job(job_config, args['--num']) + elif backend == 'beanstalk': + beanstalk_schedule_job(job_config, args['--num']) else: raise ValueError("Provided schedule backend '%s' is not supported. " - "Try 'paddles' or '@path-to-a-file" % backend) + "Try 'paddles', 'beanstalk' or '@path-to-a-file" % backend) def build_config(args): @@ -86,9 +84,9 @@ def build_config(args): return job_config -def schedule_job(job_config, num=1, report_status=True): +def paddles_schedule_job(job_config, backend, num=1): """ - Schedule a job. + Schedule a job with Paddles as the backend. :param job_config: The complete job dict :param num: The number of times to schedule the job @@ -97,16 +95,44 @@ def schedule_job(job_config, num=1, report_status=True): ''' Add 'machine_type' queue to DB here. ''' - report.create_machine_type_queue(job_config['machine_type']) + queue = report.create_machine_type_queue(job_config['machine_type']) + job_config['queue'] = queue while num > 0: - job_id = report.try_create_job(job_config, dict(status='queued')) - print('Job scheduled with name {name} and ID {job_id}'.format( + print('Job scheduled in Paddles with name {name} and ID {job_id}'.format( name=job_config['name'], job_id=job_id)) job_config['job_id'] = str(job_id) - + + num -= 1 + + +def beanstalk_schedule_job(job_config, backend, num=1): + """ + Schedule a job with Beanstalk as the backend. + + :param job_config: The complete job dict + :param num: The number of times to schedule the job + """ + num = int(num) + tube = job_config.pop('tube') + beanstalk = teuthology.beanstalk.connect() + beanstalk.use(tube) + queue = report.create_machine_type_queue(job_config['machine_type']) + job_config['queue'] = queue + while num > 0: + job_id = report.try_create_job(job_config, dict(status='queued')) + job_config['job_id'] = str(job_id) + job = yaml.safe_dump(job_config) + _ = beanstalk.put( + job, + ttr=60 * 60 * 24, + priority=job_config['priority'], + ) + print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format( + name=job_config['name'], job_id=job_id)) num -= 1 + def dump_job_to_file(path, job_config, num=1): """ Schedule a job. @@ -134,4 +160,3 @@ def dump_job_to_file(path, job_config, num=1): num -= 1 with open(count_file_path, 'w') as f: f.write(str(jid)) - diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 6b0dddfe2f..9a6d0ff564 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -65,7 +65,8 @@ class TestDispatcher(unittest.TestCase): '--description': 'the_description', '--machine-type': 'test_queue', '--supervisor': False, - '--verbose': False + '--verbose': False, + '--queue-backend': 'paddles' } m = mock.MagicMock() diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py index c5ed88799f..6c5cf756d2 100644 --- a/teuthology/test/test_worker.py +++ b/teuthology/test/test_worker.py @@ -1,3 +1,4 @@ +import beanstalkc import os from unittest.mock import patch, Mock, MagicMock @@ -43,7 +44,8 @@ class TestWorker(object): @patch("os.symlink") def test_symlink_success(self, m_symlink): worker.symlink_worker_log("path/to/worker.log", "path/to/archive") - m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log") + m_symlink.assert_called_with( + "path/to/worker.log", "path/to/archive/worker.log") @patch("teuthology.worker.log") @patch("os.symlink") @@ -135,7 +137,8 @@ class TestWorker(object): m_popen.return_value = m_p m_t_config.results_server = False worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False) - m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"]) + m_symlink_log.assert_called_with( + config["worker_log"], config["archive_path"]) @patch("teuthology.worker.report.try_push_job_info") @patch("teuthology.worker.symlink_worker_log") @@ -151,7 +154,8 @@ class TestWorker(object): process = Mock() process.poll.return_value = "not None" worker.run_with_watchdog(process, config) - m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"]) + m_symlink_log.assert_called_with( + config["worker_log"], config["archive_path"]) m_try_push.assert_called_with( dict(name=config["name"], job_id=config["job_id"]), dict(status='dead') @@ -175,7 +179,8 @@ class TestWorker(object): m_proc.poll.return_value = "not None" m_popen.return_value = m_proc worker.run_with_watchdog(process, config) - m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"]) + m_symlink_log.assert_called_with( + config["worker_log"], config["archive_path"]) @patch("os.path.isdir") @patch("teuthology.worker.fetch_teuthology") @@ -211,63 +216,97 @@ class TestWorker(object): assert m_fetch_qa_suite.called_once_with_args(branch='main') assert got_config['suite_path'] == '/suite/path' - def build_fake_jobs(self, job_bodies): + def build_fake_jobs(self, m_connection, m_job, job_bodies): """ + Given patched copies of: + beanstalkc.Connection + beanstalkc.Job And a list of basic job bodies, return a list of mocked Job objects """ + # Make sure instantiating m_job returns a new object each time + m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job) jobs = [] job_id = 0 for job_body in job_bodies: job_id += 1 - job = {} - job['job_id'] = job_id - job['body'] = job_body + job = m_job(conn=m_connection, jid=job_id, body=job_body) + job.jid = job_id + job_body += '\njob_id: ' + str(job_id) + job.body = job_body jobs.append(job) return jobs - - @patch("teuthology.worker.setup_log_file") - @patch("os.path.isdir", return_value=True) - @patch("teuthology.worker.fetch_teuthology") - @patch("teuthology.worker.fetch_qa_suite") @patch("teuthology.worker.run_job") + @patch("teuthology.worker.prep_job") + @patch("beanstalkc.Job", autospec=True) + @patch("teuthology.worker.fetch_qa_suite") + @patch("teuthology.worker.fetch_teuthology") + @patch("teuthology.worker.beanstalk.watch_tube") + @patch("teuthology.worker.beanstalk.connect") + @patch("os.path.isdir", return_value=True) + @patch("teuthology.worker.setup_log_file") + def test_main_loop( + self, m_setup_log_file, m_isdir, m_connect, m_watch_tube, + m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job, + ): + m_connection = Mock() + jobs = self.build_fake_jobs( + m_connection, + m_job, + [ + 'foo: bar', + 'stop_worker: true', + ], + ) + m_connection.reserve.side_effect = jobs + m_connect.return_value = m_connection + m_prep_job.return_value = (dict(), '/bin/path') + worker.main(self.ctx) + # There should be one reserve call per item in the jobs list + expected_reserve_calls = [ + dict(timeout=60) for i in range(len(jobs)) + ] + got_reserve_calls = [ + call[1] for call in m_connection.reserve.call_args_list + ] + assert got_reserve_calls == expected_reserve_calls + for job in jobs: + job.bury.assert_called_once_with() + job.delete.assert_called_once_with() + @patch("teuthology.worker.report.try_push_job_info") - @patch("teuthology.worker.report.get_queued_job") - @patch("teuthology.worker.clean_config") + @patch("teuthology.worker.run_job") + @patch("beanstalkc.Job", autospec=True) + @patch("teuthology.worker.fetch_qa_suite") + @patch("teuthology.worker.fetch_teuthology") + @patch("teuthology.worker.beanstalk.watch_tube") + @patch("teuthology.worker.beanstalk.connect") + @patch("os.path.isdir", return_value=True) + @patch("teuthology.worker.setup_log_file") def test_main_loop_13925( - self, m_setup_log_file, m_isdir, - m_fetch_teuthology, m_fetch_qa_suite, m_run_job, - m_try_push_job_info, m_get_queued_job, m_clean_config - ): + self, m_setup_log_file, m_isdir, m_connect, m_watch_tube, + m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job, + m_try_push_job_info, + ): + m_connection = Mock() + jobs = self.build_fake_jobs( + m_connection, + m_job, + [ + 'name: name', + 'name: name\nstop_worker: true', + ], + ) + m_connection.reserve.side_effect = jobs + m_connect.return_value = m_connection m_fetch_qa_suite.side_effect = [ '/suite/path', MaxWhileTries(), MaxWhileTries(), ] - job = { - 'job_id': '1', - 'description': 'DESC', - 'email': 'EMAIL', - 'first_in_suite': False, - 'last_in_suite': True, - 'machine_type': 'test_queue', - 'name': 'NAME', - 'owner': 'OWNER', - 'priority': 99, - 'results_timeout': '6', - 'verbose': False, - 'stop_worker': True - } - m_get_queued_job.return_value = job - m_clean_config.return_value = job - - mock_prep_job_patcher = patch('teuthology.worker.prep_job') - mock_prep_job = mock_prep_job_patcher.start() - mock_prep_job.return_value = (dict(), '/teuth/bin/path') - worker.main(self.ctx) - mock_prep_job_patcher.stop() - assert len(m_run_job.call_args_list) == 1 - assert len(m_try_push_job_info.call_args_list) == 1 - assert m_try_push_job_info.called_once_with(job, dict(status='running')) - + assert len(m_run_job.call_args_list) == 0 + assert len(m_try_push_job_info.call_args_list) == len(jobs) + for i in range(len(jobs)): + push_call = m_try_push_job_info.call_args_list[i] + assert push_call[0][1]['status'] == 'dead' diff --git a/teuthology/worker.py b/teuthology/worker.py index 89a7304229..5cff69c880 100644 --- a/teuthology/worker.py +++ b/teuthology/worker.py @@ -9,6 +9,7 @@ import yaml from datetime import datetime from teuthology import setup_log_file, install_except_hook +from teuthology import beanstalk from teuthology import report from teuthology import safepath from teuthology.config import config as teuth_config @@ -57,14 +58,6 @@ def load_config(ctx=None): teuth_config.archive_base = ctx.archive_dir -def clean_config(config): - result = {} - for key in config: - if config[key] is not None: - result[key] = config[key] - return result - - def main(ctx): loglevel = logging.INFO if ctx.verbose: @@ -81,6 +74,8 @@ def main(ctx): set_config_attr(ctx) + connection = beanstalk.connect() + beanstalk.watch_tube(connection, ctx.tube) result_proc = None if teuth_config.teuthology_path is None: @@ -103,16 +98,16 @@ def main(ctx): load_config() - job = report.get_queued_job(ctx.machine_type) + job = connection.reserve(timeout=60) if job is None: continue - job = clean_config(job) - report.try_push_job_info(job, dict(status='running')) - job_id = job.get('job_id') + # bury the job so it won't be re-run if it fails + job.bury() + job_config = yaml.safe_load(job.body) + job_id = job_config.get('job_id') log.info('Reserved job %s', job_id) - log.info('Config is: %s', job) - job_config = job + log.info('Config is: %s', job.body) if job_config.get('stop_worker'): keep_running = False @@ -132,6 +127,13 @@ def main(ctx): except SkipJob: continue + # This try/except block is to keep the worker from dying when + # beanstalkc throws a SocketError + try: + job.delete() + except Exception: + log.exception("Saw exception while trying to delete job") + def prep_job(job_config, log_file_path, archive_dir): job_id = job_config['job_id'] -- 2.39.5