doc = """
usage: teuthology-queue -h
teuthology-queue -s -m MACHINE_TYPE
- teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
- teuthology-queue [-r] -m MACHINE_TYPE -u USER
- teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
- teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
+ teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
+ teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+ teuthology-queue [-r] -m MACHINE_TYPE -U USER
+ teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
+ teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+ teuthology-queue -u -m MACHINE_TYPE -U USER
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the queue.
-r, --runs Only show run names
-f, --full Print the entire job config. Use with caution.
-s, --status Prints the status of the queue
- -p, --pause SECONDS Pause queues for a number of seconds. A value of 0
- will unpause. If -m is passed, pause that queue,
+ -t, --time SECONDS Pause queues for a number of seconds.
+ If -m is passed, pause that queue,
otherwise pause all queues.
+ -p, --pause Pause queue
+ -u, --unpause Unpause queue
-P, --priority PRIORITY
Change priority of queued jobs
- -u, --user USER User who owns the jobs
+ -U, --user USER User who owns the jobs
+ -R, --run_name RUN_NAME
+ Change the priority of all jobs in the given run.
"""
from collections import OrderedDict
from teuthology import report
+from teuthology.dispatcher import pause_queue
log = logging.getLogger(__name__)
return
def stats_queue(machine_type):
stats = report.get_queue_stats(machine_type)
if stats['paused'] is None:
log.info("%s queue is currently running with %s jobs queued",
machine_type, stats['count'])
-def update_priority(machine_type, priority, user):
- jobs = report.get_user_jobs_queue(machine_type, user)
+def update_priority(machine_type, priority, user, run_name=None):
+ if run_name is not None:
+ jobs = report.get_user_jobs_queue(machine_type, user, run_name)
+ else:
+ jobs = report.get_user_jobs_queue(machine_type, user)
for job in jobs:
job['priority'] = priority
report.try_push_job_info(job)
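# For example (placeholder values), to bump every queued job in a single run:
#
#     update_priority('MACHINE_TYPE', 50, 'USER', run_name='RUN_NAME')
#
# get_user_jobs_queue() filters the queue by run_name when one is given and
# falls back to filtering by the owning user otherwise.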
-def pause_queue(machine_type, pause_duration, paused_by):
- report.pause_queue(machine_type, paused_by, pause_duration)
-
-
def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
def main(args):
machine_type = args['--machine_type']
user = args['--user']
+ run_name = args['--run_name']
priority = args['--priority']
status = args['--status']
delete = args['--delete']
runs = args['--runs']
show_desc = args['--description']
full = args['--full']
- pause_duration = args['--pause']
+ pause = args['--pause']
+ unpause = args['--unpause']
+ pause_duration = args['--time']
try:
if status:
stats_queue(machine_type)
- elif pause_duration:
- pause_queue(machine_type, pause_duration, user)
+ elif pause:
+ if pause_duration:
+ pause_queue(machine_type, pause, user, pause_duration)
+ else:
+ pause_queue(machine_type, pause, user)
+ elif unpause:
+ pause = False
+ pause_queue(machine_type, pause, user)
elif priority:
- update_priority(machine_type, priority, user)
+ update_priority(machine_type, priority, user, run_name)
elif delete:
walk_jobs(machine_type,
JobDeleter(delete), user)
import logging
import random
import socket
-import threading
from datetime import datetime
import teuthology
)
job_json = json.dumps(job_info)
headers = {'content-type': 'application/json'}
- response = self.session.post(run_uri, data=job_json, headers=headers)
- if response.status_code == 200:
- resp_json = response.json()
- job_id = resp_json['job_id']
- return job_id
- else:
- msg = response.text
- self.log.error(
- "POST to {uri} failed with status {status}: {msg}".format(
- uri=run_uri,
- status=response.status_code,
- msg=msg,
- ))
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'write job for {run_name}') as proceed:
+ while proceed():
+ response = self.session.post(run_uri, data=job_json, headers=headers)
+
+ if response.status_code == 200:
+ resp_json = response.json()
+ job_id = resp_json['job_id']
+ return job_id
+ else:
+ msg = response.text
+ self.log.error(
+ "POST to {uri} failed with status {status}: {msg}".format(
+ uri=run_uri,
+ status=response.status_code,
+ msg=msg,
+ ))
response.raise_for_status()
return None
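# The retry loops in this module follow the teuthology.contextutil.safe_while
# pattern (safe_while is assumed to be imported elsewhere in this file):
# proceed() sleeps between attempts, lengthening the delay by `increment` each
# time, and raises MaxWhileTries once the attempt budget is exhausted. A
# minimal standalone sketch of the same pattern, using a hypothetical fetch():
#
#     from teuthology.contextutil import safe_while
#
#     with safe_while(sleep=1, increment=0.5, action='fetch status') as proceed:
#         while proceed():
#             response = fetch()
#             if response.status_code == 200:
#                 break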
uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri,
machine_type=machine_type)
- response = self.session.get(uri)
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed:
+ while proceed():
+ response = self.session.get(uri)
+ if response.status_code == 200:
+ return response.json()
response.raise_for_status()
- return response.json()
+
def get_jobs(self, run_name, job_id=None, fields=None):
"""
queue_info = {'machine_type': machine_type}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
- response = self.session.post(uri, data=queue_json, headers=headers)
- if response.status_code == 200:
- self.log.info("Successfully created queue for {machine_type}".format(
- machine_type=machine_type,
- ))
- else:
- resp_json = response.json()
- if resp_json:
- msg = resp_json.get('message', '')
- else:
- msg = response.text
- if msg and msg.endswith('already exists'):
- return
- self.log.error(
- "POST to {uri} failed with status {status}: {msg}".format(
- uri=uri,
- status=response.status_code,
- msg=msg,
- ))
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed:
+ while proceed():
+ response = self.session.post(uri, data=queue_json, headers=headers)
+
+ if response.status_code == 200:
+ self.log.info("Successfully created queue for {machine_type}".format(
+ machine_type=machine_type,
+ ))
+ return
+ else:
+ resp_json = response.json()
+ if resp_json:
+ msg = resp_json.get('message', '')
+ else:
+ msg = response.text
+ if msg and msg.endswith('already exists'):
+ return
+ self.log.error(
+ "POST to {uri} failed with status {status}: {msg}".format(
+ uri=uri,
+ status=response.status_code,
+ msg=msg,
+ ))
response.raise_for_status()
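# Note that an "already exists" response is treated as success, so callers can
# invoke create_queue() unconditionally before queueing jobs for a machine type.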
- def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None):
+ def update_queue(self, machine_type, paused, paused_by, pause_duration=None):
uri = "{base}/queue/".format(
base=self.base_uri
)
'pause_duration': pause_duration}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
- response = self.session.put(uri, data=queue_json, headers=headers)
- if response.status_code == 200:
- self.log.info("Successfully updated queue for {machine_type}".format(
- machine_type=machine_type,
- ))
- else:
- msg = response.text
- self.log.error(
- "PUT to {uri} failed with status {status}: {msg}".format(
- uri=uri,
- status=response.status_code,
- msg=msg,
- ))
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed:
+ while proceed():
+ response = self.session.put(uri, data=queue_json, headers=headers)
+
+ if response.status_code == 200:
+ self.log.info("Successfully updated queue for {machine_type}".format(
+ machine_type=machine_type,
+ ))
+ return
+ else:
+ msg = response.text
+ self.log.error(
+ "PUT to {uri} failed with status {status}: {msg}".format(
+ uri=uri,
+ status=response.status_code,
+ msg=msg,
+ ))
response.raise_for_status()
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
- response = self.session.post(uri, data=queue_json, headers=headers)
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed:
+ while proceed():
+ response = self.session.post(uri, data=queue_json, headers=headers)
- if response.status_code == 200:
- self.log.info("Successfully retrieved stats for queue {machine_type}".format(
- machine_type=machine_type,
- ))
- return response.json()
- else:
- msg = response.text
- self.log.error(
- "POST to {uri} failed with status {status}: {msg}".format(
- uri=uri,
- status=response.status_code,
- msg=msg,
- ))
+ if response.status_code == 200:
+ self.log.info("Successfully retrieved stats for queue {machine_type}".format(
+ machine_type=machine_type,
+ ))
+ return response.json()
+ else:
+ msg = response.text
+ self.log.error(
+ "POST to {uri} failed with status {status}: {msg}".format(
+ uri=uri,
+ status=response.status_code,
+ msg=msg,
+ ))
response.raise_for_status()
- def queued_jobs(self, machine_type, user):
+ def queued_jobs(self, machine_type, user, run_name):
uri = "{base}/queue/queued_jobs/".format(
base=self.base_uri
)
- request_info = {'machine_type': machine_type,
- 'user': user}
+ if run_name is not None:
+ filter_field = run_name
+ request_info = {'machine_type': machine_type,
+ 'run_name': run_name}
+ else:
+ filter_field = user
+ request_info = {'machine_type': machine_type,
+ 'user': user}
request_json = json.dumps(request_info)
headers = {'content-type': 'application/json'}
- response = self.session.post(uri, data=request_json, headers=headers)
+ inc = random.uniform(0, 1)
+ with safe_while(
+ sleep=1, increment=inc, action=f'get queued jobs {filter_field}') as proceed:
+ while proceed():
+ response = self.session.post(uri, data=request_json, headers=headers)
- if response.status_code == 200:
- self.log.info("Successfully retrieved jobs for user {user}".format(
- user=user,
- ))
- return response.json()
- else:
- msg = response.text
- self.log.error(
- "POST to {uri} failed with status {status}: {msg}".format(
- uri=uri,
- status=response.status_code,
- msg=msg,
- ))
+ if response.status_code == 200:
+ self.log.info("Successfully retrieved jobs for {filter_field}".format(
+ filter_field=filter_field,
+ ))
+ return response.json()
+ else:
+ msg = response.text
+ self.log.error(
+ "POST to {uri} failed with status {status}: {msg}".format(
+ uri=uri,
+ status=response.status_code,
+ msg=msg,
+ ))
response.raise_for_status()
reporter.create_queue(machine_type)
-def get_user_jobs_queue(machine_type, user):
+def get_user_jobs_queue(machine_type, user, run_name=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- return reporter.queued_jobs(machine_type, user)
+ return reporter.queued_jobs(machine_type, user, run_name)
-def pause_queue(machine_type, paused_by, pause_duration):
+def pause_queue(machine_type, paused, paused_by, pause_duration=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
- paused = True
reporter.update_queue(machine_type, paused, paused_by, pause_duration)
- paused = False
- timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by])
- timer.daemon = True
- timer.start()
- timer.join()
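# The paused state is a server-side attribute of the queue, set via
# update_queue() (a PUT to {base}/queue/ carrying the paused flag, who paused
# it, and an optional pause_duration); unpausing is an explicit call with
# paused=False. For example (placeholder values):
#
#     pause_queue('MACHINE_TYPE', True, 'USER', pause_duration=600)  # pause for 600s
#     pause_queue('MACHINE_TYPE', False, 'USER')                     # unpause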
def is_queue_paused(machine_type):