import docopt
import teuthology.config
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
doc = """
usage: teuthology-beanstalk-queue -h
import docopt
import teuthology.config
-import teuthology.paddles_queue
-
+import teuthology.queue.paddles
doc = """
usage: teuthology-paddles-queue -h
teuthology-paddles-queue -s -m MACHINE_TYPE
--- /dev/null
+import docopt
+
+import teuthology.config
+import teuthology.queue.beanstalk
+import teuthology.queue.paddles
+
+doc = """
+usage: teuthology-queue -h
+ teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
+ teuthology-queue [-r] -m MACHINE_TYPE
+ teuthology-queue -m MACHINE_TYPE -D PATTERN
+ teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+ -m, --machine_type MACHINE_TYPE [default: multi]
+ Which machine type queue to work on.
+
+optional arguments:
+ -h, --help Show this help message and exit
+ -D, --delete PATTERN Delete Jobs with PATTERN in their name
+ -d, --description Show job descriptions
+ -r, --runs Only show run names
+ -f, --full Print the entire job config. Use with caution.
+ -s, --status Prints the status of the queue
+ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0
+ will unpause. If -m is passed, pause that queue,
+ otherwise pause all queues.
+"""
+
+
+def main():
+ args = docopt.docopt(doc)
+ teuthology.queue.main(args)
+++ /dev/null
-import beanstalkc
-import yaml
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology.config import config
-from teuthology import report
-
-log = logging.getLogger(__name__)
-
-
-def connect():
- host = config.queue_host
- port = config.queue_port
- if host is None or port is None:
- raise RuntimeError(
- 'Beanstalk queue information not found in {conf_path}'.format(
- conf_path=config.teuthology_yaml))
- return beanstalkc.Connection(host=host, port=port)
-
-
-def watch_tube(connection, tube_name):
- """
- Watch a given tube, potentially correcting to 'multi' if necessary. Returns
- the tube_name that was actually used.
- """
- if ',' in tube_name:
- log.debug("Correcting tube name to 'multi'")
- tube_name = 'multi'
- connection.watch(tube_name)
- connection.ignore('default')
- return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
- """
- def callback(jobs_dict)
- """
- log.info("Checking Beanstalk Queue...")
- job_count = connection.stats_tube(tube_name)['current-jobs-ready']
- if job_count == 0:
- log.info('No jobs in Beanstalk Queue')
- return
-
- # Try to figure out a sane timeout based on how many jobs are in the queue
- timeout = job_count / 2000.0 * 60
- for i in range(1, job_count + 1):
- print_progress(i, job_count, "Loading")
- job = connection.reserve(timeout=timeout)
- if job is None or job.body is None:
- continue
- job_config = yaml.safe_load(job.body)
- job_name = job_config['name']
- job_id = job.stats()['id']
- if pattern is not None and pattern not in job_name:
- continue
- processor.add_job(job_id, job_config, job)
- end_progress()
- processor.complete()
-
-
-def print_progress(index, total, message=None):
- msg = "{m} ".format(m=message) if message else ''
- sys.stderr.write("{msg}{i}/{total}\r".format(
- msg=msg, i=index, total=total))
- sys.stderr.flush()
-
-
-def end_progress():
- sys.stderr.write('\n')
- sys.stderr.flush()
-
-
-class JobProcessor(object):
- def __init__(self):
- self.jobs = OrderedDict()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_id = str(job_id)
-
- job_dict = dict(
- index=(len(self.jobs) + 1),
- job_config=job_config,
- )
- if job_obj:
- job_dict['job_obj'] = job_obj
- self.jobs[job_id] = job_dict
-
- self.process_job(job_id)
-
- def process_job(self, job_id):
- pass
-
- def complete(self):
- pass
-
-
-class JobPrinter(JobProcessor):
- def __init__(self, show_desc=False, full=False):
- super(JobPrinter, self).__init__()
- self.show_desc = show_desc
- self.full = full
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_index = self.jobs[job_id]['index']
- job_priority = job_config['priority']
- job_name = job_config['name']
- job_desc = job_config['description']
- print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
- i=job_index,
- pri=job_priority,
- job_id=job_id,
- job_name=job_name,
- ))
- if self.full:
- pprint.pprint(job_config)
- elif job_desc and self.show_desc:
- for desc in job_desc.split():
- print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
- def __init__(self):
- super(RunPrinter, self).__init__()
- self.runs = list()
-
- def process_job(self, job_id):
- run = self.jobs[job_id]['job_config']['name']
- if run not in self.runs:
- self.runs.append(run)
- print(run)
-
-
-class JobDeleter(JobProcessor):
- def __init__(self, pattern):
- self.pattern = pattern
- super(JobDeleter, self).__init__()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_name = job_config['name']
- if self.pattern in job_name:
- super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_name = job_config['name']
- print('Deleting {job_name}/{job_id}'.format(
- job_id=job_id,
- job_name=job_name,
- ))
- job_obj = self.jobs[job_id].get('job_obj')
- if job_obj:
- job_obj.delete()
- report.try_delete_jobs(job_name, job_id)
-
-
-def pause_tube(connection, tube, duration):
- duration = int(duration)
- if not tube:
- tubes = sorted(connection.tubes())
- else:
- tubes = [tube]
-
- prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
- templ = prefix + ": {tubes}"
- log.info(templ.format(dur=duration, tubes=tubes))
- for tube in tubes:
- connection.pause_tube(tube, duration)
-
-
-def stats_tube(connection, tube):
- stats = connection.stats_tube(tube)
- result = dict(
- name=tube,
- count=stats['current-jobs-ready'],
- paused=(stats['pause'] != 0),
- )
- return result
-
-
-def main(args):
- machine_type = args['--machine_type']
- status = args['--status']
- delete = args['--delete']
- runs = args['--runs']
- show_desc = args['--description']
- full = args['--full']
- pause_duration = args['--pause']
- try:
- connection = connect()
- if machine_type and not pause_duration:
- # watch_tube needs to be run before we inspect individual jobs;
- # it is not needed for pausing tubes
- watch_tube(connection, machine_type)
- if status:
- print(stats_tube(connection, machine_type))
- elif pause_duration:
- pause_tube(connection, machine_type, pause_duration)
- elif delete:
- walk_jobs(connection, machine_type,
- JobDeleter(delete))
- elif runs:
- walk_jobs(connection, machine_type,
- RunPrinter())
- else:
- walk_jobs(connection, machine_type,
- JobPrinter(show_desc=show_desc, full=full))
- except KeyboardInterrupt:
- log.info("Interrupted.")
- finally:
- connection.close()
'archive_upload_key': None,
'archive_upload_url': None,
'automated_scheduling': False,
- 'backend': 'paddles',
+ 'backend': 'beanstalk',
'reserve_machines': 5,
'ceph_git_base_url': 'https://github.com/ceph/',
'ceph_git_url': None,
from time import sleep
from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
from teuthology import report
from teuthology.config import config as teuth_config
from teuthology.exceptions import SkipJob
machine_type = args["--machine-type"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
-<<<<<<< HEAD
exit_on_empty_queue = args["--exit-on-empty-queue"]
-=======
backend = args['--queue-backend']
->>>>>>> 79c4d9bf... Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles
if archive_dir is None:
archive_dir = teuth_config.archive_base
import logging
import getpass
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
from teuthology import report
from teuthology.config import config
from teuthology import misc
+++ /dev/null
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.dispatcher import pause_queue
-
-
-log = logging.getLogger(__name__)
-
-
-def connect():
- host = config.queue_host
- port = config.queue_port
- if host is None or port is None:
- raise RuntimeError(
- 'Beanstalk queue information not found in {conf_path}'.format(
- conf_path=config.teuthology_yaml))
- return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load)
-
-
-def watch_tube(connection, tube_name):
- """
- Watch a given tube, potentially correcting to 'multi' if necessary. Returns
- the tube_name that was actually used.
- """
- if ',' in tube_name:
- log.debug("Correcting tube name to 'multi'")
- tube_name = 'multi'
- connection.watch(tube_name)
- connection.ignore('default')
- return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
- """
- def callback(jobs_dict)
- """
- log.info("Checking Beanstalk Queue...")
- job_count = connection.stats_tube(tube_name)['current-jobs-ready']
- if job_count == 0:
- log.info('No jobs in Beanstalk Queue')
- return
-
-def stats_queue(machine_type):
- stats = report.get_queue_stats(machine_type)
- if stats['paused'] is None:
- log.info("%s queue is currently running with %s jobs queued",
- stats['name'],
- stats['count'])
- else:
- log.info("%s queue is paused with %s jobs queued",
- stats['name'],
- stats['count'])
-
-
-def update_priority(machine_type, priority, user, run_name=None):
- if run_name is not None:
- jobs = report.get_user_jobs_queue(machine_type, user, run_name)
- else:
- jobs = report.get_user_jobs_queue(machine_type, user)
- for job in jobs:
- job['priority'] = priority
- report.try_push_job_info(job)
-
-
-def print_progress(index, total, message=None):
- msg = "{m} ".format(m=message) if message else ''
- sys.stderr.write("{msg}{i}/{total}\r".format(
- msg=msg, i=index, total=total))
- sys.stderr.flush()
-
-
-def end_progress():
- sys.stderr.write('\n')
- sys.stderr.flush()
-
-
-def walk_jobs(machine_type, processor, user):
- log.info("Checking paddles queue...")
- job_count = report.get_queue_stats(machine_type)['count']
-
- jobs = report.get_user_jobs_queue(machine_type, user)
- if job_count == 0:
- log.info('No jobs in queue')
- return
-
- for i in range(1, job_count + 1):
- print_progress(i, job_count, "Loading")
- job = jobs[i-1]
- if job is None:
- continue
- job_id = job['job_id']
- processor.add_job(job_id, job)
- end_progress()
- processor.complete()
-
-
-class JobProcessor(object):
- def __init__(self):
- self.jobs = OrderedDict()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_id = str(job_id)
-
- job_dict = dict(
- index=(len(self.jobs) + 1),
- job_config=job_config,
- )
- if job_obj:
- job_dict['job_obj'] = job_obj
- self.jobs[job_id] = job_dict
-
- self.process_job(job_id)
-
- def process_job(self, job_id):
- pass
-
- def complete(self):
- pass
-
-
-class JobPrinter(JobProcessor):
- def __init__(self, show_desc=False, full=False):
- super(JobPrinter, self).__init__()
- self.show_desc = show_desc
- self.full = full
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_index = self.jobs[job_id]['index']
- job_priority = job_config['priority']
- job_name = job_config['name']
- job_desc = job_config['description']
- print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
- i=job_index,
- pri=job_priority,
- job_id=job_id,
- job_name=job_name,
- ))
- if self.full:
- pprint.pprint(job_config)
- elif job_desc and self.show_desc:
- for desc in job_desc.split():
- print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
- def __init__(self):
- super(RunPrinter, self).__init__()
- self.runs = list()
-
- def process_job(self, job_id):
- run = self.jobs[job_id]['job_config']['name']
- if run not in self.runs:
- self.runs.append(run)
- print(run)
-
-
-class JobDeleter(JobProcessor):
- def __init__(self, pattern):
- self.pattern = pattern
- super(JobDeleter, self).__init__()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_name = job_config['name']
- if self.pattern in job_name:
- super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_name = job_config['name']
- print('Deleting {job_name}/{job_id}'.format(
- job_id=job_id,
- job_name=job_name,
- ))
- report.try_delete_jobs(job_name, job_id)
-
-
-def main(args):
- machine_type = args['--machine_type']
- user = args['--user']
- run_name = args['--run_name']
- priority = args['--priority']
- status = args['--status']
- delete = args['--delete']
- runs = args['--runs']
- show_desc = args['--description']
- full = args['--full']
- pause = args['--pause']
- unpause = args['--unpause']
- pause_duration = args['--time']
- try:
- if status:
- stats_queue(machine_type)
- elif pause:
- if pause_duration:
- pause_queue(machine_type, pause, user, pause_duration)
- else:
- pause_queue(machine_type, pause, user)
- elif unpause:
- pause = False
- pause_queue(machine_type, pause, user)
- elif priority:
- update_priority(machine_type, priority, user, run_name)
- elif delete:
- walk_jobs(machine_type,
- JobDeleter(delete), user)
- elif runs:
- walk_jobs(machine_type,
- RunPrinter(), user)
- else:
- walk_jobs(machine_type,
- JobPrinter(show_desc=show_desc, full=full),
- user)
- except KeyboardInterrupt:
- log.info("Interrupted.")
--- /dev/null
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+from teuthology.config import config
+
+log = logging.getLogger(__name__)
+
+def print_progress(index, total, message=None):
+ msg = "{m} ".format(m=message) if message else ''
+ sys.stderr.write("{msg}{i}/{total}\r".format(
+ msg=msg, i=index, total=total))
+ sys.stderr.flush()
+
+def end_progress():
+ sys.stderr.write('\n')
+ sys.stderr.flush()
+
+class JobProcessor(object):
+ def __init__(self):
+ self.jobs = OrderedDict()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_id = str(job_id)
+
+ job_dict = dict(
+ index=(len(self.jobs) + 1),
+ job_config=job_config,
+ )
+ if job_obj:
+ job_dict['job_obj'] = job_obj
+ self.jobs[job_id] = job_dict
+
+ self.process_job(job_id)
+
+ def process_job(self, job_id):
+ pass
+
+ def complete(self):
+ pass
+
+
+class JobPrinter(JobProcessor):
+ def __init__(self, show_desc=False, full=False):
+ super(JobPrinter, self).__init__()
+ self.show_desc = show_desc
+ self.full = full
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_index = self.jobs[job_id]['index']
+ job_priority = job_config['priority']
+ job_name = job_config['name']
+ job_desc = job_config['description']
+ print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+ i=job_index,
+ pri=job_priority,
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ if self.full:
+ pprint.pprint(job_config)
+ elif job_desc and self.show_desc:
+ for desc in job_desc.split():
+ print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+ def __init__(self):
+ super(RunPrinter, self).__init__()
+ self.runs = list()
+
+ def process_job(self, job_id):
+ run = self.jobs[job_id]['job_config']['name']
+ if run not in self.runs:
+ self.runs.append(run)
+ print(run)
+
+
+class JobDeleter(JobProcessor):
+ def __init__(self, pattern):
+ self.pattern = pattern
+ super(JobDeleter, self).__init__()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_name = job_config['name']
+ if self.pattern in job_name:
+ super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_name = job_config['name']
+ print('Deleting {job_name}/{job_id}'.format(
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ report.try_delete_jobs(job_name, job_id)
+
+
+def main(args):
+    # Import submodules lazily so importing the teuthology.queue package
+    # does not pull in beanstalkc / paddles dependencies (and avoids a
+    # circular import, since both submodules import from this package).
+    from teuthology.queue import beanstalk, paddles
+    if config.backend == 'paddles':
+        paddles.main(args)
+    else:
+        beanstalk.main(args)
--- /dev/null
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+# Shared job processors and progress helpers live in the package __init__.
+from teuthology.queue import (JobDeleter, JobPrinter, RunPrinter,
+                              end_progress, print_progress)
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+ host = config.queue_host
+ port = config.queue_port
+ if host is None or port is None:
+ raise RuntimeError(
+ 'Beanstalk queue information not found in {conf_path}'.format(
+ conf_path=config.teuthology_yaml))
+ return beanstalkc.Connection(host=host, port=port)
+
+
+def watch_tube(connection, tube_name):
+ """
+ Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+ the tube_name that was actually used.
+ """
+ if ',' in tube_name:
+ log.debug("Correcting tube name to 'multi'")
+ tube_name = 'multi'
+ connection.watch(tube_name)
+ connection.ignore('default')
+ return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+ """
+ def callback(jobs_dict)
+ """
+ log.info("Checking Beanstalk Queue...")
+ job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+ if job_count == 0:
+ log.info('No jobs in Beanstalk Queue')
+ return
+
+ # Try to figure out a sane timeout based on how many jobs are in the queue
+ timeout = job_count / 2000.0 * 60
+ for i in range(1, job_count + 1):
+ print_progress(i, job_count, "Loading")
+ job = connection.reserve(timeout=timeout)
+ if job is None or job.body is None:
+ continue
+ job_config = yaml.safe_load(job.body)
+ job_name = job_config['name']
+ job_id = job.stats()['id']
+ if pattern is not None and pattern not in job_name:
+ continue
+ processor.add_job(job_id, job_config, job)
+ end_progress()
+ processor.complete()
+
+
+def pause_tube(connection, tube, duration):
+ duration = int(duration)
+ if not tube:
+ tubes = sorted(connection.tubes())
+ else:
+ tubes = [tube]
+
+ prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+ templ = prefix + ": {tubes}"
+ log.info(templ.format(dur=duration, tubes=tubes))
+ for tube in tubes:
+ connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+ stats = connection.stats_tube(tube)
+ result = dict(
+ name=tube,
+ count=stats['current-jobs-ready'],
+ paused=(stats['pause'] != 0),
+ )
+ return result
+
+
+def main(args):
+ machine_type = args['--machine_type']
+ status = args['--status']
+ delete = args['--delete']
+ runs = args['--runs']
+ show_desc = args['--description']
+ full = args['--full']
+ pause_duration = args['--pause']
+ try:
+ connection = connect()
+ if machine_type and not pause_duration:
+ # watch_tube needs to be run before we inspect individual jobs;
+ # it is not needed for pausing tubes
+ watch_tube(connection, machine_type)
+ if status:
+ print(stats_tube(connection, machine_type))
+ elif pause_duration:
+ pause_tube(connection, machine_type, pause_duration)
+ elif delete:
+ walk_jobs(connection, machine_type,
+ JobDeleter(delete))
+ elif runs:
+ walk_jobs(connection, machine_type,
+ RunPrinter())
+ else:
+ walk_jobs(connection, machine_type,
+ JobPrinter(show_desc=show_desc, full=full))
+ except KeyboardInterrupt:
+ log.info("Interrupted.")
+ finally:
+ connection.close()
--- /dev/null
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+from teuthology.dispatcher import pause_queue
+# Shared job processors and progress helpers live in the package __init__.
+from teuthology.queue import (JobDeleter, JobPrinter, RunPrinter,
+                              end_progress, print_progress)
+
+
+log = logging.getLogger(__name__)
+
+
+def stats_queue(machine_type):
+ stats = report.get_queue_stats(machine_type)
+ if stats['paused'] is None:
+ log.info("%s queue is currently running with %s jobs queued",
+ stats['name'],
+ stats['count'])
+ else:
+ log.info("%s queue is paused with %s jobs queued",
+ stats['name'],
+ stats['count'])
+
+
+def update_priority(machine_type, priority, user, run_name=None):
+ if run_name is not None:
+ jobs = report.get_user_jobs_queue(machine_type, user, run_name)
+ else:
+ jobs = report.get_user_jobs_queue(machine_type, user)
+ for job in jobs:
+ job['priority'] = priority
+ report.try_push_job_info(job)
+
+
+def walk_jobs(machine_type, processor, user):
+ log.info("Checking paddles queue...")
+ job_count = report.get_queue_stats(machine_type)['count']
+
+ jobs = report.get_user_jobs_queue(machine_type, user)
+ if job_count == 0:
+ log.info('No jobs in queue')
+ return
+
+ for i in range(1, job_count + 1):
+ print_progress(i, job_count, "Loading")
+ job = jobs[i-1]
+ if job is None:
+ continue
+ job_id = job['job_id']
+ processor.add_job(job_id, job)
+ end_progress()
+ processor.complete()
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    # teuthology-queue exposes no --user option; default to None so the
+    # paddles queries return jobs for all users.
+    user = args.get('--user')
+    try:
+        if status:
+            stats_queue(machine_type)
+        elif pause_duration:
+            # A duration of 0 means unpause.
+            pause_queue(machine_type, int(pause_duration) != 0, user,
+                        pause_duration)
+        elif delete:
+            walk_jobs(machine_type,
+                      JobDeleter(delete), user)
+        elif runs:
+            walk_jobs(machine_type,
+                      RunPrinter(), user)
+        else:
+            walk_jobs(machine_type,
+                      JobPrinter(show_desc=show_desc, full=full),
+                      user)
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
"""
Create a queue on the results server
- :param machine_type: The machine type specified for the job
+ :param queue: The queue specified for the job
"""
uri = "{base}/queue/".format(
base=self.base_uri
base=self.base_uri
)
request_info = {'queue': queue}
+ filter_field = queue
if run_name is not None:
filter_field = run_name
uri += "?run_name=" + str(run_name)
- else:
+ elif user is not None:
filter_field = user
uri += "?user=" + str(user)
reporter.create_queue(queue)
return queue
+def get_all_jobs_in_queue(queue, user=None, run_name=None):
+ reporter = ResultsReporter()
+ if not reporter.base_uri:
+ return
+ if ',' in queue:
+ queue = 'multi'
+ return reporter.queued_jobs(queue)
-def get_user_jobs_queue(machine_type, user, run_name=None):
+def get_user_jobs_queue(queue, user, run_name=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- return reporter.queued_jobs(machine_type, user, run_name)
+ return reporter.queued_jobs(queue, user, run_name)
-def pause_queue(machine_type, paused, paused_by, pause_duration=None):
+def pause_queue(queue, paused, paused_by, pause_duration=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- reporter.update_queue(machine_type, paused, paused_by, pause_duration)
+ reporter.update_queue(queue, paused, paused_by, pause_duration)
-def is_queue_paused(machine_type):
+def is_queue_paused(queue):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- stats = reporter.queue_stats(machine_type)
+ stats = reporter.queue_stats(queue)
if stats['paused'] != 0 and stats['paused'] is not None:
return True
return False
-def get_queue_stats(machine_type):
+def get_queue_stats(queue):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- stats = reporter.queue_stats(machine_type)
+ stats = reporter.queue_stats(queue)
return stats
from datetime import datetime
from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
from teuthology import report
from teuthology import safepath
from teuthology.config import config as teuth_config