git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/orchestrator: rename wait() -> process()
author: Sebastian Wagner <sebastian.wagner@suse.com>
Thu, 22 Aug 2019 13:53:00 +0000 (15:53 +0200)
committer: Sebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:35:24 +0000 (13:35 +0100)
Because wait() should actually not wait!

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/ansible/module.py
src/pybind/mgr/deepsea/module.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/rook/module.py
src/pybind/mgr/ssh/module.py
src/pybind/mgr/test_orchestrator/module.py

index b6dd3100526c4495169888b5c70179658fef1a2d..344d0be5f4a28267fee69f7cfe5fa01ee237efa0 100644 (file)
@@ -475,7 +475,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         return (available, msg)
 
-    def wait(self, completions):
+    def process(self, completions):
         """Given a list of Completion instances, progress any which are
            incomplete.
 
@@ -488,13 +488,6 @@ class Module(MgrModule, orchestrator.Orchestrator):
         for operation in completions:
             self.log.info("<%s> status:%s", operation, operation.status)
 
-        completions = filter(lambda x: not x.is_complete, completions)
-
-        ops_pending = len(completions)
-        self.log.info("Operations pending: %s", ops_pending)
-
-        return ops_pending == 0
-
     def serve(self):
         """ Mandatory for standby modules
         """
index 114f6aefff94ed9e7b4e9f52592a37b63995e395..b2e1b2e283b80abe4bfa6b6c06be0c3124519e4b 100644 (file)
@@ -272,29 +272,24 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return c
 
-    def wait(self, completions):
-        incomplete = False
+    def process(self, completions):
+        """
+        Does nothing, as completions are processed in another thread.
+        """
 
-        with self._completion_lock:
-            for c in completions:
-                if c.is_complete:
-                    continue
-                if not c.is_complete:
-                    # TODO: the job is in the bus, it should reach us eventually
-                    # unless something has gone wrong (e.g. salt-api died, etc.),
-                    # in which case it's possible the job finished but we never
-                    # noticed the salt/run/$id/ret event.  Need to add the job ID
-                    # (or possibly the full event tag) to the completion object.
-                    # That way, if we want to double check on a job that hasn't
-                    # been completed yet, we can make a synchronous request to
-                    # salt-api to invoke jobs.lookup_jid, and if it's complete we
-                    # should be able to pass its return value to _process_result()
-                    # Question: do we do this automatically after some timeout?
-                    # Or do we add a function so the admin can check and "unstick"
-                    # a stuck completion?
-                    incomplete = True
-
-        return not incomplete
+        # If the job is still incomplete:
+        # TODO: the job is in the bus, it should reach us eventually
+        # unless something has gone wrong (e.g. salt-api died, etc.),
+        # in which case it's possible the job finished but we never
+        # noticed the salt/run/$id/ret event.  Need to add the job ID
+        # (or possibly the full event tag) to the completion object.
+        # That way, if we want to double check on a job that hasn't
+        # been completed yet, we can make a synchronous request to
+        # salt-api to invoke jobs.lookup_jid, and if it's complete we
+        # should be able to pass its return value to _process_result()
+        # Question: do we do this automatically after some timeout?
+        # Or do we add a function so the admin can check and "unstick"
+        # a stuck completion?
 
 
     def handle_command(self, inbuf, cmd):
index 9bc26239a332ad32f398813b53049747480a84b3..d2df4bc78bd74b061819b6836533d9508a8fb4d3 100644 (file)
@@ -343,6 +343,10 @@ class Orchestrator(object):
     internet downloads.  For that reason, all are asynchronous, and return
     ``Completion`` objects.
 
+    Methods should only return the completion and not directly execute
+    anything, like network calls. Otherwise the purpose of
+    those completions is defeated.
+
     Implementations are not required to start work on an operation until
     the caller waits on the relevant Completion objects.  Callers making
     multiple updates should not wait on Completions until they're done
@@ -392,19 +396,18 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def wait(self, completions):
+    def process(self, completions):
+        # type: (List[_Completion]) -> None
         """
-        Given a list of Completion instances, progress any which are
-        incomplete.  Return a true if everything is done.
+        Given a list of Completion instances, process any which are
+        incomplete.
 
         Callers should inspect the detail of each completion to identify
         partial completion/progress information, and present that information
         to the user.
 
-        For fast operations (e.g. reading from a database), implementations
-        may choose to do blocking IO in this call.
-
-        :rtype: bool
+        This method should not block, as this would make it slow to query
+        a status, while other long running operations are in progress.
         """
         raise NotImplementedError()
 
@@ -1069,6 +1072,12 @@ class OrchestratorClientMixin(Orchestrator):
 
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
+    def __get_mgr(self):
+        try:
+            return self.__mgr
+        except AttributeError:
+            return self
+
     def _oremote(self, meth, args, kwargs):
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
@@ -1077,10 +1086,8 @@ class OrchestratorClientMixin(Orchestrator):
         :raises OrchestratorError: orchestrator failed to perform
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
-        try:
-            mgr = self.__mgr
-        except AttributeError:
-            mgr = self
+        mgr = self.__get_mgr()
+
         try:
             o = mgr._select_orchestrator()
         except AttributeError:
@@ -1119,11 +1126,15 @@ class OrchestratorClientMixin(Orchestrator):
 
         :param completions: List of Completions
         :raises NoOrchestrator:
+        :raises RuntimeError: something went wrong while calling the process method.
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
         for c in completions:
             self._update_completion_progress(c)
-        while not self.wait(completions):
+        while any(not c.is_complete for c in completions):
+            self.wait(completions)
+            self.__get_mgr().log.info("Operations pending: %s",
+                                      sum(1 for c in completions if not c.is_complete))
             if any(c.should_wait for c in completions):
                 time.sleep(1)
             else:
index 63bb25cffdb6a2f9bcd30d9999b1dbce8f72fe14..d95f1a63f84cd462e68ecda2c093419a90f76a48 100644 (file)
@@ -34,10 +34,29 @@ import orchestrator
 from .rook_cluster import RookCluster
 
 
-all_completions = []
+class RookCompletionMixin(object):
+    # hacky global
+    all_completions = []
 
+    def __init__(self, message):
+        super(RookCompletionMixin, self).__init__()
+        self.message = message
+
+        # Result of k8s API call, this is set if executed==True
+        self._result = None
+
+        all_completions.append(self)
 
-class RookReadCompletion(orchestrator.ReadCompletion):
+
+    @property
+    def result(self):
+        return self._result
+
+    def __str__(self):
+        return self.message
+
+
+class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion):
     """
     All reads are simply API calls: avoid spawning
     huge numbers of threads by just running them
@@ -45,31 +64,24 @@ class RookReadCompletion(orchestrator.ReadCompletion):
     """
 
     def __init__(self, cb):
-        super(RookReadCompletion, self).__init__()
+        super(RookReadCompletion, self).__init__("<read op>")
         self.cb = cb
-        self._result = None
         self._complete = False
 
-        self.message = "<read op>"
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    @property
-    def result(self):
-        return self._result
-
     @property
-    def is_complete(self):
+    def has_result(self):
         return self._complete
 
     def execute(self):
-        self._result = self.cb()
-        self._complete = True
+        try:
+            self._result = self.cb()
+        except Exception as e:
+            self.exception = e
+        finally:
+            self._complete = True
 
 
-class RookWriteCompletion(orchestrator.WriteCompletion):
+class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion):
     """
     Writes are a two-phase thing, firstly sending
     the write to the k8s API (fast) and then waiting
@@ -80,7 +92,7 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
     # a completion= param that uses threads.  Maybe just
     # use that?
     def __init__(self, execute_cb, complete_cb, message):
-        super(RookWriteCompletion, self).__init__()
+        super(RookWriteCompletion, self).__init__(message)
         self.execute_cb = execute_cb
         self.complete_cb = complete_cb
 
@@ -88,47 +100,31 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
         # not have succeeded
         self.executed = False
 
-        # Result of k8s API call, this is set if executed==True
-        self._result = None
-
+        # Effective means, Rook finished applying the changes
         self.effective = False
 
         self.id = str(uuid.uuid4())
 
-        self.message = message
-
-        self.exception = None
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    def __str__(self):
-        return self.message
-
-    @property
-    def result(self):
-        return self._result
-
     @property
-    def is_persistent(self):
-        return (not self.is_errored) and self.executed
 
     @property
     def is_effective(self):
         return self.effective
 
     def execute(self):
-        if not self.executed:
-            self._result = self.execute_cb()
-            self.executed = True
-
-        if not self.effective:
-            # TODO: check self.result for API errors
-            if self.complete_cb is None:
-                self.effective = True
-            else:
-                self.effective = self.complete_cb()
+        try:
+            if not self.executed:
+                self._result = self.execute_cb()
+                self.executed = True
+
+            if not self.effective:
+                # TODO: check self.result for API errors
+                if self.complete_cb is None:
+                    self.effective = True
+                else:
+                    self.effective = self.complete_cb()
+        except Exception as e:
+            self.exception = e
 
 
 def deferred_read(f):
@@ -168,14 +164,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def wait(self, completions):
+    def process(self, completions):
         if completions:
             self.log.info("wait: completions={0}".format(completions))
 
-        incomplete = False
-
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
+        # Synchronously call the K8s API
         for c in completions:
             if not isinstance(c, RookReadCompletion) and \
                     not isinstance(c, RookWriteCompletion):
@@ -187,20 +180,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             if c.is_complete:
                 continue
 
-            try:
-                c.execute()
-            except Exception as e:
-                if not isinstance(e, orchestrator.OrchestratorError):
-                    self.log.exception("Completion {0} threw an exception:".format(
-                        c.message
-                    ))
+            c.execute()
+            if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
+                self.log.exception("Completion {0} threw an exception:".format(
+                    c.message
+                ))
                 c.exception = e
                 c._complete = True
 
-            if not c.is_complete:
-                incomplete = True
 
-        return not incomplete
 
     @staticmethod
     def can_run():
@@ -288,15 +276,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            global all_completions
-            self.wait(all_completions)
-            all_completions = [c for c in all_completions if not c.is_complete]
+            self.wait(RookCompletionMixin.all_completions)
+            RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if
+                                                   not c.is_finished]
 
             self._shutdown.wait(5)
 
-        # TODO: watch Rook for config changes to complain/update if
-        # things look a bit out of sync?
-
     @deferred_read
     def get_inventory(self, node_filter=None, refresh=False):
         node_list = None
index 644230fa59e2a83de8a7dfcef9f2f9d97073c802..5a5bfb08bf51d654340a5949db032e31d35ae9d4 100644 (file)
@@ -330,21 +330,10 @@ class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         """
         return self.can_run()
 
-    def wait(self, completions):
-        self.log.info("wait: completions={}".format(completions))
-
-        complete = True
-        for c in completions:
-            if c.is_complete:
-                continue
-
-            if not isinstance(c, SSHReadCompletion) and \
-                    not isinstance(c, SSHWriteCompletion):
-                raise TypeError("unexpected completion: {}".format(c.__class__))
-
-            complete = False
-
-        return complete
+    def process(self, completions):
+        """
+        Does nothing, as completions are processed in another thread.
+        """
 
     def _require_hosts(self, hosts):
         """
index 9ad65968dff9ee19c2ea430963f694b34348cab4..c9d8ed7d756fae0af7b98e63a7d88ec4852c1008 100644 (file)
@@ -41,9 +41,13 @@ class TestCompletionMixin(object):
         return self._complete
 
     def execute(self):
-        self._result = self.cb()
-        self.executed = True
-        self._complete = True
+        try:
+            self._result = self.cb()
+            self.executed = True
+        except Exception as e:
+            self.exception = e
+        finally:
+            self._complete = True
 
     def __str__(self):
         return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
@@ -101,11 +105,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     The implementation is similar to the Rook orchestrator, but simpler.
     """
 
-    def wait(self, completions):
-        self.log.info("wait: completions={0}".format(completions))
-
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
+    def process(self, completions):
         for c in completions:
             if not isinstance(c, TestReadCompletion) and \
                     not isinstance(c, TestWriteCompletion):
@@ -114,19 +114,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
                         c.__class__
                     ))
 
-            if c.is_complete:
-                continue
-
-            try:
+            if not c.has_result:
                 c.execute()
-            except Exception as e:
-                self.log.exception("Completion {0} threw an exception:".format(
-                    c.message
-                ))
-                c.exception = e
-                c._complete = True
-
-        return all(c.is_complete for c in completions)
 
     @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
     def _load_data(self, inbuf):