Make the same teuthology-queue commands work regardless of the queue backend (Paddles or Beanstalk).
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
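For context, the unified entry point picks the backend from the teuthology config. A minimal sketch of that dispatch (mirroring scripts/queue.py in this patch, and assuming the backend is selected via a `backend` key in ~/.teuthology.yaml, as implied by `config.backend` below):

    from teuthology.config import config
    import teuthology.queue.beanstalk
    import teuthology.queue.paddles

    def main(args):
        # Route the parsed docopt args to whichever backend is configured;
        # anything other than 'beanstalk' falls back to Paddles.
        if config.backend == 'beanstalk':
            teuthology.queue.beanstalk.main(args)
        else:
            teuthology.queue.paddles.main(args)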
+++ /dev/null
-import docopt
-
-import teuthology.config
-import teuthology.queue.paddles_queue
-doc = """
-usage: teuthology-paddles-queue -h
- teuthology-paddles-queue -s -m MACHINE_TYPE
- teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
- teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
- teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
- teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
- teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
- teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
-
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-
-Arguments:
- -m, --machine_type MACHINE_TYPE
- Which machine type queue to work on.
-
-optional arguments:
- -h, --help Show this help message and exit
- -D, --delete PATTERN Delete Jobs with PATTERN in their name
- -d, --description Show job descriptions
- -r, --runs Only show run names
- -f, --full Print the entire job config. Use with caution.
- -s, --status Prints the status of the queue
- -t, --time SECONDS Pause queues for a number of seconds.
- If -m is passed, pause that queue,
- otherwise pause all queues.
- -p, --pause Pause queue
- -u, --unpause Unpause queue
- -P, --priority PRIORITY
- Change priority of queued jobs
- -U, --user USER User who owns the jobs
- -R, --run-name RUN_NAME
- Used to change priority of all jobs in the run.
-"""
-
-
-def main():
- args = docopt.docopt(doc)
- teuthology.paddles_queue.main(args)
import docopt
-import teuthology.config
import teuthology.queue.beanstalk
import teuthology.queue.paddles
+from teuthology.config import config
doc = """
usage: teuthology-queue -h
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-r] -m MACHINE_TYPE
teuthology-queue -m MACHINE_TYPE -D PATTERN
- teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+ teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
+ teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
+ -P, --priority PRIORITY
+ Change priority of queued jobs (only in Paddles queues)
+ -U, --user USER User who owns the jobs
+ -R, --run-name RUN_NAME
+ Used to change priority of all jobs in the run.
"""
def main():
args = docopt.docopt(doc)
- teuthology.queue.main(args)
+ if config.backend == 'beanstalk':
+ teuthology.queue.beanstalk.main(args)
+ else:
+ teuthology.queue.paddles.main(args)
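As a usage illustration (not part of the patch), the doc string above can be exercised directly with docopt; the machine type, priority, and run name here are made-up placeholders:

    import docopt
    # Change the priority of every job in a run (hypothetical values).
    args = docopt.docopt(doc, argv=['-m', 'smithi', '-P', '50', '-R', 'example-run'])
    # The resulting keys mirror what main() reads above, roughly:
    #   args['--machine_type'] == 'smithi'
    #   args['--priority'] == '50'        (docopt returns option values as strings)
    #   args['--run-name'] == 'example-run'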
def parse_args():
parser = argparse.ArgumentParser(description="""
-Grab jobs from a paddles queue and run the teuthology tests they
+Grab jobs from a beanstalk queue and run the teuthology tests they
describe. One job is run at a time.
""")
parser.add_argument(
required=True,
)
parser.add_argument(
- '-m', '--machine-type',
- help='which machine type the jobs will run on',
+ '-t', '--tube',
+ help='which beanstalk tube to read jobs from',
required=True,
)
if backend == 'beanstalk':
connection = beanstalk.connect()
beanstalk.watch_tube(connection, machine_type)
+ elif backend == 'paddles':
+ report.create_machine_type_queue(machine_type)
result_proc = None
else:
job = report.get_queued_job(machine_type)
if job is None:
+ if exit_on_empty_queue and not job_procs:
+ log.info("Queue is empty and no supervisor processes running; exiting!")
+ break
continue
job = clean_config(job)
report.try_push_job_info(job, dict(status='running'))
if teuth_config.results_server:
try:
report.try_delete_jobs(job_config['name'], job_config['job_id'])
- except Exception as e:
- log.warning("Unable to delete job %s, exception occurred: %s",
- job_config['job_id'], e)
+ except Exception:
+ log.exception("Unable to delete job %s", job_config['job_id'])
job_archive = os.path.join(archive_dir, safe_archive)
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
'--archive', job_config['archive_path'],
'--name', job_config['name'],
])
- if 'description' in job_config:
+ if job_config.get('description') is not None:
            args.extend(['--description', job_config['description']])
        job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
        args.extend(['--', job_archive])
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.config import config
-
-log = logging.getLogger(__name__)
-
-def print_progress(index, total, message=None):
- msg = "{m} ".format(m=message) if message else ''
- sys.stderr.write("{msg}{i}/{total}\r".format(
- msg=msg, i=index, total=total))
- sys.stderr.flush()
-
-def end_progress():
- sys.stderr.write('\n')
- sys.stderr.flush()
-
-class JobProcessor(object):
- def __init__(self):
- self.jobs = OrderedDict()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_id = str(job_id)
-
- job_dict = dict(
- index=(len(self.jobs) + 1),
- job_config=job_config,
- )
- if job_obj:
- job_dict['job_obj'] = job_obj
- self.jobs[job_id] = job_dict
-
- self.process_job(job_id)
-
- def process_job(self, job_id):
- pass
-
- def complete(self):
- pass
-
-
-class JobPrinter(JobProcessor):
- def __init__(self, show_desc=False, full=False):
- super(JobPrinter, self).__init__()
- self.show_desc = show_desc
- self.full = full
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_index = self.jobs[job_id]['index']
- job_priority = job_config['priority']
- job_name = job_config['name']
- job_desc = job_config['description']
- print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
- i=job_index,
- pri=job_priority,
- job_id=job_id,
- job_name=job_name,
- ))
- if self.full:
- pprint.pprint(job_config)
- elif job_desc and self.show_desc:
- for desc in job_desc.split():
- print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
- def __init__(self):
- super(RunPrinter, self).__init__()
- self.runs = list()
-
- def process_job(self, job_id):
- run = self.jobs[job_id]['job_config']['name']
- if run not in self.runs:
- self.runs.append(run)
- print(run)
-
-
-class JobDeleter(JobProcessor):
- def __init__(self, pattern):
- self.pattern = pattern
- super(JobDeleter, self).__init__()
-
- def add_job(self, job_id, job_config, job_obj=None):
- job_name = job_config['name']
- if self.pattern in job_name:
- super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
- def process_job(self, job_id):
- job_config = self.jobs[job_id]['job_config']
- job_name = job_config['name']
- print('Deleting {job_name}/{job_id}'.format(
- job_id=job_id,
- job_name=job_name,
- ))
- report.try_delete_jobs(job_name, job_id)
-
-
-def main(args):
- if config.backend == 'paddles':
- paddles.main(args)
- else:
- beanstalk.main(args)
\ No newline at end of file
import beanstalkc
import yaml
import logging
-import pprint
-import sys
-from collections import OrderedDict
from teuthology.config import config
-from teuthology import report
+from teuthology.queue import util
+
log = logging.getLogger(__name__)
# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
for i in range(1, job_count + 1):
- print_progress(i, job_count, "Loading")
+ util.print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
if job is None or job.body is None:
continue
if pattern is not None and pattern not in job_name:
continue
processor.add_job(job_id, job_config, job)
- end_progress()
+ util.end_progress()
processor.complete()
pause_tube(connection, machine_type, pause_duration)
elif delete:
walk_jobs(connection, machine_type,
- JobDeleter(delete))
+ util.JobDeleter(delete))
elif runs:
walk_jobs(connection, machine_type,
- RunPrinter())
+ util.RunPrinter())
else:
walk_jobs(connection, machine_type,
- JobPrinter(show_desc=show_desc, full=full))
+ util.JobPrinter(show_desc=show_desc, full=full))
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
import logging
-import pprint
-import sys
-from collections import OrderedDict
from teuthology import report
-from teuthology.dispatcher import pause_queue
-
+from teuthology.queue import util
log = logging.getLogger(__name__)
stats = report.get_queue_stats(machine_type)
if stats['paused'] is None:
log.info("%s queue is currently running with %s jobs queued",
- stats['name'],
- stats['count'])
+ stats['queue'],
+ stats['queued_jobs'])
else:
log.info("%s queue is paused with %s jobs queued",
- stats['name'],
- stats['count'])
+ stats['queue'],
+ stats['queued_jobs'])
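For reference, the queue stats returned by report.get_queue_stats() are expected to carry at least these keys (values below are hypothetical):

    stats = {
        'queue': 'smithi',    # name of the machine-type queue
        'queued_jobs': 42,    # number of jobs currently queued
        'paused': None,       # None while the queue is running
    }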
def update_priority(machine_type, priority, user, run_name=None):
    if run_name is not None:
-        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
+        jobs = report.get_jobs_by_run(machine_type, run_name)
    else:
        jobs = report.get_user_jobs_queue(machine_type, user)
for job in jobs:
job['priority'] = priority
report.try_push_job_info(job)
def walk_jobs(machine_type, processor, user):
log.info("Checking paddles queue...")
- job_count = report.get_queue_stats(machine_type)['count']
+ job_count = report.get_queue_stats(machine_type)['queued_jobs']
jobs = report.get_user_jobs_queue(machine_type, user)
if job_count == 0:
- log.info('No jobs in queue')
+ log.info('No jobs in Paddles queue')
return
for i in range(1, job_count + 1):
- print_progress(i, job_count, "Loading")
+ util.print_progress(i, job_count, "Loading")
job = jobs[i-1]
if job is None:
continue
job_id = job['job_id']
processor.add_job(job_id, job)
- end_progress()
+ util.end_progress()
processor.complete()
def main(args):
machine_type = args['--machine_type']
- #user = args['--user']
- #run_name = args['--run_name']
- #priority = args['--priority']
+ user = args['--user']
+ run_name = args['--run-name']
status = args['--status']
delete = args['--delete']
runs = args['--runs']
show_desc = args['--description']
full = args['--full']
pause_duration = args['--pause']
- #unpause = args['--unpause']
- #pause_duration = args['--time']
+ priority = args['--priority']
try:
if status:
stats_queue(machine_type)
if pause_duration:
- pause_queue(machine_type, pause, user, pause_duration)
- #else:
- #pause_queue(machine_type, pause, user)
+ if not user:
+                log.info('A user (-U) is required to pause the Paddles queue')
+ return
+ report.pause_queue(machine_type, user, pause_duration)
elif priority:
            update_priority(machine_type, priority, user, run_name)
elif delete:
walk_jobs(machine_type,
- JobDeleter(delete), user)
+ util.JobDeleter(delete), user)
elif runs:
walk_jobs(machine_type,
- RunPrinter(), user)
+ util.RunPrinter(), user)
else:
walk_jobs(machine_type,
- JobPrinter(show_desc=show_desc, full=full),
+ util.JobPrinter(show_desc=show_desc, full=full),
user)
except KeyboardInterrupt:
log.info("Interrupted.")
--- /dev/null
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def print_progress(index, total, message=None):
+ msg = "{m} ".format(m=message) if message else ''
+ sys.stderr.write("{msg}{i}/{total}\r".format(
+ msg=msg, i=index, total=total))
+ sys.stderr.flush()
+
+
+def end_progress():
+ sys.stderr.write('\n')
+ sys.stderr.flush()
+
+
+class JobProcessor(object):
+ def __init__(self):
+ self.jobs = OrderedDict()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_id = str(job_id)
+
+ job_dict = dict(
+ index=(len(self.jobs) + 1),
+ job_config=job_config,
+ )
+ if job_obj:
+ job_dict['job_obj'] = job_obj
+ self.jobs[job_id] = job_dict
+
+ self.process_job(job_id)
+
+ def process_job(self, job_id):
+ pass
+
+ def complete(self):
+ pass
+
+
+class JobPrinter(JobProcessor):
+ def __init__(self, show_desc=False, full=False):
+ super(JobPrinter, self).__init__()
+ self.show_desc = show_desc
+ self.full = full
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_index = self.jobs[job_id]['index']
+ job_priority = job_config['priority']
+ job_name = job_config['name']
+ job_desc = job_config['description']
+ print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+ i=job_index,
+ pri=job_priority,
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ if self.full:
+ pprint.pprint(job_config)
+ elif job_desc and self.show_desc:
+ for desc in job_desc.split():
+ print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+ def __init__(self):
+ super(RunPrinter, self).__init__()
+ self.runs = list()
+
+ def process_job(self, job_id):
+ run = self.jobs[job_id]['job_config']['name']
+ if run not in self.runs:
+ self.runs.append(run)
+ print(run)
+
+
+class JobDeleter(JobProcessor):
+ def __init__(self, pattern):
+ self.pattern = pattern
+ super(JobDeleter, self).__init__()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_name = job_config['name']
+ if self.pattern in job_name:
+ super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_name = job_config['name']
+ print('Deleting {job_name}/{job_id}'.format(
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ report.try_delete_jobs(job_name, job_id)
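A quick way to exercise the processors above with an in-memory job; all values are made up:

    printer = JobPrinter(show_desc=True)
    printer.add_job('100', {'priority': 50,
                            'name': 'example-run',
                            'description': 'rados smoke'})
    printer.complete()
    # Prints the job line, then one indented line per word of the description.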
def get_top_job(self, queue):
- uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
+ uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri,
queue=queue)
inc = random.uniform(0, 1)
with safe_while(
response.raise_for_status()
- def update_queue(self, queue, paused, paused_by, pause_duration=None):
+ def update_queue(self, queue, paused_by, pause_duration=None):
uri = "{base}/queue/".format(
base=self.base_uri
)
+
if pause_duration is not None:
pause_duration = int(pause_duration)
- queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
+ queue_info = {'queue': queue, 'paused_by': paused_by,
'pause_duration': pause_duration}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
response = self.session.post(uri, data=queue_json, headers=headers)
if response.status_code == 200:
- self.log.info("Successfully retrieved stats for queue {queue}".format(
- queue=queue,
- ))
return response.json()
else:
msg = response.text
return
return reporter.queued_jobs(queue, user, run_name)
+def get_jobs_by_run(queue, run_name):
+ reporter = ResultsReporter()
+ if not reporter.base_uri:
+ return
+ return reporter.queued_jobs(queue, None, run_name)
+
-def pause_queue(queue, paused, paused_by, pause_duration=None):
+def pause_queue(queue, paused_by, pause_duration=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- reporter.update_queue(queue, paused, paused_by, pause_duration)
+ reporter.update_queue(queue, paused_by, pause_duration)
def is_queue_paused(queue):
import os
import yaml
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
from teuthology.misc import get_user, merge_configs
from teuthology import report
"""
num = int(num)
tube = job_config.pop('tube')
- beanstalk = teuthology.beanstalk.connect()
+ beanstalk = teuthology.queue.beanstalk.connect()
beanstalk.use(tube)
queue = report.create_machine_type_queue(job_config['machine_type'])
job_config['queue'] = queue
'--machine-type': 'test_queue',
'--supervisor': False,
'--verbose': False,
- '--queue-backend': 'paddles'
+ '--queue-backend': 'paddles',
+ '--exit-on-empty-queue': False
}
m = mock.MagicMock()