git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Merge remote-tracking branch 'amathuria/wip-amathuria-replace-beanstalkd-paddles...
author Zack Cerza <zack@redhat.com>
Tue, 23 Aug 2022 20:24:17 +0000 (14:24 -0600)
committer Zack Cerza <zack@redhat.com>
Tue, 23 Aug 2022 20:24:17 +0000 (14:24 -0600)
teuthology/queue/beanstalk.py
teuthology/report.py

index 0000000000000000000000000000000000000000,c668e4f6bcb68be215f5535fe5013cb4bca5a7cf..6f99d3405c44327a754238831668d63821190dab
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,116 +1,117 @@@
+ import beanstalkc
++import json
+ import yaml
+ import logging
+ from teuthology.config import config
+ from teuthology.queue import util
+
+ log = logging.getLogger(__name__)
+
+
+ def connect():
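+     # Read the beanstalkd host/port (queue_host / queue_port) from the teuthology config file.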
+     host = config.queue_host
+     port = config.queue_port
+     if host is None or port is None:
+         raise RuntimeError(
+             'Beanstalk queue information not found in {conf_path}'.format(
+                 conf_path=config.teuthology_yaml))
+     return beanstalkc.Connection(host=host, port=port)
+
+
+ def watch_tube(connection, tube_name):
+     """
+     Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+     the tube_name that was actually used.
+     """
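+     # Jobs scheduled against several machine types (a comma-separated name) live on the 'multi' tube.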
+     if ',' in tube_name:
+         log.debug("Correcting tube name to 'multi'")
+         tube_name = 'multi'
+     connection.watch(tube_name)
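+     # Stop watching the default tube so reserve() only returns jobs from tube_name.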
+     connection.ignore('default')
+     return tube_name
+
+
+ def walk_jobs(connection, tube_name, processor, pattern=None):
+     """
+     Iterate over the ready jobs in the tube, passing each one to
+     processor.add_job(job_id, job_config, job); processor.complete() is
+     called once every job has been seen.
+     """
+     log.info("Checking Beanstalk Queue...")
+     job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+     if job_count == 0:
+         log.info('No jobs in Beanstalk Queue')
+         return
+     # Try to figure out a sane timeout based on how many jobs are in the queue
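+     # (one minute per 2000 queued jobs, i.e. roughly 30ms per job)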
+     timeout = job_count / 2000.0 * 60
+     for i in range(1, job_count + 1):
+         util.print_progress(i, job_count, "Loading")
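+         # reserve() blocks for up to `timeout` seconds waiting for a ready job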
+         job = connection.reserve(timeout=timeout)
+         if job is None or job.body is None:
+             continue
+         job_config = yaml.safe_load(job.body)
+         job_name = job_config['name']
+         job_id = job.stats()['id']
+         if pattern is not None and pattern not in job_name:
+             continue
+         processor.add_job(job_id, job_config, job)
+     util.end_progress()
+     processor.complete()
+
+
+ def pause_tube(connection, tube, duration):
+     duration = int(duration)
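+     # No tube given means operate on every tube; a duration of 0 un-pauses.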
+     if not tube:
+         tubes = sorted(connection.tubes())
+     else:
+         tubes = [tube]
+     prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+     templ = prefix + ": {tubes}"
+     log.info(templ.format(dur=duration, tubes=tubes))
+     for tube in tubes:
+         connection.pause_tube(tube, duration)
+
+
+ def stats_tube(connection, tube):
+     stats = connection.stats_tube(tube)
+     result = dict(
+         name=tube,
+         count=stats['current-jobs-ready'],
+         paused=(stats['pause'] != 0),
+     )
+     return result
+
+
+ def main(args):
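+     # args: parsed command-line options (docopt-style '--flag' keys).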
+     machine_type = args['--machine_type']
+     status = args['--status']
+     delete = args['--delete']
+     runs = args['--runs']
+     show_desc = args['--description']
+     full = args['--full']
+     pause_duration = args['--pause']
+     connection = None
+     try:
+         connection = connect()
+         if machine_type and not pause_duration:
+             # watch_tube needs to be run before we inspect individual jobs;
+             # it is not needed for pausing tubes
+             watch_tube(connection, machine_type)
+         if status:
 -            print(stats_tube(connection, machine_type))
++            print(json.dumps(stats_tube(connection, machine_type)))
+         elif pause_duration:
+             pause_tube(connection, machine_type, pause_duration)
+         elif delete:
+             walk_jobs(connection, machine_type,
+                       util.JobDeleter(delete))
+         elif runs:
+             walk_jobs(connection, machine_type,
+                       util.RunPrinter())
+         else:
+             walk_jobs(connection, machine_type,
+                       util.JobPrinter(show_desc=show_desc, full=full))
+     except KeyboardInterrupt:
+         log.info("Interrupted.")
+     finally:
+         # connect() may have failed, so only close an established connection
+         if connection is not None:
+             connection.close()
Simple merge