import os
import yaml
-import teuthology.beanstalk
-from teuthology.misc import get_user, merge_configs
from teuthology import report
+from teuthology.jobqueue.base import QueueDirection
+from teuthology.misc import get_user, merge_configs
+import teuthology.jobqueue.choice
def main(args):
if not name or name.isdigit():
raise ValueError("Please use a more descriptive value for --name")
job_config = build_config(args)
- backend = args['--queue-backend']
if args['--dry-run']:
print('---\n' + yaml.safe_dump(job_config))
- elif backend == 'beanstalk':
- schedule_job(job_config, args['--num'], report_status)
- elif backend.startswith('@'):
- dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
- else:
- raise ValueError("Provided schedule backend '%s' is not supported. "
- "Try 'beanstalk' or '@path-to-a-file" % backend)
+ return
+ queue = select_queue(args['--queue-backend'], job_config)
+ schedule_job(queue, job_config, args['--num'], report_status)
def build_config(args):
return job_config
-def schedule_job(job_config, num=1, report_status=True):
+def schedule_job(queue, job_config, num=1, report_status=True):
"""
Schedule a job.
+ :param queue: Job queue object
:param job_config: The complete job dict
:param num: The number of times to schedule the job
+ :param report_status: Enable or disable reporting job
"""
- num = int(num)
- job = yaml.safe_dump(job_config)
- tube = job_config.pop('tube')
- beanstalk = teuthology.beanstalk.connect()
- beanstalk.use(tube)
- while num > 0:
- jid = beanstalk.put(
- job,
- ttr=60 * 60 * 24,
- priority=job_config['priority'],
- )
- print('Job scheduled with name {name} and ID {jid}'.format(
- name=job_config['name'], jid=jid))
- job_config['job_id'] = str(jid)
+ for _ in range(int(num)):
+ queue.put(job_config)
if report_status:
report.try_push_job_info(job_config, dict(status='queued'))
- num -= 1
-def dump_job_to_file(path, job_config, num=1):
+def select_queue(backend, job_config):
"""
- Schedule a job.
+ Select the kind of job queue to use.
+ :param backend: String name of queue
:param job_config: The complete job dict
- :param num: The number of times to schedule the job
- :param path: The file path where the job config to append
"""
- num = int(num)
- count_file_path = path + '.count'
-
- jid = 0
- if os.path.exists(count_file_path):
- with open(count_file_path, 'r') as f:
- jid=int(f.read() or '0')
-
- with open(path, 'a') as f:
- while num > 0:
- jid += 1
- job_config['job_id'] = str(jid)
- job = yaml.safe_dump(job_config)
- print('Job scheduled with name {name} and ID {jid}'.format(
- name=job_config['name'], jid=jid))
- f.write('---\n' + job)
- num -= 1
- with open(count_file_path, 'w') as f:
- f.write(str(jid))
-
+ tube = job_config.get('tube', '')
+ return teuthology.jobqueue.choice.from_backend(
+ backend, tube, QueueDirection.IN
+ )