From 513d0ebfc1032a33f1462e54da8dabb45cfe294b Mon Sep 17 00:00:00 2001
From: Aishwarya Mathuria
Date: Wed, 9 Jun 2021 19:06:37 +0530
Subject: [PATCH] Add retry for paddles calls and modify pause queue command

1. Add retry loop for the paddles calls.
2. Add run name as a parameter for updating priority of jobs in paddles.
3. Modify the pause queue command to run on server side with an optional pause duration parameter.

Signed-off-by: Aishwarya Mathuria
---
 scripts/queue.py                  |  20 ++-
 teuthology/dispatcher/__init__.py |  16 +++
 teuthology/paddles_queue.py       |  30 +++--
 teuthology/report.py              | 208 +++++++++++++++++-------------
 4 files changed, 167 insertions(+), 107 deletions(-)

diff --git a/scripts/queue.py b/scripts/queue.py
index 285a0adac9..a07598a92f 100644
--- a/scripts/queue.py
+++ b/scripts/queue.py
@@ -6,10 +6,12 @@ import teuthology.paddles_queue
 doc = """
 usage: teuthology-queue -h
        teuthology-queue -s -m MACHINE_TYPE
-       teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
-       teuthology-queue [-r] -m MACHINE_TYPE -u USER
-       teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
-       teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
+       teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
+       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+       teuthology-queue [-r] -m MACHINE_TYPE -U USER
+       teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
+       teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+       teuthology-queue -u -m MACHINE_TYPE -U USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -26,12 +28,16 @@ optional arguments:
   -r, --runs                Only show run names
   -f, --full                Print the entire job config. Use with caution.
   -s, --status              Prints the status of the queue
-  -p, --pause SECONDS       Pause queues for a number of seconds. A value of 0
-                            will unpause. If -m is passed, pause that queue,
+  -t, --time SECONDS        Pause queues for a number of seconds.
+                            If -m is passed, pause that queue,
                             otherwise pause all queues.
+  -p, --pause               Pause queue
+  -u, --unpause             Unpause queue
   -P, --priority PRIORITY   Change priority of queued jobs
-  -u, --user USER           User who owns the jobs
+  -U, --user USER           User who owns the jobs
+  -R, --run_name RUN_NAME
+                            Used to change priority of all jobs in the run.
 
""" diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index e079f6bed4..603ec1ce72 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -5,6 +5,7 @@ import sys import yaml from datetime import datetime +from time import sleep from teuthology import setup_log_file, install_except_hook from teuthology import report @@ -197,3 +198,18 @@ def create_job_archive(job_name, job_archive_path, archive_dir): if not os.path.exists(run_archive): safepath.makedirs('/', run_archive) safepath.makedirs('/', job_archive_path) + + +def pause_queue(machine_type, paused, paused_by, pause_duration=None): + if paused == True: + report.pause_queue(machine_type, paused, paused_by, pause_duration) + ''' + If there is a pause duration specified + un-pause the queue after the time elapses + ''' + if pause_duration is not None: + sleep(int(pause_duration)) + paused = False + report.pause_queue(machine_type, paused, paused_by) + elif paused == False: + report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py index 87c96d8cb6..99cfe77e6e 100644 --- a/teuthology/paddles_queue.py +++ b/teuthology/paddles_queue.py @@ -4,6 +4,7 @@ import sys from collections import OrderedDict from teuthology import report +from teuthology.dispatcher import pause_queue log = logging.getLogger(__name__) @@ -43,7 +44,6 @@ def walk_jobs(connection, tube_name, processor, pattern=None): return def stats_queue(machine_type): - stats = report.get_queue_stats(machine_type) stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", @@ -55,17 +55,16 @@ def stats_queue(machine_type): stats['count']) -def update_priority(machine_type, priority, user): - jobs = report.get_user_jobs_queue(machine_type, user) +def update_priority(machine_type, priority, user, run_name=None): + if run_name is not None: + jobs = report.get_user_jobs_queue(machine_type, user, run_name) + else: + jobs = report.get_user_jobs_queue(machine_type, user) for job in jobs: job['priority'] = priority report.try_push_job_info(job) -def pause_queue(machine_type, pause_duration, paused_by): - report.pause_queue(machine_type, paused_by, pause_duration) - - def print_progress(index, total, message=None): msg = "{m} ".format(m=message) if message else '' sys.stderr.write("{msg}{i}/{total}\r".format( @@ -182,20 +181,29 @@ class JobDeleter(JobProcessor): def main(args): machine_type = args['--machine_type'] user = args['--user'] + run_name = args['--run_name'] priority = args['--priority'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] - pause_duration = args['--pause'] + pause = args['--pause'] + unpause = args['--unpause'] + pause_duration = args['--time'] try: if status: stats_queue(machine_type) - elif pause_duration: - pause_queue(machine_type, pause_duration, user) + elif pause: + if pause_duration: + pause_queue(machine_type, pause, user, pause_duration) + else: + pause_queue(machine_type, pause, user) + elif unpause: + pause = False + pause_queue(machine_type, pause, user) elif priority: - update_priority(machine_type, priority, user) + update_priority(machine_type, priority, user, run_name) elif delete: walk_jobs(machine_type, JobDeleter(delete), user) diff --git a/teuthology/report.py b/teuthology/report.py index a28fcffb7c..5d8cc9c3c8 100644 --- a/teuthology/report.py +++ 
@@ -6,7 +6,6 @@ import requests
 import logging
 import random
 import socket
-import threading
 from datetime import datetime
 
 import teuthology
@@ -277,20 +276,25 @@ class ResultsReporter(object):
         )
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(run_uri, data=job_json, headers=headers)
-        if response.status_code == 200:
-            resp_json = response.json()
-            job_id = resp_json['job_id']
-            return job_id
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=run_uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'write job for {run_name}') as proceed:
+            while proceed():
+                response = self.session.post(run_uri, data=job_json, headers=headers)
+
+                if response.status_code == 200:
+                    resp_json = response.json()
+                    job_id = resp_json['job_id']
+                    return job_id
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=run_uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
         return None
@@ -385,9 +389,15 @@ class ResultsReporter(object):
         uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
                                                                           machine_type=machine_type)
-        response = self.session.get(uri)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed:
+            while proceed():
+                response = self.session.get(uri)
+                if response.status_code == 200:
+                    return response.json()
         response.raise_for_status()
-        return response.json()
+
 
     def get_jobs(self, run_name, job_id=None, fields=None):
         """
@@ -507,30 +517,36 @@ class ResultsReporter(object):
         queue_info = {'machine_type': machine_type}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=queue_json, headers=headers)
-        if response.status_code == 200:
-            self.log.info("Successfully created queue for {machine_type}".format(
-                machine_type=machine_type,
-            ))
-        else:
-            resp_json = response.json()
-            if resp_json:
-                msg = resp_json.get('message', '')
-            else:
-                msg = response.text
-            if msg and msg.endswith('already exists'):
-                return
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully created queue for {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return
+                else:
+                    resp_json = response.json()
+                    if resp_json:
+                        msg = resp_json.get('message', '')
+                    else:
+                        msg = response.text
+                    if msg and msg.endswith('already exists'):
+                        return
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
-    def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None):
+    def update_queue(self, machine_type, paused, paused_by, pause_duration=None):
         uri = "{base}/queue/".format(
             base=self.base_uri
         )
         queue_info = {'machine_type': machine_type,
                       'paused': paused,
                       'paused_by': paused_by,
                       'pause_duration': pause_duration}
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.put(uri, data=queue_json, headers=headers)
-        if response.status_code == 200:
-            self.log.info("Successfully updated queue for {machine_type}".format(
-                machine_type=machine_type,
-            ))
-        else:
-            msg = response.text
-            self.log.error(
-                "PUT to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.put(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully updated queue for {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "PUT to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
@@ -566,46 +588,60 @@ class ResultsReporter(object):
         queue_json = json.dumps(queue_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=queue_json, headers=headers)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully retrieved stats for queue {machine_type}".format(
-                machine_type=machine_type,
-            ))
-            return response.json()
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+                if response.status_code == 200:
+                    self.log.info("Successfully retrieved stats for queue {machine_type}".format(
+                        machine_type=machine_type,
+                    ))
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
-    def queued_jobs(self, machine_type, user):
+    def queued_jobs(self, machine_type, user, run_name):
         uri = "{base}/queue/queued_jobs/".format(
             base=self.base_uri
         )
-        request_info = {'machine_type': machine_type,
-                        'user': user}
+        if run_name is not None:
+            filter_field = run_name
+            request_info = {'machine_type': machine_type,
+                            'run_name': run_name}
+        else:
+            filter_field = user
+            request_info = {'machine_type': machine_type,
+                            'user': user}
         request_json = json.dumps(request_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(uri, data=request_json, headers=headers)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'get queued jobs {filter_field}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=request_json, headers=headers)
 
-        if response.status_code == 200:
-            self.log.info("Successfully retrieved jobs for user {user}".format(
-                user=user,
-            ))
-            return response.json()
-        else:
-            msg = response.text
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
+                if response.status_code == 200:
+                    self.log.info("Successfully retrieved jobs for {filter_field}".format(
+                        filter_field=filter_field,
+                    ))
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
         response.raise_for_status()
 
@@ -616,24 +652,18 @@ def create_machine_type_queue(machine_type):
     reporter.create_queue(machine_type)
 
 
-def get_user_jobs_queue(machine_type, user):
+def get_user_jobs_queue(machine_type, user, run_name=None):
     reporter = ResultsReporter()
     if not reporter.base_uri:
         return
-    return reporter.queued_jobs(machine_type, user)
+    return reporter.queued_jobs(machine_type, user, run_name)
 
 
-def pause_queue(machine_type, paused_by, pause_duration):
+def pause_queue(machine_type, paused, paused_by, pause_duration=None):
    reporter = ResultsReporter()
    if not reporter.base_uri:
        return
-    paused = True
    reporter.update_queue(machine_type, paused, paused_by, pause_duration)
-    paused = False
-    timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by])
-    timer.daemon = True
-    timer.start()
-    timer.join()
 
 
 def is_queue_paused(machine_type):
-- 
2.39.5
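
Note on the retry shape: every patched ResultsReporter method wraps its HTTP call in teuthology.contextutil.safe_while with a randomized increment and returns as soon as paddles answers 200. The following is a minimal standalone sketch of that pattern; the helper name post_with_retry, the bare requests call, and the example endpoint are illustrative and not part of the patch.

    import logging
    import random

    import requests

    from teuthology.contextutil import safe_while

    log = logging.getLogger(__name__)


    def post_with_retry(uri, payload):
        """Retry a paddles POST until it succeeds or safe_while gives up."""
        headers = {'content-type': 'application/json'}
        # Randomize the back-off increment so concurrent dispatchers do not
        # retry against paddles in lock-step.
        inc = random.uniform(0, 1)
        with safe_while(sleep=1, increment=inc, action=f'POST {uri}') as proceed:
            while proceed():
                response = requests.post(uri, json=payload, headers=headers)
                if response.status_code == 200:
                    return response.json()
                # Log and fall through to the next attempt, as the patched
                # methods do.
                log.error("POST to %s failed with status %s: %s",
                          uri, response.status_code, response.text)
        # safe_while raises MaxWhileTries once its attempts are exhausted, so
        # the loop either returns a successful response or propagates that error.

For example, post_with_retry(base_uri + '/queue/stats', {'machine_type': 'smithi'}) would keep retrying a stats request the same way get_queue_stats now does (endpoint and machine type are only examples).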
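
Note on the pause flow: the pause now happens server side. scripts/queue.py maps -p/-u and the optional -t duration onto dispatcher.pause_queue, which records the pause in paddles via report.pause_queue and, when a duration is given, sleeps and clears the pause itself. A small sketch of that call path follows; the command lines and the handle_pause helper are illustrative only (the real routing is inside main() above).

    # teuthology-queue -p -t 300 -m smithi -U jdoe   pause the smithi queue for 300 seconds
    # teuthology-queue -p -m smithi -U jdoe          pause until explicitly unpaused
    # teuthology-queue -u -m smithi -U jdoe          unpause immediately
    from teuthology.dispatcher import pause_queue


    def handle_pause(machine_type, pause, unpause, user, pause_duration=None):
        if pause:
            # Paddles marks the queue paused; with a duration the dispatcher
            # sleeps for that long and then clears the pause itself.
            pause_queue(machine_type, True, user, pause_duration)
        elif unpause:
            pause_queue(machine_type, False, user)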