From: Zack Cerza Date: Tue, 23 Aug 2022 20:24:17 +0000 (-0600) Subject: Merge remote-tracking branch 'amathuria/wip-amathuria-replace-beanstalkd-paddles... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0e641eb51da68682742fda0bf27088fdef5e4dfd;p=teuthology.git Merge remote-tracking branch 'amathuria/wip-amathuria-replace-beanstalkd-paddles' into devstack-plus-queue --- 0e641eb51da68682742fda0bf27088fdef5e4dfd diff --cc teuthology/queue/beanstalk.py index 000000000,c668e4f6b..6f99d3405 mode 000000,100644..100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@@ -1,0 -1,116 +1,117 @@@ + import beanstalkc ++import json + import yaml + import logging + + from teuthology.config import config + from teuthology.queue import util + + + log = logging.getLogger(__name__) + + + def connect(): + host = config.queue_host + port = config.queue_port + if host is None or port is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.teuthology_yaml)) + return beanstalkc.Connection(host=host, port=port) + + + def watch_tube(connection, tube_name): + """ + Watch a given tube, potentially correcting to 'multi' if necessary. Returns + the tube_name that was actually used. + """ + if ',' in tube_name: + log.debug("Correcting tube name to 'multi'") + tube_name = 'multi' + connection.watch(tube_name) + connection.ignore('default') + return tube_name + + + def walk_jobs(connection, tube_name, processor, pattern=None): + """ + def callback(jobs_dict) + """ + log.info("Checking Beanstalk Queue...") + job_count = connection.stats_tube(tube_name)['current-jobs-ready'] + if job_count == 0: + log.info('No jobs in Beanstalk Queue') + return + + # Try to figure out a sane timeout based on how many jobs are in the queue + timeout = job_count / 2000.0 * 60 + for i in range(1, job_count + 1): + util.print_progress(i, job_count, "Loading") + job = connection.reserve(timeout=timeout) + if job is None or job.body is None: + continue + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + if pattern is not None and pattern not in job_name: + continue + processor.add_job(job_id, job_config, job) + util.end_progress() + processor.complete() + + + def pause_tube(connection, tube, duration): + duration = int(duration) + if not tube: + tubes = sorted(connection.tubes()) + else: + tubes = [tube] + + prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" + templ = prefix + ": {tubes}" + log.info(templ.format(dur=duration, tubes=tubes)) + for tube in tubes: + connection.pause_tube(tube, duration) + + + def stats_tube(connection, tube): + stats = connection.stats_tube(tube) + result = dict( + name=tube, + count=stats['current-jobs-ready'], + paused=(stats['pause'] != 0), + ) + return result + + + def main(args): + machine_type = args['--machine_type'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + try: + connection = connect() + if machine_type and not pause_duration: + # watch_tube needs to be run before we inspect individual jobs; + # it is not needed for pausing tubes + watch_tube(connection, machine_type) + if status: - print(stats_tube(connection, machine_type)) ++ print(json.dumps(stats_tube(connection, machine_type))) + elif pause_duration: + pause_tube(connection, machine_type, pause_duration) + elif delete: + walk_jobs(connection, machine_type, + util.JobDeleter(delete)) + elif runs: + walk_jobs(connection, machine_type, + util.RunPrinter()) + else: + walk_jobs(connection, machine_type, + util.JobPrinter(show_desc=show_desc, full=full)) + except KeyboardInterrupt: + log.info("Interrupted.") + finally: + connection.close()