git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
report, lock.ops: retry write requests to paddles 1633/head
author Josh Durgin <jdurgin@redhat.com>
Sun, 21 Mar 2021 22:28:52 +0000 (18:28 -0400)
committer Josh Durgin <jdurgin@redhat.com>
Sun, 21 Mar 2021 22:28:57 +0000 (18:28 -0400)
For more contended cases of updating job status and machine keys,
where we've seen 500 errors from DB conflicts, use random intervals
for the retries.

This is the teuthology half of fixing:
https://tracker.ceph.com/issues/49864

Signed-off-by: Josh Durgin <jdurgin@redhat.com>
teuthology/lock/ops.py
teuthology/report.py

index 2becfbff2a3e8b6b7992b11d5328aaf633bf3006..27fb68935e1600797ae0dba1958faded190cd87e 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import json
 import os
+import random
 import time
 import yaml
 
@@ -175,16 +176,19 @@ def unlock_many(names, user):
         locked_by=user,
         names=names,
     )
-    response = requests.post(
-        uri,
-        data=json.dumps(data),
-        headers={'content-type': 'application/json'},
-    )
-    if response.ok:
-        log.debug("Unlocked: %s", ', '.join(names))
-    else:
-        log.error("Failed to unlock: %s", ', '.join(names))
-    return response.ok
+    with safe_while(
+            sleep=1, increment=0.5, action=f'unlock_many {names}') as proceed:
+        while proceed():
+            response = requests.post(
+                uri,
+                data=json.dumps(data),
+                headers={'content-type': 'application/json'},
+            )
+            if response.ok:
+                log.debug("Unlocked: %s", ', '.join(names))
+                return True
+    log.error("Failed to unlock: %s", ', '.join(names))
+    return False
 
 
 def unlock_one(ctx, name, user, description=None):
@@ -229,9 +233,15 @@ def update_lock(name, description=None, status=None, ssh_pub_key=None):
 
     if updated:
         uri = os.path.join(config.lock_server, 'nodes', name, '')
-        response = requests.put(
-            uri,
-            json.dumps(updated))
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'update lock {name}') as proceed:
+            while proceed():
+                response = requests.put(
+                    uri,
+                    json.dumps(updated))
+                if response.ok:
+                    return True
         return response.ok
     return True
 
@@ -248,24 +258,25 @@ def update_inventory(node_dict):
         return
     uri = os.path.join(config.lock_server, 'nodes', name, '')
     log.info("Updating %s on lock server", name)
-    response = requests.put(
-        uri,
-        json.dumps(node_dict),
-        headers={'content-type': 'application/json'},
-        )
-    if response.status_code == 404:
-        log.info("Creating new node %s on lock server", name)
-        uri = os.path.join(config.lock_server, 'nodes', '')
-        response = requests.post(
-            uri,
-            json.dumps(node_dict),
-            headers={'content-type': 'application/json'},
-        )
-    if not response.ok:
-        log.error("Node update/creation failed for %s: %s",
-                  name, response.text)
-    return response.ok
-
+    inc = random.uniform(0, 1)
+    with safe_while(
+            sleep=1, increment=inc, action=f'update inventory {name}') as proceed:
+        while proceed():
+            response = requests.put(
+                uri,
+                json.dumps(node_dict),
+                headers={'content-type': 'application/json'},
+            )
+            if response.status_code == 404:
+                log.info("Creating new node %s on lock server", name)
+                uri = os.path.join(config.lock_server, 'nodes', '')
+                response = requests.post(
+                    uri,
+                    json.dumps(node_dict),
+                    headers={'content-type': 'application/json'},
+                )
+            if response.ok:
+                return
 
 def do_update_keys(machines, all_=False, _raise=True):
     reference = query.list_locks(keyed_by_name=True)
index 7b23e7d8e5afc6e548539e274af3e60841d2630d..2d063567728fecb984f65ae4adadb1f888f42ab5 100644 (file)
@@ -4,11 +4,13 @@ import json
 import re
 import requests
 import logging
+import random
 import socket
 from datetime import datetime
 
 import teuthology
 from teuthology.config import config
+from teuthology.contextutil import safe_while
 from teuthology.job_status import get_status, set_status
 
 report_exceptions = (requests.exceptions.RequestException, socket.error)
@@ -289,37 +291,42 @@ class ResultsReporter(object):
             set_status(job_info, 'dead')
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
-        response = self.session.post(run_uri, data=job_json, headers=headers)
 
-        if response.status_code == 200:
-            return job_id
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc, action=f'report job {job_id}') as proceed:
+            while proceed():
+                response = self.session.post(run_uri, data=job_json, headers=headers)
 
-        # This call is wrapped in a try/except because of:
-        #  http://tracker.ceph.com/issues/8166
-        try:
-            resp_json = response.json()
-        except ValueError:
-            resp_json = dict()
+                if response.status_code == 200:
+                    return
 
-        if resp_json:
-            msg = resp_json.get('message', '')
-        else:
-            msg = response.text
-
-        if msg and msg.endswith('already exists'):
-            job_uri = os.path.join(run_uri, job_id, '')
-            response = self.session.put(job_uri, data=job_json,
-                                        headers=headers)
-        elif msg:
-            self.log.error(
-                "POST to {uri} failed with status {status}: {msg}".format(
-                    uri=run_uri,
-                    status=response.status_code,
-                    msg=msg,
-                ))
-        response.raise_for_status()
+                # This call is wrapped in a try/except because of:
+                #  http://tracker.ceph.com/issues/8166
+                try:
+                    resp_json = response.json()
+                except ValueError:
+                    resp_json = dict()
 
-        return job_id
+                if resp_json:
+                    msg = resp_json.get('message', '')
+                else:
+                    msg = response.text
+
+                if msg and msg.endswith('already exists'):
+                    job_uri = os.path.join(run_uri, job_id, '')
+                    response = self.session.put(job_uri, data=job_json,
+                                                headers=headers)
+                    if response.status_code == 200:
+                        return
+                elif msg:
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=run_uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
+        response.raise_for_status()
 
     @property
     def last_run(self):