return (available, msg)
- def wait(self, completions):
+ def process(self, completions):
"""Given a list of Completion instances, progress any which are
incomplete.
for operation in completions:
self.log.info("<%s> status:%s", operation, operation.status)
- completions = filter(lambda x: not x.is_complete, completions)
-
- ops_pending = len(completions)
- self.log.info("Operations pending: %s", ops_pending)
-
- return ops_pending == 0
-
def serve(self):
""" Mandatory for standby modules
"""
return c
- def wait(self, completions):
- incomplete = False
+ def process(self, completions):
+ """
+ Does nothing, as completions are processed in another thread.
+ """
- with self._completion_lock:
- for c in completions:
- if c.is_complete:
- continue
- if not c.is_complete:
- # TODO: the job is in the bus, it should reach us eventually
- # unless something has gone wrong (e.g. salt-api died, etc.),
- # in which case it's possible the job finished but we never
- # noticed the salt/run/$id/ret event. Need to add the job ID
- # (or possibly the full event tag) to the completion object.
- # That way, if we want to double check on a job that hasn't
- # been completed yet, we can make a synchronous request to
- # salt-api to invoke jobs.lookup_jid, and if it's complete we
- # should be able to pass its return value to _process_result()
- # Question: do we do this automatically after some timeout?
- # Or do we add a function so the admin can check and "unstick"
- # a stuck completion?
- incomplete = True
-
- return not incomplete
+ # If the job is still incomplete:
+ # TODO: the job is in the bus, it should reach us eventually
+ # unless something has gone wrong (e.g. salt-api died, etc.),
+ # in which case it's possible the job finished but we never
+ # noticed the salt/run/$id/ret event. Need to add the job ID
+ # (or possibly the full event tag) to the completion object.
+ # That way, if we want to double check on a job that hasn't
+ # been completed yet, we can make a synchronous request to
+ # salt-api to invoke jobs.lookup_jid, and if it's complete we
+ # should be able to pass its return value to _process_result()
+ # Question: do we do this automatically after some timeout?
+ # Or do we add a function so the admin can check and "unstick"
+ # a stuck completion?
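+ #
+ # A rough sketch of such a manual check, assuming the completion
+ # carried its Salt job ID and this module held a salt-api URL plus
+ # credentials (every attribute name below is hypothetical):
+ #
+ #   resp = requests.post(self._salt_api_url + '/run', json=[{
+ #       'client': 'runner',
+ #       'fun': 'jobs.lookup_jid',
+ #       'jid': completion.jid,
+ #       'username': ..., 'password': ..., 'eauth': 'pam',
+ #   }])
+ #   ret = resp.json()['return'][0]
+ #   if ret:
+ #       self._process_result(ret)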
def handle_command(self, inbuf, cmd):
internet downloads. For that reason, all are asynchronous, and return
``Completion`` objects.
+ Methods should only return the completion and not directly execute
+ anything, such as network calls; otherwise the purpose of those
+ completions is defeated.
+
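+ A rough sketch of the intended calling pattern, as seen from a consumer
+ module (``get_inventory`` is one such call; the name of the wait helper
+ on the client mixin is assumed here)::
+
+     completion = self.get_inventory()
+     self._orchestrator_wait([completion])
+     inventory = completion.result
+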
Implementations are not required to start work on an operation until
the caller waits on the relevant Completion objects. Callers making
multiple updates should not wait on Completions until they're done
raise NotImplementedError()
@_hide_in_features
- def wait(self, completions):
+ def process(self, completions):
+ # type: (List[_Completion]) -> None
"""
- Given a list of Completion instances, progress any which are
- incomplete. Return a true if everything is done.
+ Given a list of Completion instances, process any which are
+ incomplete.
Callers should inspect the detail of each completion to identify
partial completion/progress information, and present that information
to the user.
- For fast operations (e.g. reading from a database), implementations
- may choose to do blocking IO in this call.
-
- :rtype: bool
+ This method should not block, as that would make it slow to query
+ a status while other long-running operations are in progress.
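+
+ A minimal sketch of a non-blocking implementation, where ``_kick`` is a
+ hypothetical helper that hands incomplete completions to a background
+ thread::
+
+     def process(self, completions):
+         for c in completions:
+             if not c.is_complete:
+                 self._kick(c)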
"""
raise NotImplementedError()
self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
+ def __get_mgr(self):
+ try:
+ return self.__mgr
+ except AttributeError:
+ return self
+
def _oremote(self, meth, args, kwargs):
"""
Helper for invoking `remote` on whichever orchestrator is enabled
:raises OrchestratorError: orchestrator failed to perform
:raises ImportError: no `orchestrator_cli` module or backend not found.
"""
- try:
- mgr = self.__mgr
- except AttributeError:
- mgr = self
+ mgr = self.__get_mgr()
+
try:
o = mgr._select_orchestrator()
except AttributeError:
:param completions: List of Completions
:raises NoOrchestrator:
+ :raises RuntimeError: something went wrong while calling the process method.
:raises ImportError: no `orchestrator_cli` module or backend not found.
"""
for c in completions:
self._update_completion_progress(c)
- while not self.wait(completions):
+ while any(not c.is_complete for c in completions):
+ self.process(completions)
+ self.__get_mgr().log.info("Operations pending: %s",
+ sum(1 for c in completions if not c.is_complete))
if any(c.should_wait for c in completions):
time.sleep(1)
else:
from .rook_cluster import RookCluster
-all_completions = []
+class RookCompletionMixin(object):
+ # XXX hacky: class-level list of all completions created so far
+ all_completions = []
+ def __init__(self, message):
+ super(RookCompletionMixin, self).__init__()
+ self.message = message
+
+ # Result of k8s API call, this is set if executed==True
+ self._result = None
+
+ self.exception = None
+
+ RookCompletionMixin.all_completions.append(self)
-class RookReadCompletion(orchestrator.ReadCompletion):
+
+ @property
+ def result(self):
+ return self._result
+
+ def __str__(self):
+ return self.message
+
+
+class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion):
"""
All reads are simply API calls: avoid spawning
huge numbers of threads by just running them
"""
def __init__(self, cb):
- super(RookReadCompletion, self).__init__()
+ super(RookReadCompletion, self).__init__("<read op>")
self.cb = cb
- self._result = None
self._complete = False
- self.message = "<read op>"
-
- # XXX hacky global
- global all_completions
- all_completions.append(self)
-
- @property
- def result(self):
- return self._result
-
@property
- def is_complete(self):
+ def has_result(self):
return self._complete
def execute(self):
- self._result = self.cb()
- self._complete = True
+ try:
+ self._result = self.cb()
+ except Exception as e:
+ self.exception = e
+ finally:
+ self._complete = True
-class RookWriteCompletion(orchestrator.WriteCompletion):
+class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion):
"""
Writes are a two-phase thing, firstly sending
the write to the k8s API (fast) and then waiting
# a completion= param that uses threads. Maybe just
# use that?
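+ # A sketch of how such a completion is typically constructed; the two
+ # callbacks below are illustrative placeholders, not real rook_cluster
+ # calls:
+ #
+ #   RookWriteCompletion(
+ #       lambda: submit_change_to_k8s(),   # phase 1: fast API write
+ #       lambda: rook_has_applied_it(),    # phase 2: poll until effective
+ #       "applying change")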
def __init__(self, execute_cb, complete_cb, message):
- super(RookWriteCompletion, self).__init__()
+ super(RookWriteCompletion, self).__init__(message)
self.execute_cb = execute_cb
self.complete_cb = complete_cb
# not have succeeded
self.executed = False
- # Result of k8s API call, this is set if executed==True
- self._result = None
-
+ # Effective means Rook has finished applying the changes
self.effective = False
self.id = str(uuid.uuid4())
- self.message = message
-
- self.exception = None
-
- # XXX hacky global
- global all_completions
- all_completions.append(self)
-
- def __str__(self):
- return self.message
-
- @property
- def result(self):
- return self._result
-
@property
- def is_persistent(self):
- return (not self.is_errored) and self.executed
-
- @property
def is_effective(self):
return self.effective
def execute(self):
- if not self.executed:
- self._result = self.execute_cb()
- self.executed = True
-
- if not self.effective:
- # TODO: check self.result for API errors
- if self.complete_cb is None:
- self.effective = True
- else:
- self.effective = self.complete_cb()
+ try:
+ if not self.executed:
+ self._result = self.execute_cb()
+ self.executed = True
+
+ if not self.effective:
+ # TODO: check self.result for API errors
+ if self.complete_cb is None:
+ self.effective = True
+ else:
+ self.effective = self.complete_cb()
+ except Exception as e:
+ self.exception = e
def deferred_read(f):
# TODO: configure k8s API addr instead of assuming local
]
- def wait(self, completions):
+ def process(self, completions):
if completions:
self.log.info("wait: completions={0}".format(completions))
- incomplete = False
-
- # Our `wait` implementation is very simple because everything's
- # just an API call.
+ # Synchronously call the K8s API
for c in completions:
if not isinstance(c, RookReadCompletion) and \
not isinstance(c, RookWriteCompletion):
if c.is_complete:
continue
- try:
- c.execute()
- except Exception as e:
- if not isinstance(e, orchestrator.OrchestratorError):
- self.log.exception("Completion {0} threw an exception:".format(
- c.message
- ))
+ c.execute()
+ if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
+     self.log.error("Completion {0} threw an exception: {1}".format(
+         c.message, c.exception
+     ))
- c.exception = e
- c._complete = True
- if not c.is_complete:
- incomplete = True
- return not incomplete
@staticmethod
def can_run():
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- global all_completions
- self.wait(all_completions)
- all_completions = [c for c in all_completions if not c.is_complete]
+ self.process(RookCompletionMixin.all_completions)
+ RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if
+ not c.is_finished]
self._shutdown.wait(5)
- # TODO: watch Rook for config changes to complain/update if
- # things look a bit out of sync?
-
@deferred_read
def get_inventory(self, node_filter=None, refresh=False):
node_list = None
"""
return self.can_run()
- def wait(self, completions):
- self.log.info("wait: completions={}".format(completions))
-
- complete = True
- for c in completions:
- if c.is_complete:
- continue
-
- if not isinstance(c, SSHReadCompletion) and \
- not isinstance(c, SSHWriteCompletion):
- raise TypeError("unexpected completion: {}".format(c.__class__))
-
- complete = False
-
- return complete
+ def process(self, completions):
+ """
+ Does nothing, as completions are processed in another thread.
+ """
def _require_hosts(self, hosts):
"""
return self._complete
def execute(self):
- self._result = self.cb()
- self.executed = True
- self._complete = True
+ try:
+ self._result = self.cb()
+ self.executed = True
+ except Exception as e:
+ self.exception = e
+ finally:
+ self._complete = True
def __str__(self):
return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
The implementation is similar to the Rook orchestrator, but simpler.
"""
- def wait(self, completions):
- self.log.info("wait: completions={0}".format(completions))
-
- # Our `wait` implementation is very simple because everything's
- # just an API call.
+ def process(self, completions):
for c in completions:
if not isinstance(c, TestReadCompletion) and \
not isinstance(c, TestWriteCompletion):
c.__class__
))
- if c.is_complete:
- continue
-
- try:
+ if not c.has_result:
c.execute()
- except Exception as e:
- self.log.exception("Completion {0} threw an exception:".format(
- c.message
- ))
- c.exception = e
- c._complete = True
-
- return all(c.is_complete for c in completions)
@CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
def _load_data(self, inbuf):