From 136b32510d604517f76b4629dbc8a9525c5805f2 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Sun, 21 Mar 2021 18:28:52 -0400 Subject: [PATCH] report, lock.ops: retry write requests to paddles For more contended cases of updating job status and machine keys, where we've seen 500 errors from DB conflicts, use random intervals for the retries. This is the teuthology half of fixing: https://tracker.ceph.com/issues/49864 Signed-off-by: Josh Durgin --- teuthology/lock/ops.py | 73 ++++++++++++++++++++++++------------------ teuthology/report.py | 61 +++++++++++++++++++---------------- 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py index 2becfbff2a..27fb68935e 100644 --- a/teuthology/lock/ops.py +++ b/teuthology/lock/ops.py @@ -1,6 +1,7 @@ import logging import json import os +import random import time import yaml @@ -175,16 +176,19 @@ def unlock_many(names, user): locked_by=user, names=names, ) - response = requests.post( - uri, - data=json.dumps(data), - headers={'content-type': 'application/json'}, - ) - if response.ok: - log.debug("Unlocked: %s", ', '.join(names)) - else: - log.error("Failed to unlock: %s", ', '.join(names)) - return response.ok + with safe_while( + sleep=1, increment=0.5, action=f'unlock_many {names}') as proceed: + while proceed(): + response = requests.post( + uri, + data=json.dumps(data), + headers={'content-type': 'application/json'}, + ) + if response.ok: + log.debug("Unlocked: %s", ', '.join(names)) + return True + log.error("Failed to unlock: %s", ', '.join(names)) + return False def unlock_one(ctx, name, user, description=None): @@ -229,9 +233,15 @@ def update_lock(name, description=None, status=None, ssh_pub_key=None): if updated: uri = os.path.join(config.lock_server, 'nodes', name, '') - response = requests.put( - uri, - json.dumps(updated)) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'update lock {name}') as proceed: + while proceed(): + response = requests.put( + uri, + json.dumps(updated)) + if response.ok: + return True return response.ok return True @@ -248,24 +258,25 @@ def update_inventory(node_dict): return uri = os.path.join(config.lock_server, 'nodes', name, '') log.info("Updating %s on lock server", name) - response = requests.put( - uri, - json.dumps(node_dict), - headers={'content-type': 'application/json'}, - ) - if response.status_code == 404: - log.info("Creating new node %s on lock server", name) - uri = os.path.join(config.lock_server, 'nodes', '') - response = requests.post( - uri, - json.dumps(node_dict), - headers={'content-type': 'application/json'}, - ) - if not response.ok: - log.error("Node update/creation failed for %s: %s", - name, response.text) - return response.ok - + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'update inventory {name}') as proceed: + while proceed(): + response = requests.put( + uri, + json.dumps(node_dict), + headers={'content-type': 'application/json'}, + ) + if response.status_code == 404: + log.info("Creating new node %s on lock server", name) + uri = os.path.join(config.lock_server, 'nodes', '') + response = requests.post( + uri, + json.dumps(node_dict), + headers={'content-type': 'application/json'}, + ) + if response.ok: + return def do_update_keys(machines, all_=False, _raise=True): reference = query.list_locks(keyed_by_name=True) diff --git a/teuthology/report.py b/teuthology/report.py index 7b23e7d8e5..2d06356772 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -4,11 +4,13 @@ import json import re import requests import logging +import random import socket from datetime import datetime import teuthology from teuthology.config import config +from teuthology.contextutil import safe_while from teuthology.job_status import get_status, set_status report_exceptions = (requests.exceptions.RequestException, socket.error) @@ -289,37 +291,42 @@ class ResultsReporter(object): set_status(job_info, 'dead') job_json = json.dumps(job_info) headers = {'content-type': 'application/json'} - response = self.session.post(run_uri, data=job_json, headers=headers) - if response.status_code == 200: - return job_id + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'report job {job_id}') as proceed: + while proceed(): + response = self.session.post(run_uri, data=job_json, headers=headers) - # This call is wrapped in a try/except because of: - # http://tracker.ceph.com/issues/8166 - try: - resp_json = response.json() - except ValueError: - resp_json = dict() + if response.status_code == 200: + return - if resp_json: - msg = resp_json.get('message', '') - else: - msg = response.text - - if msg and msg.endswith('already exists'): - job_uri = os.path.join(run_uri, job_id, '') - response = self.session.put(job_uri, data=job_json, - headers=headers) - elif msg: - self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( - uri=run_uri, - status=response.status_code, - msg=msg, - )) - response.raise_for_status() + # This call is wrapped in a try/except because of: + # http://tracker.ceph.com/issues/8166 + try: + resp_json = response.json() + except ValueError: + resp_json = dict() - return job_id + if resp_json: + msg = resp_json.get('message', '') + else: + msg = response.text + + if msg and msg.endswith('already exists'): + job_uri = os.path.join(run_uri, job_id, '') + response = self.session.put(job_uri, data=job_json, + headers=headers) + if response.status_code == 200: + return + elif msg: + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=run_uri, + status=response.status_code, + msg=msg, + )) + response.raise_for_status() @property def last_run(self): -- 2.39.5