]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
teuthology/queue: Single command for queue operations
authorAishwarya Mathuria <amathuri@redhat.com>
Wed, 4 May 2022 14:07:40 +0000 (19:37 +0530)
committerAishwarya Mathuria <amathuri@redhat.com>
Wed, 15 Jun 2022 14:01:35 +0000 (19:31 +0530)
Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
17 files changed:
scripts/beanstalk_queue.py [deleted file]
scripts/dispatcher.py
scripts/kill.py
scripts/paddles_queue.py [deleted file]
scripts/queue.py
scripts/schedule.py
scripts/worker.py
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/orchestra/run.py
teuthology/queue/__init__.py
teuthology/queue/beanstalk.py
teuthology/queue/paddles.py
teuthology/queue/util.py [new file with mode: 0644]
teuthology/report.py
teuthology/schedule.py
teuthology/test/test_dispatcher.py

diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py
deleted file mode 100644 (file)
index a8a0661..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-import docopt
-
-import teuthology.config
-import teuthology.queue.beanstalk
-
-doc = """
-usage: teuthology-beanstalk-queue -h
-       teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
-       teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
-       teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-Arguments:
-  -m, --machine_type MACHINE_TYPE [default: multi]
-                        Which machine type queue to work on.
-optional arguments:
-  -h, --help            Show this help message and exit
-  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
-  -d, --description     Show job descriptions
-  -r, --runs            Only show run names
-  -f, --full            Print the entire job config. Use with caution.
-  -s, --status          Prints the status of the queue
-  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
-                        will unpause. If -m is passed, pause that queue,
-                        otherwise pause all queues.
-"""
-
-
-def main():
-
-    args = docopt.docopt(doc)
-    print(args)
-    teuthology.beanstalk.main(args)
index 5e64b382d84d2f032dca0343dcd897225e48d09b..ac74b29fa29228735f43c993110000556bcfbabd 100644 (file)
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
        teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
-       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
+       teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] [--queue-backend BACKEND] --log-dir LOG_DIR --tube TUBE
 
-Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
+Start a dispatcher for the specified tube. Grab jobs from a paddles/beanstalk
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -17,12 +17,13 @@ standard arguments:
   -v, --verbose                  be more verbose
   -l, --log-dir LOG_DIR          path in which to store logs
   -a DIR, --archive-dir DIR      path to archive results in
-  --machine-type MACHINE_TYPE    the machine type for the job
+  -t, --tube TUBE                which queue to read jobs from
   --supervisor                   run dispatcher in job supervisor mode
   --bin-path BIN_PATH            teuthology bin path
   --job-config CONFIG            file descriptor of job's config file
   --exit-on-empty-queue          if the queue is empty, exit
-  --queue-backend BACKEND        choose between paddles and beanstalk
+  --queue-backend BACKEND        which backend will be used for the queue
+                                 [default: beanstalk]
 """
 
 import docopt
index e2a1a4ef09e9aac45920b0fe8c4164f27fe047bf..a93bcd86294aa2bbe381d5763f6001a84fb01e56 100644 (file)
@@ -12,7 +12,7 @@ usage: teuthology-kill -h
        teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
 
 Kill running teuthology jobs:
-1. Removes any queued jobs from the paddles queue
+1. Removes any queued jobs from the queue
 2. Kills any running jobs
 3. Nukes any machines involved
 
diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py
deleted file mode 100644 (file)
index 8487fd9..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-import docopt
-
-import teuthology.config
-import teuthology.queue.paddles_queue
-doc = """
-usage: teuthology-paddles-queue -h
-       teuthology-paddles-queue -s -m MACHINE_TYPE
-       teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
-       teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
-       teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
-
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-
-Arguments:
-  -m, --machine_type MACHINE_TYPE
-                        Which machine type queue to work on.
-
-optional arguments:
-  -h, --help            Show this help message and exit
-  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
-  -d, --description     Show job descriptions
-  -r, --runs            Only show run names
-  -f, --full            Print the entire job config. Use with caution.
-  -s, --status          Prints the status of the queue
-  -t, --time SECONDS    Pause queues for a number of seconds.
-                        If -m is passed, pause that queue,
-                        otherwise pause all queues.
-  -p, --pause           Pause queue
-  -u, --unpause         Unpause queue
-  -P, --priority PRIORITY
-                        Change priority of queued jobs
-  -U, --user USER       User who owns the jobs
-  -R, --run-name RUN_NAME
-                        Used to change priority of all jobs in the run.
-"""
-
-
-def main():
-    args = docopt.docopt(doc)
-    teuthology.paddles_queue.main(args)
index 2c466a7be902b167eecd982757d3c7da0e780720..0c972634ed352530cc9023f6db096210a3edda61 100644 (file)
@@ -1,15 +1,16 @@
 import docopt
 
-import teuthology.config
 import teuthology.queue.beanstalk
 import teuthology.queue.paddles
+from teuthology.config import config
 
 doc = """
 usage: teuthology-queue -h
-       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE 
+       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
        teuthology-queue [-r] -m MACHINE_TYPE
        teuthology-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+       teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
+       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -29,9 +30,17 @@ optional arguments:
   -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
                         will unpause. If -m is passed, pause that queue,
                         otherwise pause all queues.
+  -P, --priority PRIORITY
+                        Change priority of queued jobs (only in Paddles queues)
+  -U, --user USER       User who owns the jobs
+  -R, --run-name RUN_NAME
+                        Used to change priority of all jobs in the run.
 """
 
 
 def main():
     args = docopt.docopt(doc)
-    teuthology.queue.main(args)
+    if config.backend == 'beanstalk':
+      teuthology.queue.beanstalk.main(args)
+    else:
+      teuthology.queue.paddles.main(args)
index ee443125ee1a821936a86b8efd27ebc7111e9892..59a2cee298426c891b15feb928fe15d10d5c2308 100644 (file)
@@ -20,7 +20,7 @@ optional arguments:
                                        Queue backend name, use prefix '@'
                                        to append job config to the given
                                        file path as yaml.
-                                       [default: paddles]
+                                       [default: beanstalk]
   -n <name>, --name <name>             Name of suite run the job is part of
   -d <desc>, --description <desc>      Job description
   -o <owner>, --owner <owner>          Job owner
index 8d3228d8d031b2bceaf84bf4c65a997cda327406..a3e12c20d7bb9366ae92347e23e1b833f70b1f34 100644 (file)
@@ -9,7 +9,7 @@ def main():
 
 def parse_args():
     parser = argparse.ArgumentParser(description="""
-Grab jobs from a paddles queue and run the teuthology tests they
+Grab jobs from a beanstalk queue and run the teuthology tests they
 describe. One job is run at a time.
 """)
     parser.add_argument(
@@ -29,8 +29,8 @@ describe. One job is run at a time.
         required=True,
     )
     parser.add_argument(
-        '-m', '--machine-type',
-        help='which machine type the jobs will run on',
+        '-t', '--tube',
+        help='which beanstalk tube to read jobs from',
         required=True,
     )
 
index b06ef606181027ee757b0e9b82890e9459493357..1ec3c7dc91768f05e6aa290e8fb06c64c2797b17 100644 (file)
@@ -5,7 +5,6 @@ import sys
 import yaml
 
 from datetime import datetime
-from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
 from teuthology.queue import beanstalk
@@ -70,23 +69,24 @@ def main(args):
         return supervisor.main(args)
 
     verbose = args["--verbose"]
-    machine_type = args["--machine-type"]
+    tube = args["--tube"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
     exit_on_empty_queue = args["--exit-on-empty-queue"]
     backend = args['--queue-backend']
 
+    if backend is None:
+        backend = 'beanstalk'
+
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
 
-    if machine_type is None and teuth_config.machine_type is None:
-        return
     # setup logging for disoatcher in {log_dir}
     loglevel = logging.INFO
     if verbose:
         loglevel = logging.DEBUG
     log.setLevel(loglevel)
-    log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}")
+    log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
     setup_log_file(log_file_path)
     install_except_hook()
 
@@ -94,7 +94,7 @@ def main(args):
 
     if backend == 'beanstalk':
         connection = beanstalk.connect()
-        beanstalk.watch_tube(connection, machine_type)
+        beanstalk.watch_tube(connection, tube)
 
     result_proc = None
 
@@ -129,8 +129,11 @@ def main(args):
             log.info('Reserved job %s', job_id)
             log.info('Config is: %s', job.body)
         else:
-            job = report.get_queued_job(machine_type)
+            job = report.get_queued_job(tube)
             if job is None:
+                if exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
                 continue
             job = clean_config(job)
             report.try_push_job_info(job, dict(status='running'))
@@ -220,18 +223,3 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
     if not os.path.exists(run_archive):
         safepath.makedirs('/', run_archive)
     safepath.makedirs('/', job_archive_path)
-
-
-def pause_queue(machine_type, paused, paused_by, pause_duration=None):
-    if paused:
-        report.pause_queue(machine_type, paused, paused_by, pause_duration)
-        '''
-        If there is a pause duration specified
-        un-pause the queue after the time elapses
-        '''
-        if pause_duration is not None:
-            sleep(int(pause_duration))
-            paused = False
-            report.pause_queue(machine_type, paused, paused_by)
-    elif not paused:
-        report.pause_queue(machine_type, paused, paused_by)
index edb273a1996737c29d87b6906f471ef3d986a879..19883267dc62a47a9f861a80ef2c05d6240d6f97 100644 (file)
@@ -70,9 +70,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         if teuth_config.results_server:
             try:
                 report.try_delete_jobs(job_config['name'], job_config['job_id'])
-            except Exception as e:
-                log.warning("Unable to delete job %s, exception occurred: %s",
-                            job_config['job_id'], e)
+            except Exception:
+                log.exception("Unable to delete job %s", job_config['job_id'])
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
@@ -130,7 +129,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         '--archive', job_config['archive_path'],
         '--name', job_config['name'],
     ])
-    if 'description' in job_config:
+    if job_config.get('description') is not None:
         arg.extend(['--description', job_config['description']])
     job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
     arg.extend(['--', job_archive])
index 6235b0d36e4913d0bfb30ff8c5a1a0be62de28fe..f31dfd0d7fc1db89feab1f67abdc95c809a9731d 100644 (file)
@@ -182,6 +182,7 @@ class RemoteProcess(object):
                     command=self.command, exitstatus=self.returncode,
                     node=self.hostname, label=self.label
                 )
+
     def _get_exitstatus(self):
         """
         :returns: the remote command's exit status (return code). Note that
index 2a0b6ff36375f466e15bfff63757f3d4c6fa7992..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,106 +0,0 @@
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.config import config
-
-log = logging.getLogger(__name__)
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        report.try_delete_jobs(job_name, job_id)
-
-
-def main(args):
-    if config.backend == 'paddles':
-        paddles.main(args)
-    else:
-        beanstalk.main(args)
\ No newline at end of file
index 90b1cbd6d3cff505015902b40221e824e895b875..c668e4f6bcb68be215f5535fe5013cb4bca5a7cf 100644 (file)
@@ -1,12 +1,10 @@
 import beanstalkc
 import yaml
 import logging
-import pprint
-import sys
-from collections import OrderedDict
 
 from teuthology.config import config
-from teuthology import report
+from teuthology.queue import util
+
 
 log = logging.getLogger(__name__)
 
@@ -47,7 +45,7 @@ def walk_jobs(connection, tube_name, processor, pattern=None):
     # Try to figure out a sane timeout based on how many jobs are in the queue
     timeout = job_count / 2000.0 * 60
     for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
+        util.print_progress(i, job_count, "Loading")
         job = connection.reserve(timeout=timeout)
         if job is None or job.body is None:
             continue
@@ -57,7 +55,7 @@ def walk_jobs(connection, tube_name, processor, pattern=None):
         if pattern is not None and pattern not in job_name:
             continue
         processor.add_job(job_id, job_config, job)
-    end_progress()
+    util.end_progress()
     processor.complete()
 
 
@@ -105,13 +103,13 @@ def main(args):
             pause_tube(connection, machine_type, pause_duration)
         elif delete:
             walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
+                      util.JobDeleter(delete))
         elif runs:
             walk_jobs(connection, machine_type,
-                      RunPrinter())
+                      util.RunPrinter())
         else:
             walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
+                      util.JobPrinter(show_desc=show_desc, full=full))
     except KeyboardInterrupt:
         log.info("Interrupted.")
     finally:
index f2ea8b84c81560c5e07685c0b3ee2caf10113339..489d638e2fcf37d23f99eac35b6eabe3a1466338 100644 (file)
@@ -1,11 +1,7 @@
 import logging
-import pprint
-import sys
-from collections import OrderedDict
 
 from teuthology import report
-from teuthology.dispatcher import pause_queue
-
+from teuthology.queue import util
 
 log = logging.getLogger(__name__)
 
@@ -14,19 +10,17 @@ def stats_queue(machine_type):
     stats = report.get_queue_stats(machine_type)
     if stats['paused'] is None:
         log.info("%s queue is currently running with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
+                 stats['queue'],
+                 stats['queued_jobs'])
     else:
         log.info("%s queue is paused with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
+                 stats['queue'],
+                 stats['queued_jobs'])
 
 
-def update_priority(machine_type, priority, user, run_name=None):
+def update_priority(machine_type, priority, run_name=None):
     if run_name is not None:
-        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
-    else:
-        jobs = report.get_user_jobs_queue(machine_type, user)
+        jobs = report.get_jobs_by_run(machine_type, run_name)
     for job in jobs:
         job['priority'] = priority
         report.try_push_job_info(job)
@@ -34,55 +28,54 @@ def update_priority(machine_type, priority, user, run_name=None):
 
 def walk_jobs(machine_type, processor, user):
     log.info("Checking paddles queue...")
-    job_count = report.get_queue_stats(machine_type)['count']
+    job_count = report.get_queue_stats(machine_type)['queued_jobs']
 
     jobs = report.get_user_jobs_queue(machine_type, user)
     if job_count == 0:
-        log.info('No jobs in queue')
+        log.info('No jobs in Paddles queue')
         return
 
     for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
+        util.print_progress(i, job_count, "Loading")
         job = jobs[i-1]
         if job is None:
             continue
         job_id = job['job_id']
         processor.add_job(job_id, job)
-    end_progress()
+    util.end_progress()
     processor.complete()
 
 
 def main(args):
     machine_type = args['--machine_type']
-    #user = args['--user']
-    #run_name = args['--run_name']
-    #priority = args['--priority']
+    user = args['--user']
+    run_name = args['--run-name']
     status = args['--status']
     delete = args['--delete']
     runs = args['--runs']
     show_desc = args['--description']
     full = args['--full']
     pause_duration = args['--pause']
-    #unpause = args['--unpause']
-    #pause_duration = args['--time']
+    priority = args['--priority']
     try:
         if status:
             stats_queue(machine_type)
         if pause_duration:
-            pause_queue(machine_type, pause, user, pause_duration)
-        #else:
-            #pause_queue(machine_type, pause, user)
+            if not user:
+                log.info('Please enter user to pause Paddles queue')
+                return
+            report.pause_queue(machine_type, user, pause_duration)
         elif priority:
             update_priority(machine_type, priority, run_name)
         elif delete:
             walk_jobs(machine_type,
-                      JobDeleter(delete), user)
+                      util.JobDeleter(delete), user)
         elif runs:
             walk_jobs(machine_type,
-                      RunPrinter(), user)
+                      util.RunPrinter(), user)
         else:
             walk_jobs(machine_type,
-                      JobPrinter(show_desc=show_desc, full=full),
+                      util.JobPrinter(show_desc=show_desc, full=full),
                       user)
     except KeyboardInterrupt:
         log.info("Interrupted.")
diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py
new file mode 100644 (file)
index 0000000..2a7642e
--- /dev/null
@@ -0,0 +1,101 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        report.try_delete_jobs(job_name, job_id)
index 6796415014cb9ae1ab128988de1210798dc0224e..6e2a1993a385c9ae5133908482c5002ec0c96d99 100644 (file)
@@ -282,7 +282,6 @@ class ResultsReporter(object):
                 sleep=1, increment=inc, action=f'write job for {run_name}') as proceed:
             while proceed():
                 response = self.session.post(run_uri, data=job_json, headers=headers)
-
                 if response.status_code == 200:
                     resp_json = response.json()
                     job_id = resp_json['job_id']
@@ -387,7 +386,7 @@ class ResultsReporter(object):
     
     def get_top_job(self, queue):
 
-        uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
+        uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri,
                                                                           queue=queue)
         inc = random.uniform(0, 1)
         with safe_while(
@@ -523,7 +522,6 @@ class ResultsReporter(object):
                 sleep=1, increment=inc, action=f'creating queue {queue}') as proceed:
             while proceed():
                 response = self.session.post(uri, data=queue_json, headers=headers)
-
                 if response.status_code == 200:
                     self.log.info("Successfully created queue {queue}".format(
                         queue=queue,
@@ -546,13 +544,14 @@ class ResultsReporter(object):
 
         response.raise_for_status()
 
-    def update_queue(self, queue, paused, paused_by, pause_duration=None):
+    def update_queue(self, queue, paused_by, pause_duration=None):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
+
         if pause_duration is not None:
             pause_duration = int(pause_duration)
-        queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
+        queue_info = {'queue': queue, 'paused_by': paused_by,
                       'pause_duration': pause_duration}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
@@ -595,9 +594,6 @@ class ResultsReporter(object):
                 response = self.session.post(uri, data=queue_json, headers=headers)
 
                 if response.status_code == 200:
-                    self.log.info("Successfully retrieved stats for queue {queue}".format(
-                        queue=queue,
-                    ))
                     return response.json()
                 else:
                     msg = response.text
@@ -669,12 +665,18 @@ def get_user_jobs_queue(queue, user, run_name=None):
         return
     return reporter.queued_jobs(queue, user, run_name)
 
+def get_jobs_by_run(queue, run_name):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(queue, None, run_name)
+
 
-def pause_queue(queue, paused, paused_by, pause_duration=None):
+def pause_queue(queue, paused_by, pause_duration=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    reporter.update_queue(queue, paused, paused_by, pause_duration)
+    reporter.update_queue(queue, paused_by, pause_duration)
 
 
 def is_queue_paused(queue):
@@ -711,7 +713,7 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
     reporter.report_job(run_name, job_id, job_info)
 
 
-def get_queued_job(machine_type):
+def get_queued_job(queue):
     """
     Retrieve a job that is queued depending on priority
 
@@ -720,10 +722,6 @@ def get_queued_job(machine_type):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    if ',' in machine_type:
-        queue = 'multi'
-    else:
-        queue = machine_type
     if is_queue_paused(queue) == True:
         log.info("Teuthology queue %s is currently paused",
                   queue)
index 81dd4d548f1a85daad782fce19339d3563ad3409..fea64e9b4df31a7f07f44b9914a7b1dd8e79286a 100644 (file)
@@ -1,7 +1,7 @@
 import os
 import yaml
 
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -115,9 +115,9 @@ def beanstalk_schedule_job(job_config, backend, num=1):
     """
     num = int(num)
     tube = job_config.pop('tube')
-    beanstalk = teuthology.beanstalk.connect()
+    beanstalk = teuthology.queue.beanstalk.connect()
     beanstalk.use(tube)
-    queue = report.create_machine_type_queue(job_config['machine_type'])
+    queue = report.create_machine_type_queue(tube)
     job_config['queue'] = queue
     while num > 0:
         job_id = report.try_create_job(job_config, dict(status='queued'))
index 9a6d0ff564cd31addc26686159adc6fe92f0e83f..ce4e55d851940eabc5db5e2fd9a7ec6d9643da1f 100644 (file)
@@ -1,8 +1,6 @@
-from teuthology import dispatcher
 from unittest.mock import patch, Mock
 from teuthology import report
 
-import unittest.mock as mock
 import unittest
 
 
@@ -35,74 +33,3 @@ class TestDispatcher(unittest.TestCase):
 
         self.assertEqual(response.status_code, 200)
         self.assertEqual(response.json(), job_config)
-    
-
-    @patch("teuthology.worker.fetch_teuthology")
-    @patch("teuthology.dispatcher.fetch_qa_suite")
-    @patch("teuthology.worker.fetch_qa_suite")
-    @patch("teuthology.dispatcher.report.get_queued_job")
-    @patch("teuthology.dispatcher.report.try_push_job_info")
-    @patch("teuthology.dispatcher.setup_log_file")
-    @patch("os.path.isdir")
-    @patch("os.getpid")
-    @patch("teuthology.dispatcher.teuth_config")
-    @patch("subprocess.Popen")
-    @patch("os.path.join")
-    @patch("teuthology.dispatcher.create_job_archive")
-    @patch("yaml.safe_dump")
-    def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, 
-                             m_worker_fetch_qa_suite, m_get_queued_job, 
-                             m_try_push_job_info, 
-                             m_setup_log, 
-                             m_isdir, m_getpid, 
-                             m_t_config, m_popen, m_join, m_create_archive, m_yaml_dump):
-            
-        args = {
-            '--owner': 'the_owner',
-            '--archive-dir': '/archive/dir',
-            '--log-dir': '/worker/log',
-            '--name': 'the_name',
-            '--description': 'the_description',
-            '--machine-type': 'test_queue',
-            '--supervisor': False,
-            '--verbose': False,
-            '--queue-backend': 'paddles'
-        }
-
-        m = mock.MagicMock()
-        job_id = {'job_id': '1'}
-        m.__getitem__.side_effect = job_id.__getitem__
-        m.__iter__.side_effect = job_id.__iter__
-        job = {
-            'job_id': '1',
-            'description': 'DESC',
-            'email': 'EMAIL',
-            'first_in_suite': False,
-            'last_in_suite': True,
-            'machine_type': 'test_queue',
-            'name': 'NAME',
-            'owner': 'OWNER',
-            'priority': 99,
-            'results_timeout': '6',
-            'verbose': False,
-            'stop_worker': True,
-            'archive_path': '/archive/dir/NAME/1'
-        }
-
-        m_fetch_teuthology.return_value = '/teuth/path'
-        m_fetch_qa_suite.return_value = '/suite/path'
-        m_isdir.return_value = True
-        mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job')
-        mock_get = mock_get_patcher.start()
-        mock_get.return_value = job
-
-        mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job')
-        mock_prep_job = mock_prep_job_patcher.start()
-        mock_prep_job.return_value = (job, '/teuth/bin/path')
-        m_yaml_dump.return_value = ''
-
-        m_try_push_job_info.called_once_with(job, dict(status='running'))
-        dispatcher.main(args)
-        mock_get_patcher.stop()
-
-