import json
import errno
+from typing import Dict
+
import requests
from threading import Event, Thread, Lock
self.status_code = status_code
-class DeepSeaReadCompletion(orchestrator.ReadCompletion):
- def __init__(self, process_result_callback):
- super(DeepSeaReadCompletion, self).__init__()
- self._complete = False
- self._cb = process_result_callback
-
+class DeepSeaReadCompletion(orchestrator.Completion):
def _process_result(self, data):
- self._result = self._cb(data)
- self._complete = True
-
- @property
- def result(self):
- return self._result
-
- @property
- def has_result(self):
- return self._complete
+ self.finalize(data)
class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
self._event_reader = None
self._reading_events = False
self._last_failure_msg = None
- self._all_completions = dict()
+ self._all_completions = dict() # type: Dict[str, DeepSeaReadCompletion]
self._completion_lock = Lock()
self.inventory_cache = orchestrator.OutdatableDict()
self.service_cache = orchestrator.OutdatableDict()
# Logging a warning if the request failed, so we can continue
# checking any other completions, then get onto reading events
self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
+ self._all_completions[tag].fail(ex)
for line in self._event_response.iter_lines():
with self._completion_lock: