With the --queue-backend argument the user can specify which backend (paddles or beanstalk) to use for maintaining the teuthology jobs queue.
To avoid overlapping job IDs, a job scheduled in beanstalk is also written to paddles, which returns a unique ID.
That ID is the one teuthology treats as the job ID throughout the run of the job.
To differentiate between the two queue backends, the teuthology-queue command has been split into teuthology-paddles-queue and teuthology-beanstalk-queue.
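
For example (hypothetical invocations, assuming a reachable paddles server and
a running beanstalkd instance; "smithi" stands in for a real machine type):

    teuthology-dispatcher --log-dir /var/log/teuthology --machine-type smithi --queue-backend beanstalk
    teuthology-beanstalk-queue -s -m smithi
    teuthology-paddles-queue -s -m smithi
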
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
--- /dev/null
+import docopt
+
+import teuthology.config
+import teuthology.beanstalk
+
+doc = """
+usage: teuthology-beanstalk-queue -h
+ teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
+ teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
+ teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
+ teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
+
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+ -m, --machine_type MACHINE_TYPE [default: multi]
+ Which machine type queue to work on.
+
+optional arguments:
+ -h, --help Show this help message and exit
+ -D, --delete PATTERN Delete Jobs with PATTERN in their name
+ -d, --description Show job descriptions
+ -r, --runs Only show run names
+ -f, --full Print the entire job config. Use with caution.
+ -s, --status Prints the status of the queue
+ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0
+ will unpause. If -m is passed, pause that queue,
+ otherwise pause all queues.
+"""
+
+
+def main():
+
+ args = docopt.docopt(doc)
+ teuthology.beanstalk.main(args)
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
- teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
+ teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
-Start a dispatcher for the specified machine type. Grab jobs from a paddles
+Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
+ --queue-backend BACKEND queue backend to use: paddles or beanstalk
"""
import docopt
--- /dev/null
+import docopt
+
+import teuthology.config
+import teuthology.paddles_queue
+
+doc = """
+usage: teuthology-paddles-queue -h
+ teuthology-paddles-queue -s -m MACHINE_TYPE
+ teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
+ teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+ teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
+ teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
+ teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+ teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
+
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+ -m, --machine_type MACHINE_TYPE
+ Which machine type queue to work on.
+
+optional arguments:
+ -h, --help Show this help message and exit
+ -D, --delete PATTERN Delete Jobs with PATTERN in their name
+ -d, --description Show job descriptions
+ -r, --runs Only show run names
+ -f, --full Print the entire job config. Use with caution.
+ -s, --status Prints the status of the queue
+ -t, --time SECONDS Pause queues for a number of seconds.
+ If -m is passed, pause that queue,
+ otherwise pause all queues.
+ -p, --pause Pause queue
+ -u, --unpause Unpause queue
+ -P, --priority PRIORITY
+ Change priority of queued jobs
+ -U, --user USER User who owns the jobs
+ -R, --run-name RUN_NAME
+ Used to change priority of all jobs in the run.
+"""
+
+
+def main():
+ args = docopt.docopt(doc)
+ teuthology.paddles_queue.main(args)
+++ /dev/null
-import docopt
-
-import teuthology.config
-import teuthology.paddles_queue
-
-doc = """
-usage: teuthology-queue -h
- teuthology-queue -s -m MACHINE_TYPE
- teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
- teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
- teuthology-queue [-r] -m MACHINE_TYPE -U USER
- teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
- teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
- teuthology-queue -u -m MACHINE_TYPE -U USER
-
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-
-Arguments:
- -m, --machine_type MACHINE_TYPE
- Which machine type queue to work on.
-
-optional arguments:
- -h, --help Show this help message and exit
- -D, --delete PATTERN Delete Jobs with PATTERN in their name
- -d, --description Show job descriptions
- -r, --runs Only show run names
- -f, --full Print the entire job config. Use with caution.
- -s, --status Prints the status of the queue
- -t, --time SECONDS Pause queues for a number of seconds.
- If -m is passed, pause that queue,
- otherwise pause all queues.
- -p, --pause Pause queue
- -u, --unpause Unpause queue
- -P, --priority PRIORITY
- Change priority of queued jobs
- -U, --user USER User who owns the jobs
- -R, --run_name RUN_NAME
- Used to change priority of all jobs in the run.
-"""
-
-
-def main():
- args = docopt.docopt(doc)
- teuthology.paddles_queue.main(args)
--- /dev/null
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+ host = config.queue_host
+ port = config.queue_port
+ if host is None or port is None:
+ raise RuntimeError(
+ 'Beanstalk queue information not found in {conf_path}'.format(
+ conf_path=config.teuthology_yaml))
+ return beanstalkc.Connection(host=host, port=port)
+
+
+def watch_tube(connection, tube_name):
+ """
+ Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+ the tube_name that was actually used.
+ """
+ if ',' in tube_name:
+ log.debug("Correcting tube name to 'multi'")
+ tube_name = 'multi'
+ connection.watch(tube_name)
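+    # stop watching 'default' so reserve() only returns jobs from tube_name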
+ connection.ignore('default')
+ return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+ """
+ def callback(jobs_dict)
+ """
+ log.info("Checking Beanstalk Queue...")
+ job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+ if job_count == 0:
+ log.info('No jobs in Beanstalk Queue')
+ return
+
+ # Try to figure out a sane timeout based on how many jobs are in the queue
+ timeout = job_count / 2000.0 * 60
+ for i in range(1, job_count + 1):
+ print_progress(i, job_count, "Loading")
+ job = connection.reserve(timeout=timeout)
+ if job is None or job.body is None:
+ continue
+ job_config = yaml.safe_load(job.body)
+ job_name = job_config['name']
+ job_id = job.stats()['id']
+ if pattern is not None and pattern not in job_name:
+ continue
+ processor.add_job(job_id, job_config, job)
+ end_progress()
+ processor.complete()
+
+
+def print_progress(index, total, message=None):
+ msg = "{m} ".format(m=message) if message else ''
+ sys.stderr.write("{msg}{i}/{total}\r".format(
+ msg=msg, i=index, total=total))
+ sys.stderr.flush()
+
+
+def end_progress():
+ sys.stderr.write('\n')
+ sys.stderr.flush()
+
+
+class JobProcessor(object):
+ def __init__(self):
+ self.jobs = OrderedDict()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_id = str(job_id)
+
+ job_dict = dict(
+ index=(len(self.jobs) + 1),
+ job_config=job_config,
+ )
+ if job_obj:
+ job_dict['job_obj'] = job_obj
+ self.jobs[job_id] = job_dict
+
+ self.process_job(job_id)
+
+ def process_job(self, job_id):
+ pass
+
+ def complete(self):
+ pass
+
+
+class JobPrinter(JobProcessor):
+ def __init__(self, show_desc=False, full=False):
+ super(JobPrinter, self).__init__()
+ self.show_desc = show_desc
+ self.full = full
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_index = self.jobs[job_id]['index']
+ job_priority = job_config['priority']
+ job_name = job_config['name']
+ job_desc = job_config['description']
+ print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+ i=job_index,
+ pri=job_priority,
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ if self.full:
+ pprint.pprint(job_config)
+ elif job_desc and self.show_desc:
+ for desc in job_desc.split():
+ print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+ def __init__(self):
+ super(RunPrinter, self).__init__()
+ self.runs = list()
+
+ def process_job(self, job_id):
+ run = self.jobs[job_id]['job_config']['name']
+ if run not in self.runs:
+ self.runs.append(run)
+ print(run)
+
+
+class JobDeleter(JobProcessor):
+ def __init__(self, pattern):
+ self.pattern = pattern
+ super(JobDeleter, self).__init__()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_name = job_config['name']
+ if self.pattern in job_name:
+ super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_name = job_config['name']
+ print('Deleting {job_name}/{job_id}'.format(
+ job_id=job_id,
+ job_name=job_name,
+ ))
+ job_obj = self.jobs[job_id].get('job_obj')
+ if job_obj:
+ job_obj.delete()
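+        # also delete the job record from paddles via the report module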
+ report.try_delete_jobs(job_name, job_id)
+
+
+def pause_tube(connection, tube, duration):
+ duration = int(duration)
+ if not tube:
+ tubes = sorted(connection.tubes())
+ else:
+ tubes = [tube]
+
+ prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+ templ = prefix + ": {tubes}"
+ log.info(templ.format(dur=duration, tubes=tubes))
+ for tube in tubes:
+ connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+ stats = connection.stats_tube(tube)
+ result = dict(
+ name=tube,
+ count=stats['current-jobs-ready'],
+ paused=(stats['pause'] != 0),
+ )
+ return result
+
+
+def main(args):
+ machine_type = args['--machine_type']
+ status = args['--status']
+ delete = args['--delete']
+ runs = args['--runs']
+ show_desc = args['--description']
+ full = args['--full']
+ pause_duration = args['--pause']
+    connection = connect()
+    try:
+ if machine_type and not pause_duration:
+ # watch_tube needs to be run before we inspect individual jobs;
+ # it is not needed for pausing tubes
+ watch_tube(connection, machine_type)
+ if status:
+ print(stats_tube(connection, machine_type))
+ elif pause_duration:
+ pause_tube(connection, machine_type, pause_duration)
+ elif delete:
+ walk_jobs(connection, machine_type,
+ JobDeleter(delete))
+ elif runs:
+ walk_jobs(connection, machine_type,
+ RunPrinter())
+ else:
+ walk_jobs(connection, machine_type,
+ JobPrinter(show_desc=show_desc, full=full))
+ except KeyboardInterrupt:
+ log.info("Interrupted.")
+ finally:
+ connection.close()
'archive_upload_key': None,
'archive_upload_url': None,
'automated_scheduling': False,
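+    # default queue backend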
+ 'backend': 'paddles',
'reserve_machines': 5,
'ceph_git_base_url': 'https://github.com/ceph/',
'ceph_git_url': None,
from time import sleep
from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
from teuthology import report
from teuthology.config import config as teuth_config
from teuthology.exceptions import SkipJob
def clean_config(config):
result = {}
for key in config:
+ if key == 'status':
+ continue
if config[key] is not None:
result[key] = config[key]
return result
machine_type = args["--machine-type"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]
+    backend = args['--queue-backend']
if archive_dir is None:
archive_dir = teuth_config.archive_base
load_config(archive_dir=archive_dir)
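+    # the beanstalk backend keeps one persistent connection and watches the machine-type tube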
+ if backend == 'beanstalk':
+ connection = beanstalk.connect()
+ beanstalk.watch_tube(connection, machine_type)
+
result_proc = None
if teuth_config.teuthology_path is None:
load_config()
job_procs = set(filter(lambda p: p.poll() is None, job_procs))
- job = report.get_queued_job(machine_type)
- if job is None:
- if exit_on_empty_queue and not job_procs:
- log.info("Queue is empty and no supervisor processes running; exiting!")
- break
- continue
- job = clean_config(job)
- report.try_push_job_info(job, dict(status='running'))
- job_id = job.get('job_id')
- log.info('Reserved job %s', job_id)
- log.info('Config is: %s', job)
- job_config = job
-
+ if backend == 'beanstalk':
+ job = connection.reserve(timeout=60)
+ if job is None:
+ continue
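+        # bury the job so it won't be re-run if it fails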
+ job.bury()
+ job_config = yaml.safe_load(job.body)
+ job_id = job_config.get('job_id')
+ log.info('Reserved job %s', job_id)
+ log.info('Config is: %s', job.body)
+ else:
+ job = report.get_queued_job(machine_type)
+        if job is None:
+            if exit_on_empty_queue and not job_procs:
+                log.info("Queue is empty and no supervisor processes running; exiting!")
+                break
+            continue
+ job = clean_config(job)
+ report.try_push_job_info(job, dict(status='running'))
+ job_id = job.get('job_id')
+ log.info('Reserved job %s', job_id)
+ log.info('Config is: %s', job)
+ job_config = job
+
if job_config.get('stop_worker'):
keep_running = False
status='fail',
failure_reason=error_message))
+            # This try/except block is to keep the dispatcher from dying when
+            # beanstalkc throws a SocketError
+ if backend == 'beanstalk':
+ try:
+ job.delete()
+ except Exception:
+ log.exception("Saw exception while trying to delete job")
returncodes = set([0])
for proc in job_procs:
def pause_queue(machine_type, paused, paused_by, pause_duration=None):
- if paused == True:
+ if paused:
report.pause_queue(machine_type, paused, paused_by, pause_duration)
'''
If there is a pause duration specified
sleep(int(pause_duration))
paused = False
report.pause_queue(machine_type, paused, paused_by)
- elif paused == False:
+ elif not paused:
report.pause_queue(machine_type, paused, paused_by)
def run_job(job_config, teuth_bin_path, archive_dir, verbose):
safe_archive = safepath.munge(job_config['name'])
if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+ if teuth_config.results_server:
+ try:
+ report.try_delete_jobs(job_config['name'], job_config['job_id'])
+ except Exception as e:
+ log.warning("Unable to delete job %s, exception occurred: %s",
+ job_config['job_id'], e)
job_archive = os.path.join(archive_dir, safe_archive)
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
import logging
import getpass
-
+from teuthology import beanstalk
from teuthology import report
+from teuthology.config import config
from teuthology import misc
log = logging.getLogger(__name__)
"you must also pass --machine-type")
if not preserve_queue:
+ remove_beanstalk_jobs(run_name, machine_type)
remove_paddles_jobs(run_name)
kill_processes(run_name, run_info.get('pids'))
if owner is not None:
job_info = {}
job_num = 0
jobs = serializer.jobs_for_run(run_name)
+ job_total = len(jobs)
for (job_id, job_dir) in jobs.items():
if not os.path.isdir(job_dir):
continue
job_num += 1
+ if config.backend == 'beanstalk':
+ beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
job_info = serializer.job_info(run_name, job_id, simple=True)
for key in job_info.keys():
if key in run_info_fields and key not in run_info:
report.try_delete_jobs(run_name, job_ids)
+def remove_beanstalk_jobs(run_name, tube_name):
+ qhost = config.queue_host
+ qport = config.queue_port
+ if qhost is None or qport is None:
+ raise RuntimeError(
+ 'Beanstalk queue information not found in {conf_path}'.format(
+ conf_path=config.yaml_path))
+ log.info("Checking Beanstalk Queue...")
+ beanstalk_conn = beanstalk.connect()
+ real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
+
+ curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
+ if curjobs != 0:
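+        # reserve each ready job once; jobs we do not delete return to ready when their TTR expires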
+        for _ in range(curjobs):
+            job = beanstalk_conn.reserve(timeout=20)
+ if job is None:
+ continue
+ job_config = yaml.safe_load(job.body)
+ if run_name == job_config['name']:
+ job_id = job_config['job_id']
+ msg = "Deleting job from queue. ID: " + \
+ "{id} Name: {name} Desc: {desc}".format(
+ id=str(job_id),
+ name=job_config['name'],
+ desc=job_config['description'],
+ )
+ log.info(msg)
+ job.delete()
+ else:
+ print("No jobs in Beanstalk Queue")
+ beanstalk_conn.close()
+
+
def kill_processes(run_name, pids=None):
if pids:
to_kill = set(pids).intersection(psutil.pids())
command=self.command, exitstatus=self.returncode,
node=self.hostname, label=self.label
)
-
def _get_exitstatus(self):
"""
:returns: the remote command's exit status (return code). Note that
if os.path.exists(self.last_run_file):
os.remove(self.last_run_file)
- def get_top_job(self, machine_type):
+ def get_top_job(self, queue):
- uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
- machine_type=machine_type)
+ uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
+ queue=queue)
inc = random.uniform(0, 1)
with safe_while(
- sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed:
+ sleep=1, increment=inc, action=f'get job from {queue}') as proceed:
while proceed():
response = self.session.get(uri)
if response.status_code == 200:
response = self.session.delete(uri)
response.raise_for_status()
- def create_queue(self, machine_type):
+ def create_queue(self, queue):
"""
Create a queue on the results server
uri = "{base}/queue/".format(
base=self.base_uri
)
- queue_info = {'machine_type': machine_type}
+ queue_info = {'queue': queue}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
inc = random.uniform(0, 1)
with safe_while(
- sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed:
+ sleep=1, increment=inc, action=f'creating queue {queue}') as proceed:
while proceed():
response = self.session.post(uri, data=queue_json, headers=headers)
if response.status_code == 200:
- self.log.info("Successfully created queue for {machine_type}".format(
- machine_type=machine_type,
+ self.log.info("Successfully created queue {queue}".format(
+ queue=queue,
))
return
else:
response.raise_for_status()
- def update_queue(self, machine_type, paused, paused_by, pause_duration=None):
+ def update_queue(self, queue, paused, paused_by, pause_duration=None):
uri = "{base}/queue/".format(
base=self.base_uri
)
if pause_duration is not None:
pause_duration = int(pause_duration)
- queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by,
+ queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
'pause_duration': pause_duration}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
inc = random.uniform(0, 1)
with safe_while(
- sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed:
+ sleep=1, increment=inc, action=f'updating queue {queue}') as proceed:
while proceed():
response = self.session.put(uri, data=queue_json, headers=headers)
if response.status_code == 200:
- self.log.info("Successfully updated queue for {machine_type}".format(
- machine_type=machine_type,
+ self.log.info("Successfully updated queue {queue}".format(
+ queue=queue,
))
return
else:
response.raise_for_status()
- def queue_stats(self, machine_type):
+ def queue_stats(self, queue):
uri = "{base}/queue/stats/".format(
base=self.base_uri
)
- queue_info = {'machine_type': machine_type}
+ queue_info = {'queue': queue}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
inc = random.uniform(0, 1)
with safe_while(
- sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed:
+ sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed:
while proceed():
response = self.session.post(uri, data=queue_json, headers=headers)
if response.status_code == 200:
- self.log.info("Successfully retrieved stats for queue {machine_type}".format(
- machine_type=machine_type,
+ self.log.info("Successfully retrieved stats for queue {queue}".format(
+ queue=queue,
))
return response.json()
else:
))
response.raise_for_status()
- def queued_jobs(self, machine_type, user, run_name):
+ def queued_jobs(self, queue, user, run_name):
uri = "{base}/queue/queued_jobs/".format(
base=self.base_uri
)
+ request_info = {'queue': queue}
if run_name is not None:
filter_field = run_name
- request_info = {'machine_type': machine_type,
- 'run_name': run_name}
+ uri += "?run_name=" + str(run_name)
else:
filter_field = user
- request_info = {'machine_type': machine_type,
- 'user': user}
+ uri += "?user=" + str(user)
+
request_json = json.dumps(request_info)
headers = {'content-type': 'application/json'}
inc = random.uniform(0, 1)
response.raise_for_status()
-def create_machine_type_queue(machine_type):
+def create_machine_type_queue(queue):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- reporter.create_queue(machine_type)
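+    # comma-separated machine types share the single 'multi' queue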
+ if ',' in queue:
+ queue = 'multi'
+ reporter.create_queue(queue)
+ return queue
def get_user_jobs_queue(machine_type, user, run_name=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- if is_queue_paused(machine_type) == True:
- log.info("Teuthology queue for machine type %s is currently paused",
- machine_type)
+ if ',' in machine_type:
+ queue = 'multi'
+ else:
+ queue = machine_type
+    if is_queue_paused(queue):
+ log.info("Teuthology queue %s is currently paused",
+ queue)
return None
else:
- return reporter.get_top_job(machine_type)
+ return reporter.get_top_job(queue)
def try_push_job_info(job_config, extra_info=None):
import os
import yaml
+import teuthology.beanstalk
from teuthology.misc import get_user, merge_configs
from teuthology import report
if args[opt]:
raise ValueError(msg_fmt.format(opt=opt))
- if args['--first-in-suite'] or args['--last-in-suite']:
- report_status = False
- else:
- report_status = True
-
name = args['--name']
if not name or name.isdigit():
raise ValueError("Please use a more descriptive value for --name")
backend = args['--queue-backend']
if args['--dry-run']:
print('---\n' + yaml.safe_dump(job_config))
- elif backend == 'paddles':
- schedule_job(job_config, args['--num'], report_status)
elif backend.startswith('@'):
dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
+ elif backend == 'paddles':
+ paddles_schedule_job(job_config, args['--num'])
+ elif backend == 'beanstalk':
+ beanstalk_schedule_job(job_config, args['--num'])
else:
raise ValueError("Provided schedule backend '%s' is not supported. "
- "Try 'paddles' or '@path-to-a-file" % backend)
+ "Try 'paddles', 'beanstalk' or '@path-to-a-file'" % backend)
def build_config(args):
return job_config
-def schedule_job(job_config, num=1, report_status=True):
+def paddles_schedule_job(job_config, num=1):
"""
- Schedule a job.
+ Schedule a job with Paddles as the backend.
:param job_config: The complete job dict
:param num: The number of times to schedule the job
'''
Add 'machine_type' queue to DB here.
'''
- report.create_machine_type_queue(job_config['machine_type'])
+ queue = report.create_machine_type_queue(job_config['machine_type'])
+ job_config['queue'] = queue
while num > 0:
-
job_id = report.try_create_job(job_config, dict(status='queued'))
- print('Job scheduled with name {name} and ID {job_id}'.format(
+ print('Job scheduled in Paddles with name {name} and ID {job_id}'.format(
name=job_config['name'], job_id=job_id))
job_config['job_id'] = str(job_id)
-
+
+ num -= 1
+
+
+def beanstalk_schedule_job(job_config, num=1):
+ """
+ Schedule a job with Beanstalk as the backend.
+
+ :param job_config: The complete job dict
+ :param num: The number of times to schedule the job
+ """
+ num = int(num)
+ tube = job_config.pop('tube')
+ beanstalk = teuthology.beanstalk.connect()
+ beanstalk.use(tube)
+ queue = report.create_machine_type_queue(job_config['machine_type'])
+ job_config['queue'] = queue
+ while num > 0:
+ job_id = report.try_create_job(job_config, dict(status='queued'))
+ job_config['job_id'] = str(job_id)
+ job = yaml.safe_dump(job_config)
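+        # ttr (time-to-run): how long a reservation may be held before beanstalk re-queues the job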
+ _ = beanstalk.put(
+ job,
+ ttr=60 * 60 * 24,
+ priority=job_config['priority'],
+ )
+ print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format(
+ name=job_config['name'], job_id=job_id))
num -= 1
+
def dump_job_to_file(path, job_config, num=1):
"""
Schedule a job.
num -= 1
with open(count_file_path, 'w') as f:
f.write(str(jid))
-
'--description': 'the_description',
'--machine-type': 'test_queue',
'--supervisor': False,
- '--verbose': False
+ '--verbose': False,
+ '--queue-backend': 'paddles'
}
m = mock.MagicMock()
+import beanstalkc
import os
from unittest.mock import patch, Mock, MagicMock
@patch("os.symlink")
def test_symlink_success(self, m_symlink):
worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
- m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
+ m_symlink.assert_called_with(
+ "path/to/worker.log", "path/to/archive/worker.log")
@patch("teuthology.worker.log")
@patch("os.symlink")
m_popen.return_value = m_p
m_t_config.results_server = False
worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
- m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+ m_symlink_log.assert_called_with(
+ config["worker_log"], config["archive_path"])
@patch("teuthology.worker.report.try_push_job_info")
@patch("teuthology.worker.symlink_worker_log")
process = Mock()
process.poll.return_value = "not None"
worker.run_with_watchdog(process, config)
- m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+ m_symlink_log.assert_called_with(
+ config["worker_log"], config["archive_path"])
m_try_push.assert_called_with(
dict(name=config["name"], job_id=config["job_id"]),
dict(status='dead')
m_proc.poll.return_value = "not None"
m_popen.return_value = m_proc
worker.run_with_watchdog(process, config)
- m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+ m_symlink_log.assert_called_with(
+ config["worker_log"], config["archive_path"])
@patch("os.path.isdir")
@patch("teuthology.worker.fetch_teuthology")
assert m_fetch_qa_suite.called_once_with_args(branch='main')
assert got_config['suite_path'] == '/suite/path'
- def build_fake_jobs(self, job_bodies):
+ def build_fake_jobs(self, m_connection, m_job, job_bodies):
"""
+ Given patched copies of:
+ beanstalkc.Connection
+ beanstalkc.Job
And a list of basic job bodies, return a list of mocked Job objects
"""
+ # Make sure instantiating m_job returns a new object each time
+ m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
jobs = []
job_id = 0
for job_body in job_bodies:
job_id += 1
- job = {}
- job['job_id'] = job_id
- job['body'] = job_body
+ job = m_job(conn=m_connection, jid=job_id, body=job_body)
+ job.jid = job_id
+ job_body += '\njob_id: ' + str(job_id)
+ job.body = job_body
jobs.append(job)
return jobs
-
- @patch("teuthology.worker.setup_log_file")
- @patch("os.path.isdir", return_value=True)
- @patch("teuthology.worker.fetch_teuthology")
- @patch("teuthology.worker.fetch_qa_suite")
@patch("teuthology.worker.run_job")
+ @patch("teuthology.worker.prep_job")
+ @patch("beanstalkc.Job", autospec=True)
+ @patch("teuthology.worker.fetch_qa_suite")
+ @patch("teuthology.worker.fetch_teuthology")
+ @patch("teuthology.worker.beanstalk.watch_tube")
+ @patch("teuthology.worker.beanstalk.connect")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.worker.setup_log_file")
+ def test_main_loop(
+ self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+ m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+ ):
+ m_connection = Mock()
+ jobs = self.build_fake_jobs(
+ m_connection,
+ m_job,
+ [
+ 'foo: bar',
+ 'stop_worker: true',
+ ],
+ )
+ m_connection.reserve.side_effect = jobs
+ m_connect.return_value = m_connection
+ m_prep_job.return_value = (dict(), '/bin/path')
+ worker.main(self.ctx)
+ # There should be one reserve call per item in the jobs list
+ expected_reserve_calls = [
+ dict(timeout=60) for i in range(len(jobs))
+ ]
+ got_reserve_calls = [
+ call[1] for call in m_connection.reserve.call_args_list
+ ]
+ assert got_reserve_calls == expected_reserve_calls
+ for job in jobs:
+ job.bury.assert_called_once_with()
+ job.delete.assert_called_once_with()
+
@patch("teuthology.worker.report.try_push_job_info")
- @patch("teuthology.worker.report.get_queued_job")
- @patch("teuthology.worker.clean_config")
+ @patch("teuthology.worker.run_job")
+ @patch("beanstalkc.Job", autospec=True)
+ @patch("teuthology.worker.fetch_qa_suite")
+ @patch("teuthology.worker.fetch_teuthology")
+ @patch("teuthology.worker.beanstalk.watch_tube")
+ @patch("teuthology.worker.beanstalk.connect")
+ @patch("os.path.isdir", return_value=True)
+ @patch("teuthology.worker.setup_log_file")
def test_main_loop_13925(
- self, m_setup_log_file, m_isdir,
- m_fetch_teuthology, m_fetch_qa_suite, m_run_job,
- m_try_push_job_info, m_get_queued_job, m_clean_config
- ):
+ self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+ m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+ m_try_push_job_info,
+ ):
+ m_connection = Mock()
+ jobs = self.build_fake_jobs(
+ m_connection,
+ m_job,
+ [
+ 'name: name',
+ 'name: name\nstop_worker: true',
+ ],
+ )
+ m_connection.reserve.side_effect = jobs
+ m_connect.return_value = m_connection
m_fetch_qa_suite.side_effect = [
'/suite/path',
MaxWhileTries(),
MaxWhileTries(),
]
- job = {
- 'job_id': '1',
- 'description': 'DESC',
- 'email': 'EMAIL',
- 'first_in_suite': False,
- 'last_in_suite': True,
- 'machine_type': 'test_queue',
- 'name': 'NAME',
- 'owner': 'OWNER',
- 'priority': 99,
- 'results_timeout': '6',
- 'verbose': False,
- 'stop_worker': True
- }
- m_get_queued_job.return_value = job
- m_clean_config.return_value = job
-
- mock_prep_job_patcher = patch('teuthology.worker.prep_job')
- mock_prep_job = mock_prep_job_patcher.start()
- mock_prep_job.return_value = (dict(), '/teuth/bin/path')
-
worker.main(self.ctx)
- mock_prep_job_patcher.stop()
- assert len(m_run_job.call_args_list) == 1
- assert len(m_try_push_job_info.call_args_list) == 1
- assert m_try_push_job_info.called_once_with(job, dict(status='running'))
-
+ assert len(m_run_job.call_args_list) == 0
+ assert len(m_try_push_job_info.call_args_list) == len(jobs)
+ for i in range(len(jobs)):
+ push_call = m_try_push_job_info.call_args_list[i]
+ assert push_call[0][1]['status'] == 'dead'
from datetime import datetime
from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
from teuthology import report
from teuthology import safepath
from teuthology.config import config as teuth_config
teuth_config.archive_base = ctx.archive_dir
-def clean_config(config):
- result = {}
- for key in config:
- if config[key] is not None:
- result[key] = config[key]
- return result
-
-
def main(ctx):
loglevel = logging.INFO
if ctx.verbose:
set_config_attr(ctx)
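+    # connect to beanstalk and watch this worker's tube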
+ connection = beanstalk.connect()
+ beanstalk.watch_tube(connection, ctx.tube)
result_proc = None
if teuth_config.teuthology_path is None:
load_config()
- job = report.get_queued_job(ctx.machine_type)
+ job = connection.reserve(timeout=60)
if job is None:
continue
- job = clean_config(job)
- report.try_push_job_info(job, dict(status='running'))
- job_id = job.get('job_id')
+ # bury the job so it won't be re-run if it fails
+ job.bury()
+ job_config = yaml.safe_load(job.body)
+ job_id = job_config.get('job_id')
log.info('Reserved job %s', job_id)
- log.info('Config is: %s', job)
- job_config = job
+ log.info('Config is: %s', job.body)
if job_config.get('stop_worker'):
keep_running = False
except SkipJob:
continue
+ # This try/except block is to keep the worker from dying when
+ # beanstalkc throws a SocketError
+ try:
+ job.delete()
+ except Exception:
+ log.exception("Saw exception while trying to delete job")
+
def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']