From: Zack Cerza Date: Mon, 28 Apr 2014 19:35:35 +0000 (-0500) Subject: Refactor teuthology.beanstalk X-Git-Tag: 1.1.0~1509 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a9d7aa351463288407752bd86a6169be40510623;p=teuthology.git Refactor teuthology.beanstalk This architecture will make it easier to add new functionality. Signed-off-by: Zack Cerza --- diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py index fcb3c6806d..afa4e30539 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/beanstalk.py @@ -1,8 +1,11 @@ import beanstalkc import yaml import logging +import sys +from collections import OrderedDict from .config import config +from . import report log = logging.getLogger(__name__) @@ -22,45 +25,76 @@ def watch_tube(connection, tube_name): connection.ignore('default') -def walk_jobs(connection, machine_type, show_desc=False, delete=None): +def walk_jobs(connection, tube_name, callback, pattern=None): + """ + def callback(jobs_dict) + """ log.info("Checking Beanstalk Queue...") - job_count = connection.stats_tube(machine_type)['current-jobs-ready'] + job_count = connection.stats_tube(tube_name)['current-jobs-ready'] if job_count == 0: log.info('No jobs in Beanstalk Queue') return - x = 1 - while x < job_count: - x += 1 - job = connection.reserve(timeout=20) - if job is not None and job.body is not None: + + matching_jobs = OrderedDict() + for i in range(1, job_count + 1): + sys.stderr.write("{i}/{count}\r".format(i=i, count=job_count)) + sys.stderr.flush() + job = connection.reserve(timeout=30) + if job is None or job.body is None: + continue + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + if pattern is not None and pattern not in job_name: + continue + matching_jobs[job_id] = job + sys.stderr.write('\n') + sys.stderr.flush() + callback(matching_jobs) + + +def _print_matching_jobs(show_desc=False): + def print_matching_jobs(jobs_dict): + i = 0 + job_count = len(jobs_dict) + for job_id, job in jobs_dict.iteritems(): + i += 1 job_config = yaml.safe_load(job.body) job_name = job_config['name'] + job_desc = job_config['description'] job_id = job.stats()['id'] - job_description = job_config['description'] - if delete: - if delete in job_name: - m = 'Deleting {job_id}/{job_name}'.format( - job_id=job_id, - job_name=job_name, - ) - print m - job.delete() - else: - m = "Searching queue... Checked {x}/{count} Jobs\r".format( - x=x, count=job_count) - print m, - else: - m = 'Job: {x}/{count} {job_name}/{job_id}'.format( - x=x, - count=job_count, - job_id=job_id, - job_name=job_name, - ) - print m - if job_description and show_desc: - for desc in job_description.split(): - print '\t {desc}'.format(desc=desc) - log.info("Finished checking Beanstalk Queue.") + print 'Job: {i}/{count} {job_name}/{job_id}'.format( + i=i, + count=job_count, + job_id=job_id, + job_name=job_name, + ) + if job_desc and show_desc: + for desc in job_desc.split(): + print '\t {desc}'.format(desc=desc) + return print_matching_jobs + + +def delete_matching_jobs(jobs_dict): + for job_id, job in jobs_dict.iteritems(): + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + print 'Deleting {job_id}/{job_name}'.format( + job_id=job_id, + job_name=job_name, + ) + job.delete() + report.try_delete_jobs(job_name, job_id) + + +def print_matching_runs(jobs_dict): + runs = set() + for job_id, job in jobs_dict.iteritems(): + job_config = yaml.safe_load(job.body) + runs.add(job_config['name']) + for run in runs: + print run def main(args): @@ -70,7 +104,12 @@ def main(args): try: connection = connect() watch_tube(connection, machine_type) - walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete) + if delete: + walk_jobs(connection, machine_type, + delete_matching_jobs) + else: + walk_jobs(connection, machine_type, + _print_matching_jobs(show_desc)) except KeyboardInterrupt: log.info("Interrupted.") finally: