user at least sees the error.
"""
- def process_result(raw_event):
+ def process_result(event_data):
result = []
- raw_event = json.loads(raw_event)
- if raw_event['data']['success']:
- for node_name, node_devs in raw_event["data"]["return"].items():
+ if event_data['success']:
+ for node_name, node_devs in event_data["return"].items():
devs = []
for d in node_devs:
dev = orchestrator.InventoryDevice()
assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported"
- def process_result(raw_event):
+ def process_result(event_data):
result = []
- raw_event = json.loads(raw_event)
- if raw_event['data']['success']:
- for node_name, service_info in raw_event["data"]["return"].items():
+ if event_data['success']:
+ for node_name, service_info in event_data["return"].items():
for service_type, daemon_name in service_info.items():
desc = orchestrator.ServiceDescription()
desc.nodename = node_name
# gives an (arbitrary) 60 second retry if we can't attach to
# the salt-api event bus for some reason (e.g.: invalid username,
# or password, which will be logged as "Request failed with status
- # code 401")
+ # code 401"). Note that this 60 second retry will also happen if
+ # salt-api dies.
self._event.wait(60)
self._event.clear()
continue
def _read_sse(self):
event = {}
try:
+ # Just starting the event reader; if we've made it here, we know we're
+ # talking to salt-api (_do_request would have raised an exception if the
+ # response wasn't ok), so check if there's any completions inflight that
+ # need to be dealt with. This handles the case where some command was
+ # invoked, then salt-api died somehow, and we reconneced, but missed the
+ # completion at the time it actually happened.
+ for tag in list(self._all_completions):
+ self.log.info("Found event {} inflight".format(tag))
+ try:
+ resp = self._do_request_with_login("POST", data = {
+ "client": "runner",
+ "fun": "jobs.lookup_jid",
+ "jid": tag.split('/')[2]
+ })
+ # jobs.lookup_jid returns a dict keyed by hostname.
+ return_dict = resp.json()['return'][0]
+ if return_dict:
+ # If the job is complete, there'll be one item in the dict.
+ self.log.info("Event {} complete".format(tag))
+ # The key is the salt master hostname, but we don't care
+ # about that, so just grab the data.
+ data = next(iter(return_dict.items()))[1]
+ self._all_completions[tag]._process_result(data)
+ # TODO: decide whether it's bad to delete the completion
+ # here -- would we ever need to resurrect it?
+ del self._all_completions[tag]
+ else:
+ # if the job is not complete, there'll be nothing in the dict
+ self.log.info("Event {} still pending".format(tag))
+ except Exception as ex:
+ # Logging a warning if the request failed, so we can continue
+ # checking any other completions, then get onto reading events
+ self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
+
for line in self._event_response.iter_lines():
with self._completion_lock:
if line:
# _process_result() callback and remove it from our list.
if event['tag'] in self._all_completions:
self.log.info("Event {} complete".format(event['tag']))
- self._all_completions[event['tag']]._process_result(event['data'])
+ self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
# TODO: decide whether it's bad to delete the completion
# here -- would we ever need to resurrect it?
del self._all_completions[event['tag']]