From: Zack Cerza Date: Wed, 21 May 2014 20:58:27 +0000 (-0500) Subject: Process queued jobs synchronously X-Git-Tag: v0.94.10~27^2^2~364^2~146 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3b382b701147bfe195f3024e5211886093954749;p=ceph.git Process queued jobs synchronously Signed-off-by: Zack Cerza --- diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py index edd62284cb5..7eee0c1adfc 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/beanstalk.py @@ -25,7 +25,7 @@ def watch_tube(connection, tube_name): connection.ignore('default') -def walk_jobs(connection, tube_name, callback, pattern=None): +def walk_jobs(connection, tube_name, processor, pattern=None): """ def callback(jobs_dict) """ @@ -37,7 +37,6 @@ def walk_jobs(connection, tube_name, callback, pattern=None): # Try to figure out a sane timeout based on how many jobs are in the queue timeout = job_count / 2000.0 * 60 - matching_jobs = OrderedDict() for i in range(1, job_count + 1): print_progress(i, job_count, "Loading") job = connection.reserve(timeout=timeout) @@ -48,9 +47,9 @@ def walk_jobs(connection, tube_name, callback, pattern=None): job_id = job.stats()['id'] if pattern is not None and pattern not in job_name: continue - matching_jobs[job_id] = [job, job_config] + processor.add_job(job_id, job_config, job) end_progress() - callback(matching_jobs) + processor.complete() def print_progress(index, total, message=None): @@ -65,45 +64,74 @@ def end_progress(): sys.stderr.flush() -def _print_matching_jobs(show_desc=False): - def print_matching_jobs(jobs_dict): - i = 0 - job_count = len(jobs_dict) - for job_id, (job, job_config) in jobs_dict.iteritems(): - i += 1 - job_name = job_config['name'] - job_desc = job_config['description'] - job_id = job.stats()['id'] - print 'Job: {i}/{count} {job_name}/{job_id}'.format( - i=i, - count=job_count, - job_id=job_id, - job_name=job_name, - ) - if job_desc and show_desc: - for desc in job_desc.split(): - print '\t {desc}'.format(desc=desc) - return print_matching_jobs - - -def delete_matching_jobs(jobs_dict): - for job_id, (job, job_config) in jobs_dict.iteritems(): +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] job_name = job_config['name'] - job_id = job.stats()['id'] - print 'Deleting {job_id}/{job_name}'.format( + job_desc = job_config['description'] + print 'Job: {i:>4} {job_name}/{job_id}'.format( + i=job_index, job_id=job_id, job_name=job_name, ) - job.delete() - report.try_delete_jobs(job_name, job_id) + if job_desc and self.show_desc: + for desc in job_desc.split(): + print '\t {desc}'.format(desc=desc) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print run -def print_matching_runs(jobs_dict): - runs = set() - for job_id, (job, job_config) in jobs_dict.iteritems(): - runs.add(job_config['name']) - for run in runs: - print run + +class JobDeleter(JobProcessor): + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print 'Deleting {job_id}/{job_name}'.format( + job_id=job_id, + job_name=job_name, + ) + job_obj = self.jobs[job_id].get('job_obj') + if job_obj: + job_obj.delete() + report.try_delete_jobs(job_name, job_id) def main(args): @@ -116,13 +144,13 @@ def main(args): watch_tube(connection, machine_type) if delete: walk_jobs(connection, machine_type, - delete_matching_jobs) + JobDeleter()) elif runs: walk_jobs(connection, machine_type, - print_matching_runs) + RunPrinter()) else: walk_jobs(connection, machine_type, - _print_matching_jobs(show_desc)) + JobPrinter(show_desc=show_desc)) except KeyboardInterrupt: log.info("Interrupted.") finally: