import beanstalkc
import yaml
import logging
+import sys
+from collections import OrderedDict
from .config import config
+from . import report
log = logging.getLogger(__name__)
connection.ignore('default')
-def walk_jobs(connection, machine_type, show_desc=False, delete=None):
+def walk_jobs(connection, tube_name, callback, pattern=None):
+ """
+ def callback(jobs_dict)
+ """
log.info("Checking Beanstalk Queue...")
- job_count = connection.stats_tube(machine_type)['current-jobs-ready']
+ job_count = connection.stats_tube(tube_name)['current-jobs-ready']
if job_count == 0:
log.info('No jobs in Beanstalk Queue')
return
- x = 1
- while x < job_count:
- x += 1
- job = connection.reserve(timeout=20)
- if job is not None and job.body is not None:
+
+ matching_jobs = OrderedDict()
+ for i in range(1, job_count + 1):
+ sys.stderr.write("{i}/{count}\r".format(i=i, count=job_count))
+ sys.stderr.flush()
+ job = connection.reserve(timeout=30)
+ if job is None or job.body is None:
+ continue
+ job_config = yaml.safe_load(job.body)
+ job_name = job_config['name']
+ job_id = job.stats()['id']
+ if pattern is not None and pattern not in job_name:
+ continue
+ matching_jobs[job_id] = job
+ sys.stderr.write('\n')
+ sys.stderr.flush()
+ callback(matching_jobs)
+
+
+def _print_matching_jobs(show_desc=False):
+ def print_matching_jobs(jobs_dict):
+ i = 0
+ job_count = len(jobs_dict)
+ for job_id, job in jobs_dict.iteritems():
+ i += 1
job_config = yaml.safe_load(job.body)
job_name = job_config['name']
+ job_desc = job_config['description']
job_id = job.stats()['id']
- job_description = job_config['description']
- if delete:
- if delete in job_name:
- m = 'Deleting {job_id}/{job_name}'.format(
- job_id=job_id,
- job_name=job_name,
- )
- print m
- job.delete()
- else:
- m = "Searching queue... Checked {x}/{count} Jobs\r".format(
- x=x, count=job_count)
- print m,
- else:
- m = 'Job: {x}/{count} {job_name}/{job_id}'.format(
- x=x,
- count=job_count,
- job_id=job_id,
- job_name=job_name,
- )
- print m
- if job_description and show_desc:
- for desc in job_description.split():
- print '\t {desc}'.format(desc=desc)
- log.info("Finished checking Beanstalk Queue.")
+ print 'Job: {i}/{count} {job_name}/{job_id}'.format(
+ i=i,
+ count=job_count,
+ job_id=job_id,
+ job_name=job_name,
+ )
+ if job_desc and show_desc:
+ for desc in job_desc.split():
+ print '\t {desc}'.format(desc=desc)
+ return print_matching_jobs
+
+
+def delete_matching_jobs(jobs_dict):
+ for job_id, job in jobs_dict.iteritems():
+ job_config = yaml.safe_load(job.body)
+ job_name = job_config['name']
+ job_id = job.stats()['id']
+ print 'Deleting {job_id}/{job_name}'.format(
+ job_id=job_id,
+ job_name=job_name,
+ )
+ job.delete()
+ report.try_delete_jobs(job_name, job_id)
+
+
+def print_matching_runs(jobs_dict):
+ runs = set()
+ for job_id, job in jobs_dict.iteritems():
+ job_config = yaml.safe_load(job.body)
+ runs.add(job_config['name'])
+ for run in runs:
+ print run
def main(args):
try:
connection = connect()
watch_tube(connection, machine_type)
- walk_jobs(connection, machine_type, show_desc=show_desc, delete=delete)
+ if delete:
+ walk_jobs(connection, machine_type,
+ delete_matching_jobs)
+ else:
+ walk_jobs(connection, machine_type,
+ _print_matching_jobs(show_desc))
except KeyboardInterrupt:
log.info("Interrupted.")
finally: