From: Aishwarya Mathuria Date: Mon, 18 Apr 2022 07:49:40 +0000 (+0530) Subject: scripts/queue: common teuthology-queue command for paddles and beanstalk queue X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a63debc7ad6ebfcbef6af1044d1cfb5c97bee7a4;p=teuthology.git scripts/queue: common teuthology-queue command for paddles and beanstalk queue Signed-off-by: Aishwarya Mathuria --- diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py index 88a8242847..a8a0661ecf 100644 --- a/scripts/beanstalk_queue.py +++ b/scripts/beanstalk_queue.py @@ -1,7 +1,7 @@ import docopt import teuthology.config -import teuthology.beanstalk +import teuthology.queue.beanstalk doc = """ usage: teuthology-beanstalk-queue -h diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py index 3c69d772e6..8487fd938e 100644 --- a/scripts/paddles_queue.py +++ b/scripts/paddles_queue.py @@ -1,8 +1,7 @@ import docopt import teuthology.config -import teuthology.paddles_queue - +import teuthology.queue.paddles_queue doc = """ usage: teuthology-paddles-queue -h teuthology-paddles-queue -s -m MACHINE_TYPE diff --git a/scripts/queue.py b/scripts/queue.py new file mode 100644 index 0000000000..2c466a7be9 --- /dev/null +++ b/scripts/queue.py @@ -0,0 +1,37 @@ +import docopt + +import teuthology.config +import teuthology.queue.beanstalk +import teuthology.queue.paddles + +doc = """ +usage: teuthology-queue -h + teuthology-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-queue [-r] -m MACHINE_TYPE + teuthology-queue -m MACHINE_TYPE -D PATTERN + teuthology-queue -p SECONDS [-m MACHINE_TYPE] + +List Jobs in queue. +If -D is passed, then jobs with PATTERN in the job name are deleted from the +queue. + +Arguments: + -m, --machine_type MACHINE_TYPE [default: multi] + Which machine type queue to work on. + +optional arguments: + -h, --help Show this help message and exit + -D, --delete PATTERN Delete Jobs with PATTERN in their name + -d, --description Show job descriptions + -r, --runs Only show run names + -f, --full Print the entire job config. Use with caution. + -s, --status Prints the status of the queue + -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 + will unpause. If -m is passed, pause that queue, + otherwise pause all queues. +""" + + +def main(): + args = docopt.docopt(doc) + teuthology.queue.main(args) diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py deleted file mode 100644 index a1165becca..0000000000 --- a/teuthology/beanstalk.py +++ /dev/null @@ -1,214 +0,0 @@ -import beanstalkc -import yaml -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology.config import config -from teuthology import report - -log = logging.getLogger(__name__) - - -def connect(): - host = config.queue_host - port = config.queue_port - if host is None or port is None: - raise RuntimeError( - 'Beanstalk queue information not found in {conf_path}'.format( - conf_path=config.teuthology_yaml)) - return beanstalkc.Connection(host=host, port=port) - - -def watch_tube(connection, tube_name): - """ - Watch a given tube, potentially correcting to 'multi' if necessary. Returns - the tube_name that was actually used. - """ - if ',' in tube_name: - log.debug("Correcting tube name to 'multi'") - tube_name = 'multi' - connection.watch(tube_name) - connection.ignore('default') - return tube_name - - -def walk_jobs(connection, tube_name, processor, pattern=None): - """ - def callback(jobs_dict) - """ - log.info("Checking Beanstalk Queue...") - job_count = connection.stats_tube(tube_name)['current-jobs-ready'] - if job_count == 0: - log.info('No jobs in Beanstalk Queue') - return - - # Try to figure out a sane timeout based on how many jobs are in the queue - timeout = job_count / 2000.0 * 60 - for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") - job = connection.reserve(timeout=timeout) - if job is None or job.body is None: - continue - job_config = yaml.safe_load(job.body) - job_name = job_config['name'] - job_id = job.stats()['id'] - if pattern is not None and pattern not in job_name: - continue - processor.add_job(job_id, job_config, job) - end_progress() - processor.complete() - - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - job_obj = self.jobs[job_id].get('job_obj') - if job_obj: - job_obj.delete() - report.try_delete_jobs(job_name, job_id) - - -def pause_tube(connection, tube, duration): - duration = int(duration) - if not tube: - tubes = sorted(connection.tubes()) - else: - tubes = [tube] - - prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" - templ = prefix + ": {tubes}" - log.info(templ.format(dur=duration, tubes=tubes)) - for tube in tubes: - connection.pause_tube(tube, duration) - - -def stats_tube(connection, tube): - stats = connection.stats_tube(tube) - result = dict( - name=tube, - count=stats['current-jobs-ready'], - paused=(stats['pause'] != 0), - ) - return result - - -def main(args): - machine_type = args['--machine_type'] - status = args['--status'] - delete = args['--delete'] - runs = args['--runs'] - show_desc = args['--description'] - full = args['--full'] - pause_duration = args['--pause'] - try: - connection = connect() - if machine_type and not pause_duration: - # watch_tube needs to be run before we inspect individual jobs; - # it is not needed for pausing tubes - watch_tube(connection, machine_type) - if status: - print(stats_tube(connection, machine_type)) - elif pause_duration: - pause_tube(connection, machine_type, pause_duration) - elif delete: - walk_jobs(connection, machine_type, - JobDeleter(delete)) - elif runs: - walk_jobs(connection, machine_type, - RunPrinter()) - else: - walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) - except KeyboardInterrupt: - log.info("Interrupted.") - finally: - connection.close() diff --git a/teuthology/config.py b/teuthology/config.py index b42ff48d1a..e39ba4fb6b 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -143,7 +143,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, - 'backend': 'paddles', + 'backend': 'beanstalk', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index f80818cf14..b06ef60618 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -8,7 +8,7 @@ from datetime import datetime from time import sleep from teuthology import setup_log_file, install_except_hook -from teuthology import beanstalk +from teuthology.queue import beanstalk from teuthology import report from teuthology.config import config as teuth_config from teuthology.exceptions import SkipJob @@ -73,11 +73,8 @@ def main(args): machine_type = args["--machine-type"] log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] -<<<<<<< HEAD exit_on_empty_queue = args["--exit-on-empty-queue"] -======= backend = args['--queue-backend'] ->>>>>>> 79c4d9bf... Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles if archive_dir is None: archive_dir = teuth_config.archive_base diff --git a/teuthology/kill.py b/teuthology/kill.py index 7ebf560166..776463d141 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -8,7 +8,7 @@ import tempfile import logging import getpass -from teuthology import beanstalk +from teuthology.queue import beanstalk from teuthology import report from teuthology.config import config from teuthology import misc diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py deleted file mode 100644 index 99cfe77e6e..0000000000 --- a/teuthology/paddles_queue.py +++ /dev/null @@ -1,218 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.dispatcher import pause_queue - - -log = logging.getLogger(__name__) - - -def connect(): - host = config.queue_host - port = config.queue_port - if host is None or port is None: - raise RuntimeError( - 'Beanstalk queue information not found in {conf_path}'.format( - conf_path=config.teuthology_yaml)) - return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load) - - -def watch_tube(connection, tube_name): - """ - Watch a given tube, potentially correcting to 'multi' if necessary. Returns - the tube_name that was actually used. - """ - if ',' in tube_name: - log.debug("Correcting tube name to 'multi'") - tube_name = 'multi' - connection.watch(tube_name) - connection.ignore('default') - return tube_name - - -def walk_jobs(connection, tube_name, processor, pattern=None): - """ - def callback(jobs_dict) - """ - log.info("Checking Beanstalk Queue...") - job_count = connection.stats_tube(tube_name)['current-jobs-ready'] - if job_count == 0: - log.info('No jobs in Beanstalk Queue') - return - -def stats_queue(machine_type): - stats = report.get_queue_stats(machine_type) - if stats['paused'] is None: - log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) - else: - log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) - - -def update_priority(machine_type, priority, user, run_name=None): - if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) - for job in jobs: - job['priority'] = priority - report.try_push_job_info(job) - - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -def walk_jobs(machine_type, processor, user): - log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] - - jobs = report.get_user_jobs_queue(machine_type, user) - if job_count == 0: - log.info('No jobs in queue') - return - - for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") - job = jobs[i-1] - if job is None: - continue - job_id = job['job_id'] - processor.add_job(job_id, job) - end_progress() - processor.complete() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - machine_type = args['--machine_type'] - user = args['--user'] - run_name = args['--run_name'] - priority = args['--priority'] - status = args['--status'] - delete = args['--delete'] - runs = args['--runs'] - show_desc = args['--description'] - full = args['--full'] - pause = args['--pause'] - unpause = args['--unpause'] - pause_duration = args['--time'] - try: - if status: - stats_queue(machine_type) - elif pause: - if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - else: - pause_queue(machine_type, pause, user) - elif unpause: - pause = False - pause_queue(machine_type, pause, user) - elif priority: - update_priority(machine_type, priority, user, run_name) - elif delete: - walk_jobs(machine_type, - JobDeleter(delete), user) - elif runs: - walk_jobs(machine_type, - RunPrinter(), user) - else: - walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), - user) - except KeyboardInterrupt: - log.info("Interrupted.") diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py new file mode 100644 index 0000000000..2a0b6ff363 --- /dev/null +++ b/teuthology/queue/__init__.py @@ -0,0 +1,106 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.config import config + +log = logging.getLogger(__name__) + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) + + +def main(args): + if config.backend == 'paddles': + paddles.main(args) + else: + beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py new file mode 100644 index 0000000000..90b1cbd6d3 --- /dev/null +++ b/teuthology/queue/beanstalk.py @@ -0,0 +1,118 @@ +import beanstalkc +import yaml +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology.config import config +from teuthology import report + +log = logging.getLogger(__name__) + + +def connect(): + host = config.queue_host + port = config.queue_port + if host is None or port is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.teuthology_yaml)) + return beanstalkc.Connection(host=host, port=port) + + +def watch_tube(connection, tube_name): + """ + Watch a given tube, potentially correcting to 'multi' if necessary. Returns + the tube_name that was actually used. + """ + if ',' in tube_name: + log.debug("Correcting tube name to 'multi'") + tube_name = 'multi' + connection.watch(tube_name) + connection.ignore('default') + return tube_name + + +def walk_jobs(connection, tube_name, processor, pattern=None): + """ + def callback(jobs_dict) + """ + log.info("Checking Beanstalk Queue...") + job_count = connection.stats_tube(tube_name)['current-jobs-ready'] + if job_count == 0: + log.info('No jobs in Beanstalk Queue') + return + + # Try to figure out a sane timeout based on how many jobs are in the queue + timeout = job_count / 2000.0 * 60 + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = connection.reserve(timeout=timeout) + if job is None or job.body is None: + continue + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + if pattern is not None and pattern not in job_name: + continue + processor.add_job(job_id, job_config, job) + end_progress() + processor.complete() + + +def pause_tube(connection, tube, duration): + duration = int(duration) + if not tube: + tubes = sorted(connection.tubes()) + else: + tubes = [tube] + + prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" + templ = prefix + ": {tubes}" + log.info(templ.format(dur=duration, tubes=tubes)) + for tube in tubes: + connection.pause_tube(tube, duration) + + +def stats_tube(connection, tube): + stats = connection.stats_tube(tube) + result = dict( + name=tube, + count=stats['current-jobs-ready'], + paused=(stats['pause'] != 0), + ) + return result + + +def main(args): + machine_type = args['--machine_type'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + try: + connection = connect() + if machine_type and not pause_duration: + # watch_tube needs to be run before we inspect individual jobs; + # it is not needed for pausing tubes + watch_tube(connection, machine_type) + if status: + print(stats_tube(connection, machine_type)) + elif pause_duration: + pause_tube(connection, machine_type, pause_duration) + elif delete: + walk_jobs(connection, machine_type, + JobDeleter(delete)) + elif runs: + walk_jobs(connection, machine_type, + RunPrinter()) + else: + walk_jobs(connection, machine_type, + JobPrinter(show_desc=show_desc, full=full)) + except KeyboardInterrupt: + log.info("Interrupted.") + finally: + connection.close() diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py new file mode 100644 index 0000000000..f2ea8b84c8 --- /dev/null +++ b/teuthology/queue/paddles.py @@ -0,0 +1,88 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.dispatcher import pause_queue + + +log = logging.getLogger(__name__) + + +def stats_queue(machine_type): + stats = report.get_queue_stats(machine_type) + if stats['paused'] is None: + log.info("%s queue is currently running with %s jobs queued", + stats['name'], + stats['count']) + else: + log.info("%s queue is paused with %s jobs queued", + stats['name'], + stats['count']) + + +def update_priority(machine_type, priority, user, run_name=None): + if run_name is not None: + jobs = report.get_user_jobs_queue(machine_type, user, run_name) + else: + jobs = report.get_user_jobs_queue(machine_type, user) + for job in jobs: + job['priority'] = priority + report.try_push_job_info(job) + + +def walk_jobs(machine_type, processor, user): + log.info("Checking paddles queue...") + job_count = report.get_queue_stats(machine_type)['count'] + + jobs = report.get_user_jobs_queue(machine_type, user) + if job_count == 0: + log.info('No jobs in queue') + return + + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = jobs[i-1] + if job is None: + continue + job_id = job['job_id'] + processor.add_job(job_id, job) + end_progress() + processor.complete() + + +def main(args): + machine_type = args['--machine_type'] + #user = args['--user'] + #run_name = args['--run_name'] + #priority = args['--priority'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + #unpause = args['--unpause'] + #pause_duration = args['--time'] + try: + if status: + stats_queue(machine_type) + if pause_duration: + pause_queue(machine_type, pause, user, pause_duration) + #else: + #pause_queue(machine_type, pause, user) + elif priority: + update_priority(machine_type, priority, run_name) + elif delete: + walk_jobs(machine_type, + JobDeleter(delete), user) + elif runs: + walk_jobs(machine_type, + RunPrinter(), user) + else: + walk_jobs(machine_type, + JobPrinter(show_desc=show_desc, full=full), + user) + except KeyboardInterrupt: + log.info("Interrupted.") diff --git a/teuthology/report.py b/teuthology/report.py index 50b7f8966b..6796415014 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -509,7 +509,7 @@ class ResultsReporter(object): """ Create a queue on the results server - :param machine_type: The machine type specified for the job + :param queue: The queue specified for the job """ uri = "{base}/queue/".format( base=self.base_uri @@ -614,10 +614,11 @@ class ResultsReporter(object): base=self.base_uri ) request_info = {'queue': queue} + filter_field = queue if run_name is not None: filter_field = run_name uri += "?run_name=" + str(run_name) - else: + elif user is not None: filter_field = user uri += "?user=" + str(user) @@ -654,36 +655,43 @@ def create_machine_type_queue(queue): reporter.create_queue(queue) return queue +def get_all_jobs_in_queue(queue, user=None, run_name=None): + reporter = ResultsReporter() + if not reporter.base_uri: + return + if ',' in queue: + queue = 'multi' + return reporter.queued_jobs(queue) -def get_user_jobs_queue(machine_type, user, run_name=None): +def get_user_jobs_queue(queue, user, run_name=None): reporter = ResultsReporter() if not reporter.base_uri: return - return reporter.queued_jobs(machine_type, user, run_name) + return reporter.queued_jobs(queue, user, run_name) -def pause_queue(machine_type, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(machine_type, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused, paused_by, pause_duration) -def is_queue_paused(machine_type): +def is_queue_paused(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) if stats['paused'] != 0 and stats['paused'] is not None: return True return False -def get_queue_stats(machine_type): +def get_queue_stats(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) return stats diff --git a/teuthology/worker.py b/teuthology/worker.py index 5cff69c880..36e42eeac4 100644 --- a/teuthology/worker.py +++ b/teuthology/worker.py @@ -9,7 +9,7 @@ import yaml from datetime import datetime from teuthology import setup_log_file, install_except_hook -from teuthology import beanstalk +from teuthology.queue import beanstalk from teuthology import report from teuthology import safepath from teuthology.config import config as teuth_config