]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
Process queued jobs synchronously
authorZack Cerza <zack@cerza.org>
Wed, 21 May 2014 20:58:27 +0000 (15:58 -0500)
committerZack Cerza <zack@cerza.org>
Wed, 21 May 2014 21:18:00 +0000 (16:18 -0500)
Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
teuthology/beanstalk.py

index edd62284cb58ed743f84d87cba0f44116cb0824c..7eee0c1adfc7419258fb326ddb8b261112b01a03 100644 (file)
@@ -25,7 +25,7 @@ def watch_tube(connection, tube_name):
     connection.ignore('default')
 
 
-def walk_jobs(connection, tube_name, callback, pattern=None):
+def walk_jobs(connection, tube_name, processor, pattern=None):
     """
     def callback(jobs_dict)
     """
@@ -37,7 +37,6 @@ def walk_jobs(connection, tube_name, callback, pattern=None):
 
     # Try to figure out a sane timeout based on how many jobs are in the queue
     timeout = job_count / 2000.0 * 60
-    matching_jobs = OrderedDict()
     for i in range(1, job_count + 1):
         print_progress(i, job_count, "Loading")
         job = connection.reserve(timeout=timeout)
@@ -48,9 +47,9 @@ def walk_jobs(connection, tube_name, callback, pattern=None):
         job_id = job.stats()['id']
         if pattern is not None and pattern not in job_name:
             continue
-        matching_jobs[job_id] = [job, job_config]
+        processor.add_job(job_id, job_config, job)
     end_progress()
-    callback(matching_jobs)
+    processor.complete()
 
 
 def print_progress(index, total, message=None):
@@ -65,45 +64,74 @@ def end_progress():
     sys.stderr.flush()
 
 
-def _print_matching_jobs(show_desc=False):
-    def print_matching_jobs(jobs_dict):
-        i = 0
-        job_count = len(jobs_dict)
-        for job_id, (job, job_config) in jobs_dict.iteritems():
-            i += 1
-            job_name = job_config['name']
-            job_desc = job_config['description']
-            job_id = job.stats()['id']
-            print 'Job: {i}/{count} {job_name}/{job_id}'.format(
-                i=i,
-                count=job_count,
-                job_id=job_id,
-                job_name=job_name,
-                )
-            if job_desc and show_desc:
-                for desc in job_desc.split():
-                    print '\t {desc}'.format(desc=desc)
-    return print_matching_jobs
-
-
-def delete_matching_jobs(jobs_dict):
-    for job_id, (job, job_config) in jobs_dict.iteritems():
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
         job_name = job_config['name']
-        job_id = job.stats()['id']
-        print 'Deleting {job_id}/{job_name}'.format(
+        job_desc = job_config['description']
+        print 'Job: {i:>4} {job_name}/{job_id}'.format(
+            i=job_index,
             job_id=job_id,
             job_name=job_name,
             )
-        job.delete()
-        report.try_delete_jobs(job_name, job_id)
+        if job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print '\t {desc}'.format(desc=desc)
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
 
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print run
 
-def print_matching_runs(jobs_dict):
-    runs = set()
-    for job_id, (job, job_config) in jobs_dict.iteritems():
-        runs.add(job_config['name'])
-    for run in runs:
-        print run
+
+class JobDeleter(JobProcessor):
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print 'Deleting {job_id}/{job_name}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            )
+        job_obj = self.jobs[job_id].get('job_obj')
+        if job_obj:
+            job_obj.delete()
+        report.try_delete_jobs(job_name, job_id)
 
 
 def main(args):
@@ -116,13 +144,13 @@ def main(args):
         watch_tube(connection, machine_type)
         if delete:
             walk_jobs(connection, machine_type,
-                      delete_matching_jobs)
+                      JobDeleter())
         elif runs:
             walk_jobs(connection, machine_type,
-                      print_matching_runs)
+                      RunPrinter())
         else:
             walk_jobs(connection, machine_type,
-                      _print_matching_jobs(show_desc))
+                      JobPrinter(show_desc=show_desc))
     except KeyboardInterrupt:
         log.info("Interrupted.")
     finally: