connection.ignore('default')
-def walk_jobs(connection, tube_name, callback, pattern=None):
+def walk_jobs(connection, tube_name, processor, pattern=None):
"""
def callback(jobs_dict)
"""
# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
- matching_jobs = OrderedDict()
for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
job_id = job.stats()['id']
if pattern is not None and pattern not in job_name:
continue
- matching_jobs[job_id] = [job, job_config]
+ processor.add_job(job_id, job_config, job)
end_progress()
- callback(matching_jobs)
+ processor.complete()
def print_progress(index, total, message=None):
sys.stderr.flush()
-def _print_matching_jobs(show_desc=False):
- def print_matching_jobs(jobs_dict):
- i = 0
- job_count = len(jobs_dict)
- for job_id, (job, job_config) in jobs_dict.iteritems():
- i += 1
- job_name = job_config['name']
- job_desc = job_config['description']
- job_id = job.stats()['id']
- print 'Job: {i}/{count} {job_name}/{job_id}'.format(
- i=i,
- count=job_count,
- job_id=job_id,
- job_name=job_name,
- )
- if job_desc and show_desc:
- for desc in job_desc.split():
- print '\t {desc}'.format(desc=desc)
- return print_matching_jobs
-
-
-def delete_matching_jobs(jobs_dict):
- for job_id, (job, job_config) in jobs_dict.iteritems():
+class JobProcessor(object):
+ def __init__(self):
+ self.jobs = OrderedDict()
+
+ def add_job(self, job_id, job_config, job_obj=None):
+ job_id = str(job_id)
+
+ job_dict = dict(
+ index=(len(self.jobs) + 1),
+ job_config=job_config,
+ )
+ if job_obj:
+ job_dict['job_obj'] = job_obj
+ self.jobs[job_id] = job_dict
+
+ self.process_job(job_id)
+
+ def process_job(self, job_id):
+ pass
+
+ def complete(self):
+ pass
+
+
+class JobPrinter(JobProcessor):
+ def __init__(self, show_desc=False):
+ super(JobPrinter, self).__init__()
+ self.show_desc = show_desc
+
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_index = self.jobs[job_id]['index']
job_name = job_config['name']
- job_id = job.stats()['id']
- print 'Deleting {job_id}/{job_name}'.format(
+ job_desc = job_config['description']
+ print 'Job: {i:>4} {job_name}/{job_id}'.format(
+ i=job_index,
job_id=job_id,
job_name=job_name,
)
- job.delete()
- report.try_delete_jobs(job_name, job_id)
+ if job_desc and self.show_desc:
+ for desc in job_desc.split():
+ print '\t {desc}'.format(desc=desc)
+
+
+class RunPrinter(JobProcessor):
+ def __init__(self):
+ super(RunPrinter, self).__init__()
+ self.runs = list()
+ def process_job(self, job_id):
+ run = self.jobs[job_id]['job_config']['name']
+ if run not in self.runs:
+ self.runs.append(run)
+ print run
-def print_matching_runs(jobs_dict):
- runs = set()
- for job_id, (job, job_config) in jobs_dict.iteritems():
- runs.add(job_config['name'])
- for run in runs:
- print run
+
+class JobDeleter(JobProcessor):
+ def process_job(self, job_id):
+ job_config = self.jobs[job_id]['job_config']
+ job_name = job_config['name']
+ print 'Deleting {job_id}/{job_name}'.format(
+ job_id=job_id,
+ job_name=job_name,
+ )
+ job_obj = self.jobs[job_id].get('job_obj')
+ if job_obj:
+ job_obj.delete()
+ report.try_delete_jobs(job_name, job_id)
def main(args):
watch_tube(connection, machine_type)
if delete:
walk_jobs(connection, machine_type,
- delete_matching_jobs)
+ JobDeleter())
elif runs:
walk_jobs(connection, machine_type,
- print_matching_runs)
+ RunPrinter())
else:
walk_jobs(connection, machine_type,
- _print_matching_jobs(show_desc))
+ JobPrinter(show_desc=show_desc))
except KeyboardInterrupt:
log.info("Interrupted.")
finally: