From: Tim Serong Date: Tue, 4 Dec 2018 11:57:05 +0000 (+1100) Subject: mgr/deepsea: check for inflight completions when starting event reader X-Git-Tag: v14.1.0~671^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b9bf62932579d5747e08cdbcbb760dc5862849a7;p=ceph.git mgr/deepsea: check for inflight completions when starting event reader This handles the case where an operation is inflight, but salt-api dies (or the connection to salt-api has failed for some reason). The next time we're able to connect, we first check if there are any outstanding completions, and look up matching salt jobs to see if they finished in the meantime. Signed-off-by: Tim Serong --- diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py index 169c6db2d0f..ededbd8885b 100644 --- a/src/pybind/mgr/deepsea/module.py +++ b/src/pybind/mgr/deepsea/module.py @@ -129,11 +129,10 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): user at least sees the error. 
""" - def process_result(raw_event): + def process_result(event_data): result = [] - raw_event = json.loads(raw_event) - if raw_event['data']['success']: - for node_name, node_devs in raw_event["data"]["return"].items(): + if event_data['success']: + for node_name, node_devs in event_data["return"].items(): devs = [] for d in node_devs: dev = orchestrator.InventoryDevice() @@ -180,11 +179,10 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported" - def process_result(raw_event): + def process_result(event_data): result = [] - raw_event = json.loads(raw_event) - if raw_event['data']['success']: - for node_name, service_info in raw_event["data"]["return"].items(): + if event_data['success']: + for node_name, service_info in event_data["return"].items(): for service_type, daemon_name in service_info.items(): desc = orchestrator.ServiceDescription() desc.nodename = node_name @@ -292,7 +290,8 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): # gives an (arbitrary) 60 second retry if we can't attach to # the salt-api event bus for some reason (e.g.: invalid username, # or password, which will be logged as "Request failed with status - # code 401") + # code 401"). Note that this 60 second retry will also happen if + # salt-api dies. self._event.wait(60) self._event.clear() continue @@ -324,6 +323,40 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): def _read_sse(self): event = {} try: + # Just starting the event reader; if we've made it here, we know we're + # talking to salt-api (_do_request would have raised an exception if the + # response wasn't ok), so check if there are any completions inflight that + # need to be dealt with. This handles the case where some command was + # invoked, then salt-api died somehow, and we reconnected, but missed the + # completion at the time it actually happened. 
+ for tag in list(self._all_completions): + self.log.info("Found event {} inflight".format(tag)) + try: + resp = self._do_request_with_login("POST", data = { + "client": "runner", + "fun": "jobs.lookup_jid", + "jid": tag.split('/')[2] + }) + # jobs.lookup_jid returns a dict keyed by hostname. + return_dict = resp.json()['return'][0] + if return_dict: + # If the job is complete, there'll be one item in the dict. + self.log.info("Event {} complete".format(tag)) + # The key is the salt master hostname, but we don't care + # about that, so just grab the data. + data = next(iter(return_dict.items()))[1] + self._all_completions[tag]._process_result(data) + # TODO: decide whether it's bad to delete the completion + # here -- would we ever need to resurrect it? + del self._all_completions[tag] + else: + # if the job is not complete, there'll be nothing in the dict + self.log.info("Event {} still pending".format(tag)) + except Exception as ex: + # Logging a warning if the request failed, so we can continue + # checking any other completions, then get onto reading events + self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex))) + for line in self._event_response.iter_lines(): with self._completion_lock: if line: @@ -352,7 +385,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): # _process_result() callback and remove it from our list. if event['tag'] in self._all_completions: self.log.info("Event {} complete".format(event['tag'])) - self._all_completions[event['tag']]._process_result(event['data']) + self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data']) # TODO: decide whether it's bad to delete the completion # here -- would we ever need to resurrect it? del self._all_completions[event['tag']]