teuthology/queue: Single command for queue operations  (branch: wip-amathuria-replace-beanstalkd-paddles)
author     Aishwarya Mathuria <amathuri@redhat.com>
           Wed, 4 May 2022 14:07:40 +0000 (19:37 +0530)
committer  Zack Cerza <zack@redhat.com>
           Tue, 31 May 2022 19:46:19 +0000 (13:46 -0600)
Makes the same teuthology-queue commands work regardless of the queue backend (Paddles or Beanstalk).

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
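
With either backend selected in teuthology's config, the same commands now apply. A usage sketch (not part of this commit; the machine type "smithi" is illustrative), following the updated docopt usage in scripts/queue.py below:

    teuthology-queue -s -m smithi                  # show queue status
    teuthology-queue -m smithi -D PATTERN          # delete queued jobs whose name contains PATTERN
    teuthology-queue -p 600 -m smithi -U USER      # pause the queue for 600 seconds (0 unpauses)
    teuthology-queue -m smithi -P 100 -R RUN_NAME  # change the priority of a run's jobs (Paddles queues only)
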
12 files changed:
scripts/paddles_queue.py [deleted file]
scripts/queue.py
scripts/worker.py
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/queue/__init__.py
teuthology/queue/beanstalk.py
teuthology/queue/paddles.py
teuthology/queue/util.py [new file with mode: 0644]
teuthology/report.py
teuthology/schedule.py
teuthology/test/test_dispatcher.py

diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py
deleted file mode 100644 (file)
index 8487fd9..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-import docopt
-
-import teuthology.config
-import teuthology.queue.paddles_queue
-doc = """
-usage: teuthology-paddles-queue -h
-       teuthology-paddles-queue -s -m MACHINE_TYPE
-       teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
-       teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
-       teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
-       teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
-
-List Jobs in queue.
-If -D is passed, then jobs with PATTERN in the job name are deleted from the
-queue.
-
-Arguments:
-  -m, --machine_type MACHINE_TYPE
-                        Which machine type queue to work on.
-
-optional arguments:
-  -h, --help            Show this help message and exit
-  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
-  -d, --description     Show job descriptions
-  -r, --runs            Only show run names
-  -f, --full            Print the entire job config. Use with caution.
-  -s, --status          Prints the status of the queue
-  -t, --time SECONDS    Pause queues for a number of seconds.
-                        If -m is passed, pause that queue,
-                        otherwise pause all queues.
-  -p, --pause           Pause queue
-  -u, --unpause         Unpause queue
-  -P, --priority PRIORITY
-                        Change priority of queued jobs
-  -U, --user USER       User who owns the jobs
-  -R, --run-name RUN_NAME
-                        Used to change priority of all jobs in the run.
-"""
-
-
-def main():
-    args = docopt.docopt(doc)
-    teuthology.paddles_queue.main(args)
diff --git a/scripts/queue.py b/scripts/queue.py
index 2c466a7be902b167eecd982757d3c7da0e780720..1d9112c22ecc29c363fcb27e56898715f339725b 100644 (file)
@@ -1,15 +1,16 @@
 import docopt
 
-import teuthology.config
 import teuthology.queue.beanstalk
 import teuthology.queue.paddles
+from teuthology.config import config
 
 doc = """
 usage: teuthology-queue -h
        teuthology-queue [-s|-d|-f] -m MACHINE_TYPE 
        teuthology-queue [-r] -m MACHINE_TYPE
        teuthology-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+       teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
+       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -29,9 +30,17 @@ optional arguments:
   -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
                         will unpause. If -m is passed, pause that queue,
                         otherwise pause all queues.
+  -P, --priority PRIORITY
+                        Change priority of queued jobs (only in Paddles queues)
+  -U, --user USER       User who owns the jobs
+  -R, --run-name RUN_NAME
+                        Used to change priority of all jobs in the run.
 """
 
 
 def main():
     args = docopt.docopt(doc)
-    teuthology.queue.main(args)
+    if config.backend == 'beanstalk':
+      teuthology.queue.beanstalk.main(args)
+    else:
+      teuthology.queue.paddles.main(args)
diff --git a/scripts/worker.py b/scripts/worker.py
index 8d3228d8d031b2bceaf84bf4c65a997cda327406..a3e12c20d7bb9366ae92347e23e1b833f70b1f34 100644 (file)
@@ -9,7 +9,7 @@ def main():
 
 def parse_args():
     parser = argparse.ArgumentParser(description="""
-Grab jobs from a paddles queue and run the teuthology tests they
+Grab jobs from a beanstalk queue and run the teuthology tests they
 describe. One job is run at a time.
 """)
     parser.add_argument(
@@ -29,8 +29,8 @@ describe. One job is run at a time.
         required=True,
     )
     parser.add_argument(
-        '-m', '--machine-type',
-        help='which machine type the jobs will run on',
+        '-t', '--tube',
+        help='which beanstalk tube to read jobs from',
         required=True,
     )
 
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index b06ef606181027ee757b0e9b82890e9459493357..2e4a084e73d333f3977cae9bbfcb7ed0d6a93ede 100644 (file)
@@ -95,6 +95,8 @@ def main(args):
     if backend == 'beanstalk':
         connection = beanstalk.connect()
         beanstalk.watch_tube(connection, machine_type)
+    elif backend == 'paddles':
+        report.create_machine_type_queue(machine_type)
 
     result_proc = None
 
@@ -131,6 +133,9 @@ def main(args):
         else:
             job = report.get_queued_job(machine_type)
             if job is None:
+                if exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
                 continue
             job = clean_config(job)
             report.try_push_job_info(job, dict(status='running'))
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index edb273a1996737c29d87b6906f471ef3d986a879..19883267dc62a47a9f861a80ef2c05d6240d6f97 100644 (file)
@@ -70,9 +70,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         if teuth_config.results_server:
             try:
                 report.try_delete_jobs(job_config['name'], job_config['job_id'])
-            except Exception as e:
-                log.warning("Unable to delete job %s, exception occurred: %s",
-                            job_config['job_id'], e)
+            except Exception:
+                log.exception("Unable to delete job %s", job_config['job_id'])
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
@@ -130,7 +129,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         '--archive', job_config['archive_path'],
         '--name', job_config['name'],
     ])
-    if 'description' in job_config:
+    if job_config.get('description') is not None:
         arg.extend(['--description', job_config['description']])
     job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
     arg.extend(['--', job_archive])
diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py
index 2a0b6ff36375f466e15bfff63757f3d4c6fa7992..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,106 +0,0 @@
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.config import config
-
-log = logging.getLogger(__name__)
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        report.try_delete_jobs(job_name, job_id)
-
-
-def main(args):
-    if config.backend == 'paddles':
-        paddles.main(args)
-    else:
-        beanstalk.main(args)
\ No newline at end of file
diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py
index 90b1cbd6d3cff505015902b40221e824e895b875..c668e4f6bcb68be215f5535fe5013cb4bca5a7cf 100644 (file)
@@ -1,12 +1,10 @@
 import beanstalkc
 import yaml
 import logging
-import pprint
-import sys
-from collections import OrderedDict
 
 from teuthology.config import config
-from teuthology import report
+from teuthology.queue import util
+
 
 log = logging.getLogger(__name__)
 
@@ -47,7 +45,7 @@ def walk_jobs(connection, tube_name, processor, pattern=None):
     # Try to figure out a sane timeout based on how many jobs are in the queue
     timeout = job_count / 2000.0 * 60
     for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
+        util.print_progress(i, job_count, "Loading")
         job = connection.reserve(timeout=timeout)
         if job is None or job.body is None:
             continue
@@ -57,7 +55,7 @@ def walk_jobs(connection, tube_name, processor, pattern=None):
         if pattern is not None and pattern not in job_name:
             continue
         processor.add_job(job_id, job_config, job)
-    end_progress()
+    util.end_progress()
     processor.complete()
 
 
@@ -105,13 +103,13 @@ def main(args):
             pause_tube(connection, machine_type, pause_duration)
         elif delete:
             walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
+                      util.JobDeleter(delete))
         elif runs:
             walk_jobs(connection, machine_type,
-                      RunPrinter())
+                      util.RunPrinter())
         else:
             walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
+                      util.JobPrinter(show_desc=show_desc, full=full))
     except KeyboardInterrupt:
         log.info("Interrupted.")
     finally:
diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py
index f2ea8b84c81560c5e07685c0b3ee2caf10113339..489d638e2fcf37d23f99eac35b6eabe3a1466338 100644 (file)
@@ -1,11 +1,7 @@
 import logging
-import pprint
-import sys
-from collections import OrderedDict
 
 from teuthology import report
-from teuthology.dispatcher import pause_queue
-
+from teuthology.queue import util
 
 log = logging.getLogger(__name__)
 
@@ -14,19 +10,17 @@ def stats_queue(machine_type):
     stats = report.get_queue_stats(machine_type)
     if stats['paused'] is None:
         log.info("%s queue is currently running with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
+                 stats['queue'],
+                 stats['queued_jobs'])
     else:
         log.info("%s queue is paused with %s jobs queued",
-                 stats['name'],
-                 stats['count'])
+                 stats['queue'],
+                 stats['queued_jobs'])
 
 
-def update_priority(machine_type, priority, user, run_name=None):
+def update_priority(machine_type, priority, run_name=None):
     if run_name is not None:
-        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
-    else:
-        jobs = report.get_user_jobs_queue(machine_type, user)
+        jobs = report.get_jobs_by_run(machine_type, run_name)
     for job in jobs:
         job['priority'] = priority
         report.try_push_job_info(job)
@@ -34,55 +28,54 @@ def update_priority(machine_type, priority, user, run_name=None):
 
 def walk_jobs(machine_type, processor, user):
     log.info("Checking paddles queue...")
-    job_count = report.get_queue_stats(machine_type)['count']
+    job_count = report.get_queue_stats(machine_type)['queued_jobs']
 
     jobs = report.get_user_jobs_queue(machine_type, user)
     if job_count == 0:
-        log.info('No jobs in queue')
+        log.info('No jobs in Paddles queue')
         return
 
     for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
+        util.print_progress(i, job_count, "Loading")
         job = jobs[i-1]
         if job is None:
             continue
         job_id = job['job_id']
         processor.add_job(job_id, job)
-    end_progress()
+    util.end_progress()
     processor.complete()
 
 
 def main(args):
     machine_type = args['--machine_type']
-    #user = args['--user']
-    #run_name = args['--run_name']
-    #priority = args['--priority']
+    user = args['--user']
+    run_name = args['--run-name']
     status = args['--status']
     delete = args['--delete']
     runs = args['--runs']
     show_desc = args['--description']
     full = args['--full']
     pause_duration = args['--pause']
-    #unpause = args['--unpause']
-    #pause_duration = args['--time']
+    priority = args['--priority']
     try:
         if status:
             stats_queue(machine_type)
         if pause_duration:
-            pause_queue(machine_type, pause, user, pause_duration)
-        #else:
-            #pause_queue(machine_type, pause, user)
+            if not user:
+                log.info('Please enter user to pause Paddles queue')
+                return
+            report.pause_queue(machine_type, user, pause_duration)
         elif priority:
             update_priority(machine_type, priority, run_name)
         elif delete:
             walk_jobs(machine_type,
-                      JobDeleter(delete), user)
+                      util.JobDeleter(delete), user)
         elif runs:
             walk_jobs(machine_type,
-                      RunPrinter(), user)
+                      util.RunPrinter(), user)
         else:
             walk_jobs(machine_type,
-                      JobPrinter(show_desc=show_desc, full=full),
+                      util.JobPrinter(show_desc=show_desc, full=full),
                       user)
     except KeyboardInterrupt:
         log.info("Interrupted.")
diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py
new file mode 100644 (file)
index 0000000..2a7642e
--- /dev/null
@@ -0,0 +1,101 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        report.try_delete_jobs(job_name, job_id)
diff --git a/teuthology/report.py b/teuthology/report.py
index 6796415014cb9ae1ab128988de1210798dc0224e..bfd79f988f20e7a2838a29d479c8d7fe5af87d5f 100644 (file)
@@ -387,7 +387,7 @@ class ResultsReporter(object):
     
     def get_top_job(self, queue):
 
-        uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
+        uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri,
                                                                           queue=queue)
         inc = random.uniform(0, 1)
         with safe_while(
@@ -546,13 +546,14 @@ class ResultsReporter(object):
 
         response.raise_for_status()
 
-    def update_queue(self, queue, paused, paused_by, pause_duration=None):
+    def update_queue(self, queue, paused_by, pause_duration=None):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
+
         if pause_duration is not None:
             pause_duration = int(pause_duration)
-        queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
+        queue_info = {'queue': queue, 'paused_by': paused_by,
                       'pause_duration': pause_duration}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
@@ -595,9 +596,6 @@ class ResultsReporter(object):
                 response = self.session.post(uri, data=queue_json, headers=headers)
 
                 if response.status_code == 200:
-                    self.log.info("Successfully retrieved stats for queue {queue}".format(
-                        queue=queue,
-                    ))
                     return response.json()
                 else:
                     msg = response.text
@@ -669,12 +667,18 @@ def get_user_jobs_queue(queue, user, run_name=None):
         return
     return reporter.queued_jobs(queue, user, run_name)
 
+def get_jobs_by_run(queue, run_name):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(queue, None, run_name)
+
 
-def pause_queue(queue, paused, paused_by, pause_duration=None):
+def pause_queue(queue, paused_by, pause_duration=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    reporter.update_queue(queue, paused, paused_by, pause_duration)
+    reporter.update_queue(queue, paused_by, pause_duration)
 
 
 def is_queue_paused(queue):
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index 81dd4d548f1a85daad782fce19339d3563ad3409..3a370e86db166b96f48a4d801ec7ef52650dc3e3 100644 (file)
@@ -1,7 +1,7 @@
 import os
 import yaml
 
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -115,7 +115,7 @@ def beanstalk_schedule_job(job_config, backend, num=1):
     """
     num = int(num)
     tube = job_config.pop('tube')
-    beanstalk = teuthology.beanstalk.connect()
+    beanstalk = teuthology.queue.beanstalk.connect()
     beanstalk.use(tube)
     queue = report.create_machine_type_queue(job_config['machine_type'])
     job_config['queue'] = queue
diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py
index 9a6d0ff564cd31addc26686159adc6fe92f0e83f..16c61618a0032fd85695d80dbc6b890adceedd2c 100644 (file)
@@ -66,7 +66,8 @@ class TestDispatcher(unittest.TestCase):
             '--machine-type': 'test_queue',
             '--supervisor': False,
             '--verbose': False,
-            '--queue-backend': 'paddles'
+            '--queue-backend': 'paddles',
+            '--exit-on-empty-queue': False
         }
 
         m = mock.MagicMock()