]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/deepsea: check for inflight completions when starting event reader 25391/head
authorTim Serong <tserong@suse.com>
Tue, 4 Dec 2018 11:57:05 +0000 (22:57 +1100)
committerTim Serong <tserong@suse.com>
Wed, 5 Dec 2018 05:37:44 +0000 (16:37 +1100)
This handles the case where an operation is inflight, but salt-api dies
(or the connection to salt-api has failed for some reason).  The next time
we're able to connect, we first check if there's any outstanding completions,
and look up matching salt jobs to see if they finished in the meantime.

Signed-off-by: Tim Serong <tserong@suse.com>
src/pybind/mgr/deepsea/module.py

index 169c6db2d0f72a82fb0fa4c2c7970caaa263d65e..ededbd8885b7dfc001affef7bb01e3afffbce8b4 100644 (file)
@@ -129,11 +129,10 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
         user at least sees the error.
         """
 
-        def process_result(raw_event):
+        def process_result(event_data):
             result = []
-            raw_event = json.loads(raw_event)
-            if raw_event['data']['success']:
-                for node_name, node_devs in raw_event["data"]["return"].items():
+            if event_data['success']:
+                for node_name, node_devs in event_data["return"].items():
                     devs = []
                     for d in node_devs:
                         dev = orchestrator.InventoryDevice()
@@ -180,11 +179,10 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported"
 
-        def process_result(raw_event):
+        def process_result(event_data):
             result = []
-            raw_event = json.loads(raw_event)
-            if raw_event['data']['success']:
-                for node_name, service_info in raw_event["data"]["return"].items():
+            if event_data['success']:
+                for node_name, service_info in event_data["return"].items():
                     for service_type, daemon_name in service_info.items():
                         desc = orchestrator.ServiceDescription()
                         desc.nodename = node_name
@@ -292,7 +290,8 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
                     # gives an (arbitrary) 60 second retry if we can't attach to
                     # the salt-api event bus for some reason (e.g.: invalid username,
                     # or password, which will be logged as "Request failed with status
-                    # code 401")
+                    # code 401").  Note that this 60 second retry will also happen if
+                    # salt-api dies.
                     self._event.wait(60)
                     self._event.clear()
                     continue
@@ -324,6 +323,40 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
     def _read_sse(self):
         event = {}
         try:
+            # Just starting the event reader; if we've made it here, we know we're
+            # talking to salt-api (_do_request would have raised an exception if the
+            # response wasn't ok), so check if there's any completions inflight that
+            # need to be dealt with.  This handles the case where some command was
+            # invoked, then salt-api died somehow, and we reconneced, but missed the
+            # completion at the time it actually happened.
+            for tag in list(self._all_completions):
+                self.log.info("Found event {} inflight".format(tag))
+                try:
+                    resp = self._do_request_with_login("POST", data = {
+                        "client": "runner",
+                        "fun": "jobs.lookup_jid",
+                        "jid": tag.split('/')[2]
+                    })
+                    # jobs.lookup_jid returns a dict keyed by hostname.
+                    return_dict = resp.json()['return'][0]
+                    if return_dict:
+                        # If the job is complete, there'll be one item in the dict.
+                        self.log.info("Event {} complete".format(tag))
+                        # The key is the salt master hostname, but we don't care
+                        # about that, so just grab the data.
+                        data = next(iter(return_dict.items()))[1]
+                        self._all_completions[tag]._process_result(data)
+                        # TODO: decide whether it's bad to delete the completion
+                        # here -- would we ever need to resurrect it?
+                        del self._all_completions[tag]
+                    else:
+                        # if the job is not complete, there'll be nothing in the dict
+                        self.log.info("Event {} still pending".format(tag))
+                except Exception as ex:
+                    # Logging a warning if the request failed, so we can continue
+                    # checking any other completions, then get onto reading events
+                    self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
+
             for line in self._event_response.iter_lines():
                 with self._completion_lock:
                     if line:
@@ -352,7 +385,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
                         # _process_result() callback and remove it from our list.
                         if event['tag'] in self._all_completions:
                             self.log.info("Event {} complete".format(event['tag']))
-                            self._all_completions[event['tag']]._process_result(event['data'])
+                            self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
                             # TODO: decide whether it's bad to delete the completion
                             # here -- would we ever need to resurrect it?
                             del self._all_completions[event['tag']]