]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Refactor teuthology.beanstalk
authorZack Cerza <zack@cerza.org>
Mon, 28 Apr 2014 19:35:35 +0000 (14:35 -0500)
committerZack Cerza <zack@cerza.org>
Wed, 30 Apr 2014 19:30:43 +0000 (14:30 -0500)
This architecture will make it easier to add new functionality.

Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
teuthology/beanstalk.py

index fcb3c6806dcffd8416df89abcb057ec864e8cf5f..afa4e30539df6b30c215ce1ffea040d006d8586b 100644 (file)
@@ -1,8 +1,11 @@
 import beanstalkc
 import yaml
 import logging
+import sys
+from collections import OrderedDict
 
 from .config import config
+from . import report
 
 log = logging.getLogger(__name__)
 
@@ -22,45 +25,76 @@ def watch_tube(connection, tube_name):
     connection.ignore('default')
 
 
-def walk_jobs(connection, machine_type, show_desc=False, delete=None):
+def walk_jobs(connection, tube_name, callback, pattern=None):
+    """
+    def callback(jobs_dict)
+    """
     log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(machine_type)['current-jobs-ready']
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
     if job_count == 0:
         log.info('No jobs in Beanstalk Queue')
         return
-    x = 1
-    while x < job_count:
-        x += 1
-        job = connection.reserve(timeout=20)
-        if job is not None and job.body is not None:
+
+    matching_jobs = OrderedDict()
+    for i in range(1, job_count + 1):
+        sys.stderr.write("{i}/{count}\r".format(i=i, count=job_count))
+        sys.stderr.flush()
+        job = connection.reserve(timeout=30)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        matching_jobs[job_id] = job
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+    callback(matching_jobs)
+
+
+def _print_matching_jobs(show_desc=False):
+    def print_matching_jobs(jobs_dict):
+        i = 0
+        job_count = len(jobs_dict)
+        for job_id, job in jobs_dict.iteritems():
+            i += 1
             job_config = yaml.safe_load(job.body)
             job_name = job_config['name']
+            job_desc = job_config['description']
             job_id = job.stats()['id']
-            job_description = job_config['description']
-            if delete:
-                if delete in job_name:
-                    m = 'Deleting {job_id}/{job_name}'.format(
-                        job_id=job_id,
-                        job_name=job_name,
-                        )
-                    print m
-                    job.delete()
-                else:
-                    m = "Searching queue... Checked {x}/{count} Jobs\r".format(
-                        x=x, count=job_count)
-                    print m,
-            else:
-                m = 'Job: {x}/{count} {job_name}/{job_id}'.format(
-                    x=x,
-                    count=job_count,
-                    job_id=job_id,
-                    job_name=job_name,
-                    )
-                print m
-                if job_description and show_desc:
-                    for desc in job_description.split():
-                        print '\t {desc}'.format(desc=desc)
-    log.info("Finished checking Beanstalk Queue.")
+            print 'Job: {i}/{count} {job_name}/{job_id}'.format(
+                i=i,
+                count=job_count,
+                job_id=job_id,
+                job_name=job_name,
+                )
+            if job_desc and show_desc:
+                for desc in job_desc.split():
+                    print '\t {desc}'.format(desc=desc)
+    return print_matching_jobs
+
+
+def delete_matching_jobs(jobs_dict):
+    for job_id, job in jobs_dict.iteritems():
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        print 'Deleting {job_id}/{job_name}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            )
+        job.delete()
+        report.try_delete_jobs(job_name, job_id)
+
+
+def print_matching_runs(jobs_dict):
+    runs = set()
+    for job_id, job in jobs_dict.iteritems():
+        job_config = yaml.safe_load(job.body)
+        runs.add(job_config['name'])
+    for run in runs:
+        print run
 
 
 def main(args):
@@ -70,7 +104,12 @@ def main(args):
     try:
         connection = connect()
         watch_tube(connection, machine_type)
-        walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete)
+        if delete:
+            walk_jobs(connection, machine_type,
+                      delete_matching_jobs)
+        else:
+            walk_jobs(connection, machine_type,
+                      _print_matching_jobs(show_desc))
     except KeyboardInterrupt:
         log.info("Interrupted.")
     finally: