From 8fa14ad3b80fbe005810d7a75b363e7724a7cb03 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Thu, 22 Aug 2019 15:53:00 +0200
Subject: [PATCH] mgr/orchestrator: rename wait() -> process()

Because wait() should actually not wait!

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/ansible/module.py           |   9 +-
 src/pybind/mgr/deepsea/module.py           |  39 +++----
 src/pybind/mgr/orchestrator.py             |  35 ++++--
 src/pybind/mgr/rook/module.py              | 125 +++++++++------
 src/pybind/mgr/ssh/module.py               |  19 +---
 src/pybind/mgr/test_orchestrator/module.py |  29 ++---
 6 files changed, 109 insertions(+), 147 deletions(-)

diff --git a/src/pybind/mgr/ansible/module.py b/src/pybind/mgr/ansible/module.py
index b6dd3100526..344d0be5f4a 100644
--- a/src/pybind/mgr/ansible/module.py
+++ b/src/pybind/mgr/ansible/module.py
@@ -475,7 +475,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         return (available, msg)
 
-    def wait(self, completions):
+    def process(self, completions):
         """Given a list of Completion instances, progress any which are
         incomplete.
 
@@ -488,13 +488,6 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         for operation in completions:
             self.log.info("<%s> status:%s", operation, operation.status)
 
-        completions = filter(lambda x: not x.is_complete, completions)
-
-        ops_pending = len(completions)
-        self.log.info("Operations pending: %s", ops_pending)
-
-        return ops_pending == 0
-
     def serve(self):
         """ Mandatory for standby modules """
diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py
index 114f6aefff9..b2e1b2e283b 100644
--- a/src/pybind/mgr/deepsea/module.py
+++ b/src/pybind/mgr/deepsea/module.py
@@ -272,29 +272,24 @@ class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return c
 
-    def wait(self, completions):
-        incomplete = False
+    def process(self, completions):
+        """
+        Does nothing, as completions are processed in another thread.
+        """
 
-        with self._completion_lock:
-            for c in completions:
-                if c.is_complete:
-                    continue
-                if not c.is_complete:
-                    # TODO: the job is in the bus, it should reach us eventually
-                    # unless something has gone wrong (e.g. salt-api died, etc.),
-                    # in which case it's possible the job finished but we never
-                    # noticed the salt/run/$id/ret event. Need to add the job ID
-                    # (or possibly the full event tag) to the completion object.
-                    # That way, if we want to double check on a job that hasn't
-                    # been completed yet, we can make a synchronous request to
-                    # salt-api to invoke jobs.lookup_jid, and if it's complete we
-                    # should be able to pass its return value to _process_result()
-                    # Question: do we do this automatically after some timeout?
-                    # Or do we add a function so the admin can check and "unstick"
-                    # a stuck completion?
-                    incomplete = True
-
-        return not incomplete
+        # If the job is still incomplete:
+        # TODO: the job is in the bus, it should reach us eventually
+        # unless something has gone wrong (e.g. salt-api died, etc.),
+        # in which case it's possible the job finished but we never
+        # noticed the salt/run/$id/ret event. Need to add the job ID
+        # (or possibly the full event tag) to the completion object.
+        # That way, if we want to double check on a job that hasn't
+        # been completed yet, we can make a synchronous request to
+        # salt-api to invoke jobs.lookup_jid, and if it's complete we
+        # should be able to pass its return value to _process_result()
+        # Question: do we do this automatically after some timeout?
+        # Or do we add a function so the admin can check and "unstick"
+        # a stuck completion?
 
     def handle_command(self, inbuf, cmd):
diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py
index 9bc26239a33..d2df4bc78bd 100644
--- a/src/pybind/mgr/orchestrator.py
+++ b/src/pybind/mgr/orchestrator.py
@@ -343,6 +343,10 @@ class Orchestrator(object):
     internet downloads. For that reason, all are asynchronous, and return
     ``Completion`` objects.
 
+    Methods should only return a Completion and should not directly execute
+    anything, such as network calls. Otherwise the purpose of
+    these completions is defeated.
+
     Implementations are not required to start work on an operation until
     the caller waits on the relevant Completion objects. Callers making
     multiple updates should not wait on Completions until they're done
@@ -392,19 +396,18 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def wait(self, completions):
+    def process(self, completions):
+        # type: (List[_Completion]) -> None
         """
-        Given a list of Completion instances, progress any which are
-        incomplete. Return a true if everything is done.
+        Given a list of Completion instances, process any which are
+        incomplete.
 
         Callers should inspect the detail of each completion to identify
         partial completion/progress information, and present that information
         to the user.
 
-        For fast operations (e.g. reading from a database), implementations
-        may choose to do blocking IO in this call.
-
-        :rtype: bool
+        This method must not block, as that would make it slow to query the
+        status of completions while other long-running operations are in progress.
         """
         raise NotImplementedError()
 
@@ -1069,6 +1072,12 @@ class OrchestratorClientMixin(Orchestrator):
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
+    def __get_mgr(self):
+        try:
+            return self.__mgr
+        except AttributeError:
+            return self
+
     def _oremote(self, meth, args, kwargs):
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
@@ -1077,10 +1086,8 @@ class OrchestratorClientMixin(Orchestrator):
         :raises OrchestratorError: orchestrator failed to perform
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
-        try:
-            mgr = self.__mgr
-        except AttributeError:
-            mgr = self
+        mgr = self.__get_mgr()
+
         try:
             o = mgr._select_orchestrator()
         except AttributeError:
@@ -1119,11 +1126,15 @@ class OrchestratorClientMixin(Orchestrator):
 
         :param completions: List of Completions
         :raises NoOrchestrator:
+        :raises RuntimeError: something went wrong while calling the process method.
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
         for c in completions:
             self._update_completion_progress(c)
-        while not self.wait(completions):
+        while any(not c.is_complete for c in completions):
+            self.process(completions)
+            self.__get_mgr().log.info("Operations pending: %s",
+                                      sum(1 for c in completions if not c.is_complete))
             if any(c.should_wait for c in completions):
                 time.sleep(1)
             else:
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 63bb25cffdb..d95f1a63f84 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -34,10 +34,29 @@ import orchestrator
 
 from .rook_cluster import RookCluster
 
-all_completions = []
+class RookCompletionMixin(object):
+    # hacky class-level registry of every completion (replaces the old global)
+    all_completions = []
+
+    def __init__(self, message):
+        super(RookCompletionMixin, self).__init__()
+        self.message = message
+
+        # Result of k8s API call, this is set if executed==True
+        self._result = None
+
+        # Set by execute() if the wrapped call raised
+        self.exception = None
+
+        self.all_completions.append(self)
 
-class RookReadCompletion(orchestrator.ReadCompletion):
+
+    @property
+    def result(self):
+        return self._result
+
+    def __str__(self):
+        return self.message
+
+
+class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion):
     """
     All reads are simply API calls: avoid spawning huge
     numbers of threads by just running them
@@ -45,31 +64,24 @@
 
     def __init__(self, cb):
-        super(RookReadCompletion, self).__init__()
+        super(RookReadCompletion, self).__init__("")
         self.cb = cb
-        self._result = None
         self._complete = False
 
-        self.message = ""
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    @property
-    def result(self):
-        return self._result
-
     @property
-    def is_complete(self):
+    def has_result(self):
         return self._complete
 
     def execute(self):
-        self._result = self.cb()
-        self._complete = True
+        try:
+            self._result = self.cb()
+        except Exception as e:
+            self.exception = e
+        finally:
+            self._complete = True
 
 
-class RookWriteCompletion(orchestrator.WriteCompletion):
+class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion):
     """
     Writes are a two-phase thing, firstly sending
     the write to the k8s API (fast) and then waiting
@@ -80,7 +92,7 @@
     # a completion= param that uses threads.  Maybe just
    # use that?
     def __init__(self, execute_cb, complete_cb, message):
-        super(RookWriteCompletion, self).__init__()
+        super(RookWriteCompletion, self).__init__(message)
         self.execute_cb = execute_cb
         self.complete_cb = complete_cb
 
@@ -88,47 +100,31 @@
         # not have succeeded
         self.executed = False
 
-        # Result of k8s API call, this is set if executed==True
-        self._result = None
-
+        # "Effective" means Rook has finished applying the changes
         self.effective = False
 
         self.id = str(uuid.uuid4())
 
-        self.message = message
-
-        self.exception = None
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    def __str__(self):
-        return self.message
-
-    @property
-    def result(self):
-        return self._result
-
     @property
     def is_persistent(self):
         return (not self.is_errored) and self.executed
 
     @property
     def is_effective(self):
         return self.effective
 
     def execute(self):
-        if not self.executed:
-            self._result = self.execute_cb()
-            self.executed = True
-
-        if not self.effective:
-            # TODO: check self.result for API errors
-            if self.complete_cb is None:
-                self.effective = True
-            else:
-                self.effective = self.complete_cb()
+        try:
+            if not self.executed:
+                self._result = self.execute_cb()
+                self.executed = True
+
+            if not self.effective:
+                # TODO: check self.result for API errors
+                if self.complete_cb is None:
+                    self.effective = True
+                else:
+                    self.effective = self.complete_cb()
+        except Exception as e:
+            self.exception = e
 
 
 def deferred_read(f):
@@ -168,14 +164,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def wait(self, completions):
+    def process(self, completions):
         if completions:
-            self.log.info("wait: completions={0}".format(completions))
+            self.log.info("process: completions={0}".format(completions))
 
-        incomplete = False
-
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
+        # Synchronously call the K8s API
         for c in completions:
             if not isinstance(c, RookReadCompletion) and \
                     not isinstance(c, RookWriteCompletion):
@@ -187,20 +180,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             if c.is_complete:
                 continue
 
-            try:
-                c.execute()
-            except Exception as e:
-                if not isinstance(e, orchestrator.OrchestratorError):
-                    self.log.exception("Completion {0} threw an exception:".format(
-                        c.message
-                    ))
-                c.exception = e
-                c._complete = True
-
-            if not c.is_complete:
-                incomplete = True
-
-        return not incomplete
+            c.execute()
+            if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
+                self.log.error("Completion {0} threw an exception: {1}".format(
+                    c.message, c.exception
+                ))
 
     @staticmethod
     def can_run():
@@ -288,15 +276,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
-            global all_completions
-            self.wait(all_completions)
-            all_completions = [c for c in all_completions if not c.is_complete]
+            self.process(RookCompletionMixin.all_completions)
+            RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if
+                                                   not c.is_complete]
 
             self._shutdown.wait(5)
 
-        # TODO: watch Rook for config changes to complain/update if
-        # things look a bit out of sync?
-
     @deferred_read
     def get_inventory(self, node_filter=None, refresh=False):
         node_list = None
diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py
index 644230fa59e..5a5bfb08bf5 100644
--- a/src/pybind/mgr/ssh/module.py
+++ b/src/pybind/mgr/ssh/module.py
@@ -330,21 +330,10 @@ class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         """
         return self.can_run()
 
-    def wait(self, completions):
-        self.log.info("wait: completions={}".format(completions))
-
-        complete = True
-        for c in completions:
-            if c.is_complete:
-                continue
-
-            if not isinstance(c, SSHReadCompletion) and \
-                    not isinstance(c, SSHWriteCompletion):
-                raise TypeError("unexpected completion: {}".format(c.__class__))
-
-            complete = False
-
-        return complete
+    def process(self, completions):
+        """
+        Does nothing, as completions are processed in another thread.
+        """
 
     def _require_hosts(self, hosts):
         """
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py
index 9ad65968dff..c9d8ed7d756 100644
--- a/src/pybind/mgr/test_orchestrator/module.py
+++ b/src/pybind/mgr/test_orchestrator/module.py
@@ -41,9 +41,13 @@ class TestCompletionMixin(object):
         return self._complete
 
     def execute(self):
-        self._result = self.cb()
-        self.executed = True
-        self._complete = True
+        try:
+            self._result = self.cb()
+            self.executed = True
+        except Exception as e:
+            self.exception = e
+        finally:
+            self._complete = True
 
     def __str__(self):
         return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
@@ -101,11 +105,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     The implementation is similar to the Rook orchestrator, but simpler.
     """
-    def wait(self, completions):
-        self.log.info("wait: completions={0}".format(completions))
-
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
+    def process(self, completions):
         for c in completions:
             if not isinstance(c, TestReadCompletion) and \
                     not isinstance(c, TestWriteCompletion):
@@ -114,19 +114,8 @@
                     c.__class__
                 ))
 
-            if c.is_complete:
-                continue
-
-            try:
+            if not c.has_result:
                 c.execute()
-            except Exception as e:
-                self.log.exception("Completion {0} threw an exception:".format(
-                    c.message
-                ))
-                c.exception = e
-                c._complete = True
-
-        return all(c.is_complete for c in completions)
 
     @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
     def _load_data(self, inbuf):
-- 
2.39.5
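For readers unfamiliar with the completion flow this patch touches, below is a minimal, self-contained sketch of the contract described by the orchestrator.py docstrings above: backends implement a non-blocking process() that makes one pass over incomplete completions, and callers poll (as _orchestrator_wait() does) until everything reports is_complete. The DemoCompletion and DemoOrchestrator names are illustrative only; they are not part of this patch or of the Ceph mgr modules.

# Illustrative sketch only: "DemoCompletion" and "DemoOrchestrator" are
# hypothetical names, not part of this patch or of the Ceph mgr modules.
import time


class DemoCompletion(object):
    """Minimal completion object: wraps one callback, executed at most once."""

    def __init__(self, cb):
        self.cb = cb
        self.result = None
        self.exception = None
        self.is_complete = False

    def execute(self):
        # Record either a result or an exception, then mark as complete,
        # mirroring the execute() methods touched by this patch.
        try:
            self.result = self.cb()
        except Exception as e:
            self.exception = e
        finally:
            self.is_complete = True


class DemoOrchestrator(object):
    def process(self, completions):
        # Non-blocking single pass: progress whatever is incomplete and
        # return nothing; callers inspect the completions themselves.
        for c in completions:
            if not c.is_complete:
                c.execute()

    def wait_for(self, completions):
        # Caller-side polling loop, analogous to
        # OrchestratorClientMixin._orchestrator_wait() in orchestrator.py.
        while any(not c.is_complete for c in completions):
            self.process(completions)
            if any(not c.is_complete for c in completions):
                time.sleep(1)


if __name__ == "__main__":
    demo = DemoOrchestrator()
    completions = [DemoCompletion(lambda: "host-a"), DemoCompletion(lambda: "host-b")]
    demo.wait_for(completions)
    print([c.result for c in completions])  # ['host-a', 'host-b']

A real backend would have execute() talk to an external service (k8s, salt-api, SSH) and may legitimately leave a completion incomplete after a single process() pass, which is why the caller keeps polling instead of expecting process() to block.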