]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Add retry for paddles calls and modify pause queue command
authorAishwarya Mathuria <amathuri@redhat.com>
Wed, 9 Jun 2021 13:36:37 +0000 (19:06 +0530)
committerAishwarya Mathuria <amathuri@redhat.com>
Fri, 3 Jun 2022 14:35:24 +0000 (20:05 +0530)
1. Add retry loop for the paddles calls.
2. Add run name as a parameter for updating priority of jobs in paddles.
3. Modify the pause queue command to run on server side with an optional pause duration parameter.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
scripts/queue.py
teuthology/dispatcher/__init__.py
teuthology/paddles_queue.py
teuthology/report.py

index 285a0adac9599df848ad71ec575c16486ee9cba1..a07598a92ff81fbcb82eefd974c339c13a94b493 100644 (file)
@@ -6,10 +6,12 @@ import teuthology.paddles_queue
 doc = """
 usage: teuthology-queue -h
        teuthology-queue -s -m MACHINE_TYPE
-       teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
-       teuthology-queue [-r] -m MACHINE_TYPE -u USER
-       teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
-       teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
+       teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
+       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+       teuthology-queue [-r] -m MACHINE_TYPE -U USER
+       teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
+       teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+       teuthology-queue -u -m MACHINE_TYPE -U USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -26,12 +28,16 @@ optional arguments:
   -r, --runs            Only show run names
   -f, --full            Print the entire job config. Use with caution.
   -s, --status          Prints the status of the queue
-  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
-                        will unpause. If -m is passed, pause that queue,
+  -t, --time SECONDS    Pause queues for a number of seconds.
+                        If -m is passed, pause that queue,
                         otherwise pause all queues.
+  -p, --pause           Pause queue
+  -u, --unpause         Unpause queue
   -P, --priority PRIORITY
                         Change priority of queued jobs
-  -u, --user USER       User who owns the jobs
+  -U, --user USER       User who owns the jobs
+  -R, --run_name RUN_NAME
+                        Used to change priority of all jobs in the run.
 """
 
 
index e079f6bed40239e0894aadcc5a2063a0028a31f9..603ec1ce725a349b196cd30aaa578d889d88aa7a 100644 (file)
@@ -5,6 +5,7 @@ import sys
 import yaml
 
 from datetime import datetime
+from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
 from teuthology import report
@@ -197,3 +198,18 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
     if not os.path.exists(run_archive):
         safepath.makedirs('/', run_archive)
     safepath.makedirs('/', job_archive_path)
+
+
+def pause_queue(machine_type, paused, paused_by, pause_duration=None):
+    if paused == True:
+        report.pause_queue(machine_type, paused, paused_by, pause_duration)
+        '''
+        If there is a pause duration specified
+        un-pause the queue after the time elapses
+        '''
+        if pause_duration is not None:
+            sleep(int(pause_duration))
+            paused = False
+            report.pause_queue(machine_type, paused, paused_by)
+    elif paused == False:
+        report.pause_queue(machine_type, paused, paused_by)
index 87c96d8cb6ba00b68f0a72f1931f28ed0d40ea22..99cfe77e6e005c857a951369ac831de9754c05dc 100644 (file)
@@ -4,6 +4,7 @@ import sys
 from collections import OrderedDict
 
 from teuthology import report
+from teuthology.dispatcher import pause_queue
 
 
 log = logging.getLogger(__name__)
@@ -43,7 +44,6 @@ def walk_jobs(connection, tube_name, processor, pattern=None):
         return
 
 def stats_queue(machine_type):
-    stats = report.get_queue_stats(machine_type)
     stats = report.get_queue_stats(machine_type)
     if stats['paused'] is None:
         log.info("%s queue is currently running with %s jobs queued",
@@ -55,17 +55,16 @@ def stats_queue(machine_type):
                  stats['count'])
 
 
-def update_priority(machine_type, priority, user):
-    jobs = report.get_user_jobs_queue(machine_type, user)
+def update_priority(machine_type, priority, user, run_name=None):
+    if run_name is not None:
+        jobs = report.get_user_jobs_queue(machine_type, user, run_name)
+    else:
+        jobs = report.get_user_jobs_queue(machine_type, user)
     for job in jobs:
         job['priority'] = priority
         report.try_push_job_info(job)
 
 
-def pause_queue(machine_type, pause_duration, paused_by):
-    report.pause_queue(machine_type, paused_by, pause_duration)
-
-
 def print_progress(index, total, message=None):
     msg = "{m} ".format(m=message) if message else ''
     sys.stderr.write("{msg}{i}/{total}\r".format(
@@ -182,20 +181,29 @@ class JobDeleter(JobProcessor):
 def main(args):
     machine_type = args['--machine_type']
     user = args['--user']
+    run_name = args['--run_name']
     priority = args['--priority']
     status = args['--status']
     delete = args['--delete']
     runs = args['--runs']
     show_desc = args['--description']
     full = args['--full']
-    pause_duration = args['--pause']
+    pause = args['--pause']
+    unpause = args['--unpause']
+    pause_duration = args['--time']
     try:
         if status:
             stats_queue(machine_type)
-        elif pause_duration:
-            pause_queue(machine_type, pause_duration, user)
+        elif pause:
+            if pause_duration:
+                pause_queue(machine_type, pause, user, pause_duration)
+            else:
+                pause_queue(machine_type, pause, user)
+        elif unpause:
+            pause = False
+            pause_queue(machine_type, pause, user)
         elif priority:
-            update_priority(machine_type, priority, user)
+            update_priority(machine_type, priority, user, run_name)
         elif delete:
             walk_jobs(machine_type,
                       JobDeleter(delete), user)
index a28fcffb7ce2ff079f0b79174a556459e399048a..5d8cc9c3c87af35b698493a133415974fb73fff6 100644 (file)
@@ -6,7 +6,6 @@ import requests
 import logging
 import random
 import socket
-import threading
 from datetime import datetime
 
 import teuthology
@@ -277,20 +276,25 @@ class ResultsReporter(object):
         )
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(run_uri, data=job_json, headers=headers)
 
-        if response.status_code == 200:
-            resp_json = response.json()
-            job_id = resp_json['job_id']
-            return job_id
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=run_uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'write job for {run_name}') as proceed:
+            while proceed():
+                response = self.session.post(run_uri, data=job_json, headers=headers)
+
+                if response.status_code == 200:
+                    resp_json = response.json()
+                    job_id = resp_json['job_id']
+                    return job_id
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=run_uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         
         response.raise_for_status()
         return None
@@ -385,9 +389,15 @@ class ResultsReporter(object):
 
         uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
                                                                           machine_type=machine_type)
-        response = self.session.get(uri)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed:
+            while proceed():
+                response = self.session.get(uri)
+                if response.status_code == 200:
+                    return response.json()
         response.raise_for_status()
-        return response.json()
+
 
     def get_jobs(self, run_name, job_id=None, fields=None):
         """
@@ -507,30 +517,36 @@ class ResultsReporter(object):
         queue_info = {'machine_type': machine_type}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=queue_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully created queue for {machine_type}".format(
-                machine_type=machine_type,
-            ))
-        else:
-            resp_json = response.json()
-            if resp_json:
-                msg = resp_json.get('message', '')
-            else:
-                msg = response.text
-            if msg and msg.endswith('already exists'):
-                return
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully created queue for {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return
+                else:
+                    resp_json = response.json()
+                    if resp_json:
+                        msg = resp_json.get('message', '')
+                    else:
+                        msg = response.text
+                    if msg and msg.endswith('already exists'):
+                        return
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
 
         response.raise_for_status()
 
-    def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None):
+    def update_queue(self, machine_type, paused, paused_by, pause_duration=None):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
@@ -540,20 +556,26 @@ class ResultsReporter(object):
                       'pause_duration': pause_duration}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.put(uri, data=queue_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully updated queue for {machine_type}".format(
-                machine_type=machine_type,
-            ))
-        else:
-            msg = response.text
-            self.log.error(
-                "PUT to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.put(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully updated queue for {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "PUT to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
 
         response.raise_for_status()
     
@@ -566,46 +588,60 @@ class ResultsReporter(object):
         queue_json = json.dumps(queue_info)
 
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=queue_json, headers=headers)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully retrieved stats for queue {machine_type}".format(
-                machine_type=machine_type,
-            ))
-            return response.json()
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+                if response.status_code == 200:
+                    self.log.info("Successfully retrieved stats for queue {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
     
-    def queued_jobs(self, machine_type, user):
+    def queued_jobs(self, machine_type, user, run_name):
         uri = "{base}/queue/queued_jobs/".format(
             base=self.base_uri
         )
-        request_info = {'machine_type': machine_type,
-                        'user': user}
+        if run_name is not None:
+            filter_field = run_name
+            request_info = {'machine_type': machine_type,
+                            'run_name': run_name}
+        else:
+            filter_field = user
+            request_info = {'machine_type': machine_type,
+                            'user': user}
         request_json = json.dumps(request_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=request_json, headers=headers)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'get queued jobs {filter_field}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=request_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully retrieved jobs for user {user}".format(
-                user=user,
-            ))
-            return response.json()
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+                if response.status_code == 200:
+                    self.log.info("Successfully retrieved jobs for {filter_field}".format(
+                        filter_field=filter_field,
+                    ))
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
 
@@ -616,24 +652,18 @@ def create_machine_type_queue(machine_type):
     reporter.create_queue(machine_type)
 
 
-def get_user_jobs_queue(machine_type, user):
+def get_user_jobs_queue(machine_type, user, run_name=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    return reporter.queued_jobs(machine_type, user)
+    return reporter.queued_jobs(machine_type, user, run_name)
 
 
-def pause_queue(machine_type, paused_by, pause_duration):
+def pause_queue(machine_type, paused, paused_by, pause_duration=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    paused = True
     reporter.update_queue(machine_type, paused, paused_by, pause_duration)
-    paused = False
-    timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by])
-    timer.daemon = True
-    timer.start()
-    timer.join()
 
 
 def is_queue_paused(machine_type):