]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/deepsea: Adapt to new orchestrator completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 6 Sep 2019 10:57:25 +0000 (12:57 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:38:20 +0000 (13:38 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/deepsea/module.py

index 10aef36775f970d78d2febad7adf5c7b1bd4ed7c..2e4b39597a3075bf441b611431703e9baa9de77b 100644 (file)
@@ -10,6 +10,8 @@ ceph-mgr DeepSea orchestrator module
 
 import json
 import errno
+from typing import Dict
+
 import requests
 
 from threading import Event, Thread, Lock
@@ -25,23 +27,9 @@ class RequestException(Exception):
         self.status_code = status_code
 
 
-class DeepSeaReadCompletion(orchestrator.ReadCompletion):
-    def __init__(self, process_result_callback):
-        super(DeepSeaReadCompletion, self).__init__()
-        self._complete = False
-        self._cb = process_result_callback
-
+class DeepSeaReadCompletion(orchestrator.Completion):
     def _process_result(self, data):
-        self._result = self._cb(data)
-        self._complete = True
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def has_result(self):
-        return self._complete
+        self.finalize(data)
 
 
 class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
@@ -107,7 +95,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._event_reader = None
         self._reading_events = False
         self._last_failure_msg = None
-        self._all_completions = dict()
+        self._all_completions = dict()  # type: Dict[str, DeepSeaReadCompletion]
         self._completion_lock = Lock()
         self.inventory_cache = orchestrator.OutdatableDict()
         self.service_cache = orchestrator.OutdatableDict()
@@ -417,6 +405,7 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
                     # Logging a warning if the request failed, so we can continue
                     # checking any other completions, then get onto reading events
                     self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
+                    self._all_completions[tag].fail(ex)
 
             for line in self._event_response.iter_lines():
                 with self._completion_lock: