]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
scripts/queue: common teuthology-queue command for paddles and beanstalk queue
authorAishwarya Mathuria <amathuri@redhat.com>
Mon, 18 Apr 2022 07:49:40 +0000 (13:19 +0530)
committerZack Cerza <zack@redhat.com>
Tue, 31 May 2022 19:45:58 +0000 (13:45 -0600)
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
13 files changed:
scripts/beanstalk_queue.py
scripts/paddles_queue.py
scripts/queue.py [new file with mode: 0644]
teuthology/beanstalk.py [deleted file]
teuthology/config.py
teuthology/dispatcher/__init__.py
teuthology/kill.py
teuthology/paddles_queue.py [deleted file]
teuthology/queue/__init__.py [new file with mode: 0644]
teuthology/queue/beanstalk.py [new file with mode: 0644]
teuthology/queue/paddles.py [new file with mode: 0644]
teuthology/report.py
teuthology/worker.py

index 88a82428473c53e13c3504351d82f3a51d3e01a1..a8a0661ecf9e97c52009c6197999551b77f12c95 100644 (file)
@@ -1,7 +1,7 @@
 import docopt
 
 import teuthology.config
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
 
 doc = """
 usage: teuthology-beanstalk-queue -h
index 3c69d772e696aa78e81034fae30964ef44e41ca6..8487fd938e3d3e83e1f5533a758eda470612557d 100644 (file)
@@ -1,8 +1,7 @@
 import docopt
 
 import teuthology.config
-import teuthology.paddles_queue
-
+import teuthology.queue.paddles
 doc = """
 usage: teuthology-paddles-queue -h
        teuthology-paddles-queue -s -m MACHINE_TYPE
diff --git a/scripts/queue.py b/scripts/queue.py
new file mode 100644 (file)
index 0000000..2c466a7
--- /dev/null
@@ -0,0 +1,37 @@
+import docopt
+
+import teuthology.config
+import teuthology.queue.beanstalk
+import teuthology.queue.paddles
+
+doc = """
+usage: teuthology-queue -h
+       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE 
+       teuthology-queue [-r] -m MACHINE_TYPE
+       teuthology-queue -m MACHINE_TYPE -D PATTERN
+       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+
+List Jobs in queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+  -m, --machine_type MACHINE_TYPE [default: multi]
+                        Which machine type queue to work on.
+
+optional arguments:
+  -h, --help            Show this help message and exit
+  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
+  -d, --description     Show job descriptions
+  -r, --runs            Only show run names
+  -f, --full            Print the entire job config. Use with caution.
+  -s, --status          Prints the status of the queue
+  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
+                        will unpause. If -m is passed, pause that queue,
+                        otherwise pause all queues.
+"""
+
+
+def main():
+    args = docopt.docopt(doc)
+    teuthology.queue.main(args)
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
deleted file mode 100644 (file)
index a1165be..0000000
+++ /dev/null
@@ -1,214 +0,0 @@
-import beanstalkc
-import yaml
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology.config import config
-from teuthology import report
-
-log = logging.getLogger(__name__)
-
-
-def connect():
-    host = config.queue_host
-    port = config.queue_port
-    if host is None or port is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.teuthology_yaml))
-    return beanstalkc.Connection(host=host, port=port)
-
-
-def watch_tube(connection, tube_name):
-    """
-    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
-    the tube_name that was actually used.
-    """
-    if ',' in tube_name:
-        log.debug("Correcting tube name to 'multi'")
-        tube_name = 'multi'
-    connection.watch(tube_name)
-    connection.ignore('default')
-    return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
-    """
-    def callback(jobs_dict)
-    """
-    log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
-    if job_count == 0:
-        log.info('No jobs in Beanstalk Queue')
-        return
-
-    # Try to figure out a sane timeout based on how many jobs are in the queue
-    timeout = job_count / 2000.0 * 60
-    for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
-        job = connection.reserve(timeout=timeout)
-        if job is None or job.body is None:
-            continue
-        job_config = yaml.safe_load(job.body)
-        job_name = job_config['name']
-        job_id = job.stats()['id']
-        if pattern is not None and pattern not in job_name:
-            continue
-        processor.add_job(job_id, job_config, job)
-    end_progress()
-    processor.complete()
-
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        job_obj = self.jobs[job_id].get('job_obj')
-        if job_obj:
-            job_obj.delete()
-        report.try_delete_jobs(job_name, job_id)
-
-
-def pause_tube(connection, tube, duration):
-    duration = int(duration)
-    if not tube:
-        tubes = sorted(connection.tubes())
-    else:
-        tubes = [tube]
-
-    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
-    templ = prefix + ": {tubes}"
-    log.info(templ.format(dur=duration, tubes=tubes))
-    for tube in tubes:
-        connection.pause_tube(tube, duration)
-
-
-def stats_tube(connection, tube):
-    stats = connection.stats_tube(tube)
-    result = dict(
-        name=tube,
-        count=stats['current-jobs-ready'],
-        paused=(stats['pause'] != 0),
-    )
-    return result
-
-
-def main(args):
-    machine_type = args['--machine_type']
-    status = args['--status']
-    delete = args['--delete']
-    runs = args['--runs']
-    show_desc = args['--description']
-    full = args['--full']
-    pause_duration = args['--pause']
-    try:
-        connection = connect()
-        if machine_type and not pause_duration:
-            # watch_tube needs to be run before we inspect individual jobs;
-            # it is not needed for pausing tubes
-            watch_tube(connection, machine_type)
-        if status:
-            print(stats_tube(connection, machine_type))
-        elif pause_duration:
-            pause_tube(connection, machine_type, pause_duration)
-        elif delete:
-            walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
-        elif runs:
-            walk_jobs(connection, machine_type,
-                      RunPrinter())
-        else:
-            walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
-    except KeyboardInterrupt:
-        log.info("Interrupted.")
-    finally:
-        connection.close()
index b42ff48d1a91a29442739da95740752f82d6fab3..e39ba4fb6bcfc56e73e98aadfa08565ef1d2f67c 100644 (file)
@@ -143,7 +143,7 @@ class TeuthologyConfig(YamlConfig):
         'archive_upload_key': None,
         'archive_upload_url': None,
         'automated_scheduling': False,
-        'backend': 'paddles',
+        'backend': 'beanstalk',
         'reserve_machines': 5,
         'ceph_git_base_url': 'https://github.com/ceph/',
         'ceph_git_url': None,
index f80818cf149635fd59a7e16edc07cbc7def13fc3..b06ef606181027ee757b0e9b82890e9459493357 100644 (file)
@@ -8,7 +8,7 @@ from datetime import datetime
 from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -73,11 +73,8 @@ def main(args):
     machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
-<<<<<<< HEAD
     exit_on_empty_queue = args["--exit-on-empty-queue"]
-=======
     backend = args['--queue-backend']
->>>>>>> 79c4d9bf... Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles
 
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
index 7ebf560166569036bb62bc9bfd0a1771820bed28..776463d1416d5bc45ca8879f35a039abdfa17db5 100755 (executable)
@@ -8,7 +8,7 @@ import tempfile
 import logging
 import getpass
 
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
 from teuthology import report
 from teuthology.config import config
 from teuthology import misc
diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py
deleted file mode 100644 (file)
index 99cfe77..0000000
+++ /dev/null
@@ -1,218 +0,0 @@
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.dispatcher import pause_queue
-
-
-log = logging.getLogger(__name__)
-
-
-def connect():
-    host = config.queue_host
-    port = config.queue_port
-    if host is None or port is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.teuthology_yaml))
-    return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load)
-
-
-def watch_tube(connection, tube_name):
-    """
-    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
-    the tube_name that was actually used.
-    """
-    if ',' in tube_name:
-        log.debug("Correcting tube name to 'multi'")
-        tube_name = 'multi'
-    connection.watch(tube_name)
-    connection.ignore('default')
-    return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
-    """
-    def callback(jobs_dict)
-    """
-    log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
-    if job_count == 0:
-        log.info('No jobs in Beanstalk Queue')
-        return
-
-def stats_queue(machine_type):
-    stats = report.get_queue_stats(machine_type)
-    if stats['paused'] is None:
-        log.info("%s queue is currently running with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
-    else:
-        log.info("%s queue is paused with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
-
-
-def update_priority(machine_type, priority, user, run_name=None):
-    if run_name is not None:
-        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
-    else:
-        jobs = report.get_user_jobs_queue(machine_type, user)
-    for job in jobs:
-        job['priority'] = priority
-        report.try_push_job_info(job)
-
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-
-def walk_jobs(machine_type, processor, user):
-    log.info("Checking paddles queue...")
-    job_count = report.get_queue_stats(machine_type)['count']
-
-    jobs = report.get_user_jobs_queue(machine_type, user)
-    if job_count == 0:
-        log.info('No jobs in queue')
-        return
-
-    for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
-        job = jobs[i-1]
-        if job is None:
-            continue
-        job_id = job['job_id']
-        processor.add_job(job_id, job)
-    end_progress()
-    processor.complete()
-
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        report.try_delete_jobs(job_name, job_id)
-
-
-def main(args):
-    machine_type = args['--machine_type']
-    user = args['--user']
-    run_name = args['--run_name']
-    priority = args['--priority']
-    status = args['--status']
-    delete = args['--delete']
-    runs = args['--runs']
-    show_desc = args['--description']
-    full = args['--full']
-    pause = args['--pause']
-    unpause = args['--unpause']
-    pause_duration = args['--time']
-    try:
-        if status:
-            stats_queue(machine_type)
-        elif pause:
-            if pause_duration:
-                pause_queue(machine_type, pause, user, pause_duration)
-            else:
-                pause_queue(machine_type, pause, user)
-        elif unpause:
-            pause = False
-            pause_queue(machine_type, pause, user)
-        elif priority:
-            update_priority(machine_type, priority, user, run_name)
-        elif delete:
-            walk_jobs(machine_type,
-                      JobDeleter(delete), user)
-        elif runs:
-            walk_jobs(machine_type,
-                      RunPrinter(), user)
-        else:
-            walk_jobs(machine_type,
-                      JobPrinter(show_desc=show_desc, full=full),
-                      user)
-    except KeyboardInterrupt:
-        log.info("Interrupted.")
diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py
new file mode 100644 (file)
index 0000000..2a0b6ff
--- /dev/null
@@ -0,0 +1,106 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+from teuthology.config import config
+
+log = logging.getLogger(__name__)
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        report.try_delete_jobs(job_name, job_id)
+
+
+def main(args):
+    # Imported lazily to avoid a circular import: the backend modules
+    # themselves import helpers from teuthology.queue.
+    from teuthology.queue import beanstalk, paddles
+    if config.backend == 'paddles':
+        paddles.main(args)
+    else:
+        beanstalk.main(args)
diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py
new file mode 100644 (file)
index 0000000..90b1cbd
--- /dev/null
@@ -0,0 +1,118 @@
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+# Shared queue helpers/processors now live in teuthology/queue/__init__.py;
+# walk_jobs() and main() below depend on them.
+from teuthology.queue import (JobDeleter, JobPrinter, RunPrinter,
+                              end_progress, print_progress)
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.teuthology_yaml))
+    return beanstalkc.Connection(host=host, port=port)
+
+
+def watch_tube(connection, tube_name):
+    """
+    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+    the tube_name that was actually used.
+    """
+    if ',' in tube_name:
+        log.debug("Correcting tube name to 'multi'")
+        tube_name = 'multi'
+    connection.watch(tube_name)
+    connection.ignore('default')
+    return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+    """
+    def callback(jobs_dict)
+    """
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+    if job_count == 0:
+        log.info('No jobs in Beanstalk Queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = connection.reserve(timeout=timeout)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        processor.add_job(job_id, job_config, job)
+    end_progress()
+    processor.complete()
+
+
+def pause_tube(connection, tube, duration):
+    duration = int(duration)
+    if not tube:
+        tubes = sorted(connection.tubes())
+    else:
+        tubes = [tube]
+
+    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+    templ = prefix + ": {tubes}"
+    log.info(templ.format(dur=duration, tubes=tubes))
+    for tube in tubes:
+        connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+    stats = connection.stats_tube(tube)
+    result = dict(
+        name=tube,
+        count=stats['current-jobs-ready'],
+        paused=(stats['pause'] != 0),
+    )
+    return result
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    try:
+        connection = connect()
+        if machine_type and not pause_duration:
+            # watch_tube needs to be run before we inspect individual jobs;
+            # it is not needed for pausing tubes
+            watch_tube(connection, machine_type)
+        if status:
+            print(stats_tube(connection, machine_type))
+        elif pause_duration:
+            pause_tube(connection, machine_type, pause_duration)
+        elif delete:
+            walk_jobs(connection, machine_type,
+                      JobDeleter(delete))
+        elif runs:
+            walk_jobs(connection, machine_type,
+                      RunPrinter())
+        else:
+            walk_jobs(connection, machine_type,
+                      JobPrinter(show_desc=show_desc, full=full))
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
+    finally:
+        connection.close()
diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py
new file mode 100644 (file)
index 0000000..f2ea8b8
--- /dev/null
@@ -0,0 +1,88 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+from teuthology.dispatcher import pause_queue
+# Shared queue helpers/processors now live in teuthology/queue/__init__.py;
+# walk_jobs() and main() below depend on them.
+from teuthology.queue import (JobDeleter, JobPrinter, RunPrinter,
+                              end_progress, print_progress)
+
+
+log = logging.getLogger(__name__)
+
+
+def stats_queue(machine_type):
+    stats = report.get_queue_stats(machine_type)
+    if stats['paused'] is None:
+        log.info("%s queue is currently running with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
+    else:
+        log.info("%s queue is paused with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
+
+
+def update_priority(machine_type, priority, user, run_name=None):
+    if run_name is not None:
+        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
+    else:
+        jobs = report.get_user_jobs_queue(machine_type, user)
+    for job in jobs:
+        job['priority'] = priority
+        report.try_push_job_info(job)
+
+
+def walk_jobs(machine_type, processor, user):
+    log.info("Checking paddles queue...")
+    job_count = report.get_queue_stats(machine_type)['count']
+
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    if job_count == 0:
+        log.info('No jobs in queue')
+        return
+
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = jobs[i-1]
+        if job is None:
+            continue
+        job_id = job['job_id']
+        processor.add_job(job_id, job)
+    end_progress()
+    processor.complete()
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    # The teuthology-queue command exposes no --user option; queries are
+    # therefore unfiltered by user.
+    user = None
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    try:
+        if status:
+            stats_queue(machine_type)
+        elif pause_duration:
+            # A duration of 0 unpauses the queue (see command usage text).
+            pause_queue(machine_type, int(pause_duration) != 0, user,
+                        pause_duration)
+        elif delete:
+            walk_jobs(machine_type,
+                      JobDeleter(delete), user)
+        elif runs:
+            walk_jobs(machine_type,
+                      RunPrinter(), user)
+        else:
+            walk_jobs(machine_type,
+                      JobPrinter(show_desc=show_desc, full=full),
+                      user)
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
index 50b7f8966b1bf91dae553a0f222403be0b7b6655..6796415014cb9ae1ab128988de1210798dc0224e 100644 (file)
@@ -509,7 +509,7 @@ class ResultsReporter(object):
         """
         Create a queue on the results server
 
-        :param machine_type: The machine type specified for the job
+        :param queue: The queue specified for the job
         """
         uri = "{base}/queue/".format(
             base=self.base_uri
@@ -614,10 +614,11 @@ class ResultsReporter(object):
             base=self.base_uri
         )
         request_info = {'queue': queue}
+        filter_field = queue
         if run_name is not None:
             filter_field = run_name
             uri += "?run_name=" + str(run_name)
-        else:
+        elif user is not None:
             filter_field = user
             uri += "?user=" + str(user)
             
@@ -654,36 +655,43 @@ def create_machine_type_queue(queue):
     reporter.create_queue(queue)
     return queue
 
+def get_all_jobs_in_queue(queue, user=None, run_name=None):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if ',' in queue:
+        queue = 'multi'
+    return reporter.queued_jobs(queue)
 
-def get_user_jobs_queue(machine_type, user, run_name=None):
+def get_user_jobs_queue(queue, user, run_name=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    return reporter.queued_jobs(machine_type, user, run_name)
+    return reporter.queued_jobs(queue, user, run_name)
 
 
-def pause_queue(machine_type, paused, paused_by, pause_duration=None):
+def pause_queue(queue, paused, paused_by, pause_duration=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    reporter.update_queue(machine_type, paused, paused_by, pause_duration)
+    reporter.update_queue(queue, paused, paused_by, pause_duration)
 
 
-def is_queue_paused(machine_type):
+def is_queue_paused(queue):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    stats = reporter.queue_stats(machine_type)
+    stats = reporter.queue_stats(queue)
     if stats['paused'] != 0 and stats['paused'] is not None:
         return True
     return False
 
 
-def get_queue_stats(machine_type):
+def get_queue_stats(queue):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return  
-    stats = reporter.queue_stats(machine_type)
+    stats = reporter.queue_stats(queue)
     return stats
 
 
index 5cff69c880640723b086fe8da503b5b57d6ac32a..36e42eeac4b80e221e569d06b33d76e6a49b8bf0 100644 (file)
@@ -9,7 +9,7 @@ import yaml
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config