From 7a432a02259a5ea211d1095e86efdeea06b19b12 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 26 Aug 2019 14:00:32 +0200 Subject: [PATCH] mgr/orchestrator: Introduce composable completions Also: * unify Read and Write completions * Distinguish should_wait and is_finished * Removed `all_hosts` parameter to `osd_create` Signed-off-by: Sebastian Wagner --- src/pybind/mgr/ansible/module.py | 4 +- src/pybind/mgr/deepsea/module.py | 2 +- src/pybind/mgr/orchestrator.py | 571 ++++++++++++++------- src/pybind/mgr/orchestrator_cli/module.py | 14 +- src/pybind/mgr/rook/module.py | 173 ++----- src/pybind/mgr/ssh/module.py | 6 +- src/pybind/mgr/test_orchestrator/module.py | 10 +- src/pybind/mgr/tests/test_orchestrator.py | 125 ++++- 8 files changed, 588 insertions(+), 317 deletions(-) diff --git a/src/pybind/mgr/ansible/module.py b/src/pybind/mgr/ansible/module.py index 344d0be5f4a28..0498625a596a9 100644 --- a/src/pybind/mgr/ansible/module.py +++ b/src/pybind/mgr/ansible/module.py @@ -91,7 +91,7 @@ class AnsibleReadOperation(orchestrator.ReadCompletion): return "Playbook {playbook_name}".format(playbook_name=self.playbook) @property - def is_complete(self): + def has_result(self): return self._is_complete @property @@ -306,7 +306,7 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion): raise NotImplementedError() @property - def is_persistent(self): + def has_result(self): """ Has the operation updated the orchestrator's configuration persistently? 
Typically this would indicate that an update diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py index b2e1b2e283b80..10aef36775f97 100644 --- a/src/pybind/mgr/deepsea/module.py +++ b/src/pybind/mgr/deepsea/module.py @@ -40,7 +40,7 @@ class DeepSeaReadCompletion(orchestrator.ReadCompletion): return self._result @property - def is_complete(self): + def has_result(self): return self._complete diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index d2df4bc78bd74..1ec3959c11cb0 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -4,6 +4,8 @@ ceph-mgr orchestrator interface Please see the ceph-mgr module developer's guide for more information. """ +import functools +import logging import sys import time from collections import namedtuple @@ -24,12 +26,19 @@ from mgr_util import format_bytes try: from ceph.deployment.drive_group import DriveGroupSpec - from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator + from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, Type - T = TypeVar('T') - G = Generic[T] except ImportError: - T, G = object, object + pass + #T, G = object, object + +T = TypeVar('T') +U = TypeVar('U') +V = TypeVar('V') +G = Generic[T] +Promises = TypeVar('Promises', bound='_Promise') +Completions = TypeVar('Completions', bound='Completion') + def parse_host_specs(host, require_network=True): @@ -136,27 +145,334 @@ def handle_exception(prefix, cmd_args, desc, perm, func): return CLICommand(prefix, cmd_args, desc, perm)(wrapper) + def _cli_command(perm): def inner_cli_command(prefix, cmd_args="", desc=""): return lambda func: handle_exception(prefix, cmd_args, desc, perm, func) return inner_cli_command + _cli_read_command = _cli_command('r') _cli_write_command = _cli_command('rw') -class _Completion(G): +def _no_result(): + return object() + + +class _Promise(Generic[T]): + """ + A completion may need multiple 
promises to be fulfilled. `_Promise` is one + step. + + Typically ``Orchestrator`` implementations inherit from this class to + build their own way of finishing a step to fulfil a future. + + They are not exposed in the orchestrator interface and can be seen as a + helper to build orchestrator modules. + """ + INITIALIZED = 1 # We have a parent completion and a next completion + FINISHED = 2 # we have a final result + + NO_RESULT = _no_result() # type: None + + def __init__(self, + _first_promise=None, # type: Optional["_Promise[V]"] + value=NO_RESULT, # type: Optional[T] + on_complete=None # type: Optional[Callable[[T], Union[U, _Promise[U]]]] + ): + self._on_complete = on_complete + self._next_promise = None # type: Optional[_Promise[U]] + + self._state = self.INITIALIZED + self._exception = None # type: Optional[Exception] + + # Value of this _Promise. may be an intermediate result. + self._value = value + + # _Promise is not a continuation monad, as `_result` is of type + # T instead of (T -> r) -> r. Therefore we need to store the first promise here. + self._first_promise = _first_promise or self # type: 'Completion' + + def __repr__(self): + name = getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None' + val = repr(self._value) if self._value is not self.NO_RESULT else 'NA' + return '{}(_s={}, val={}, id={}, name={}, pr={}, _next={})'.format( + self.__class__, self._state, val, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise) + ) + + def then(self, on_complete): + # type: (Promises, Callable[[T], Union[U, _Promise[U]]]) -> Promises[U] + """ + Call ``on_complete`` as soon as this promise is finalized. 
+ """ + assert self._state is self.INITIALIZED + if self._on_complete is not None: + assert self._next_promise is None + self._set_next_promise(self.__class__( + _first_promise=self._first_promise, + on_complete=on_complete + )) + return self._next_promise + + else: + self._on_complete = on_complete + self._set_next_promise(self.__class__(_first_promise=self._first_promise)) + return self._next_promise + + def _set_next_promise(self, next): + # type: (_Promise[U]) -> None + assert self is not next + assert self._state is self.INITIALIZED + + self._next_promise = next + assert self._next_promise is not None + for p in iter(self._next_promise): + p._first_promise = self._first_promise + + def finalize(self, value=NO_RESULT): + # type: (Optional[T]) -> None + """ + Sets this promise to complete. + + Orchestrators may choose to use this helper function. + + :param value: new value. + """ + assert self._state is self.INITIALIZED + + if value is not self.NO_RESULT: + self._value = value + assert self._value is not self.NO_RESULT + + if self._on_complete: + try: + next_result = self._on_complete(self._value) + except Exception as e: + self.fail(e) + return + else: + next_result = self._value + + if isinstance(next_result, _Promise): + # hack: _Promise is not a continuation monad. + next_result = next_result._first_promise # type: ignore + assert next_result not in self, repr(self._first_promise) + repr(next_result) + assert self not in next_result + next_result._append_promise(self._next_promise) + self._set_next_promise(next_result) + if self._next_promise._value is self.NO_RESULT: + self._next_promise._value = self._value + else: + # simple map. 
simply forward + if self._next_promise: + self._next_promise._value = next_result + else: + # Hack: next_result is of type U, _value is of type T + self._value = next_result # type: ignore + self._state = self.FINISHED + logger.debug('finalized {}'.format(repr(self))) + self.propagate_to_next() + + def propagate_to_next(self): + assert self._state is self.FINISHED + if self._next_promise: + self._next_promise.finalize() + + def fail(self, e): + # type: (Exception) -> None + """ + Sets the whole completion to be failed with this exception and ends the + evaluation. + """ + assert self._state is self.INITIALIZED + logger.exception('_Promise failed') + self._exception = e + self._value = 'exception' + if self._next_promise: + self._next_promise.fail(e) + self._state = self.FINISHED + + def __contains__(self, item): + return any(item is p for p in iter(self._first_promise)) + + def __iter__(self): + yield self + elem = self._next_promise + while elem is not None: + yield elem + elem = elem._next_promise + + def _append_promise(self, other): + if other is not None: + assert self not in other + assert other not in self + self._last_promise()._set_next_promise(other) + + def _last_promise(self): + # type: () -> _Promise + return list(iter(self))[-1] + + +class ProgressReference(object): + def __init__(self, + message, # type: str + mgr, + completion=None # type: Optional[Callable[[], Completion[float]]] + ): + """ + ProgressReference can be used within Completions: + + +---------------+ +---------------------------------+ + | | then | | + | My Completion | +--> | on_complete=ProgressReference() | + | | | | + +---------------+ +---------------------------------+ + + """ + super(ProgressReference, self).__init__() + self.progress_id = str(uuid.uuid4()) + self.message = message + self.mgr = mgr + + #: The completion can already have a result, before the write + #: operation is effective. progress == 1 means, the services are + #: created / removed. 
+ self.completion = completion # type: Optional[Callable[[], Completion[float]]] + + #: if an orchestrator module can provide more detailed + #: progress information, it needs to also call ``progress.update()``. + self.progress = 0.0 + + self._completion_has_result = False + self.mgr.all_progress_references.append(self) + + def __str__(self): + """ + ``__str__()`` is used for determining the message for progress events. + """ + return self.message or super(ProgressReference, self).__str__() + + def __call__(self, arg): + self._completion_has_result = True + if self.progress == 0.0: + self.progress = 0.5 + return arg + + @property + def progress(self): + return self._progress + + @progress.setter + def progress(self, progress): + self._progress = progress + try: + if self.effective: + self.mgr.remote("progress", "complete", self.progress_id) + self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self] + else: + self.mgr.remote("progress", "update", self.progress_id, self.message, + progress, + [("origin", "orchestrator")]) + except ImportError: + # If the progress module is disabled that's fine, + # they just won't see the output. + pass + + @property + def effective(self): + return self.progress == 1 and self._completion_has_result + + def update(self): + def run(progress): + self.progress = progress + if self.completion: + c = self.completion().then(run) + self.mgr.process([c._first_promise]) + else: + self.progress = 1 + + def fail(self): + self._completion_has_result = True + self.progress = 1 + +class Completion(_Promise[T]): + """ + Combines multiple promises into one overall operation. + + :ivar exception: Holds an exception object, if the completion errored. 
+ + """ + def __init__(self, + _first_promise=None, # type: Optional["Completion[V]"] + value=_Promise.NO_RESULT, # type: Optional[T] + on_complete=None # type: Optional[Callable[[T], Union[U, Completion[U]]]] + ): + super(Completion, self).__init__(_first_promise, value, on_complete) + + @property + def _progress_reference(self): + # type: () -> Optional[ProgressReference] + if hasattr(self._on_complete, 'progress_id'): + return self._on_complete + return None + + @property + def progress_reference(self): + # type: () -> Optional[ProgressReference] + """ + ProgressReference. Marks this completion + as a write completion. + """ + + references = [c._progress_reference for c in iter(self) if c._progress_reference is not None] + if references: + assert len(references) == 1 + return references[0] + return None + + @classmethod + def with_progress(cls, # type: Completions[T] + message, # type: str + mgr, + _first_promise=None, # type: Optional["Completions[V]"] + value=_Promise.NO_RESULT, # type: Optional[T] + on_complete=None, # type: Optional[Callable[[T], Union[U, Completions[U]]]] + calc_percent=None # type: Optional[Callable[[], Completions[float]]] + ): + # type: (...) -> Completions[T] + + c = cls( + _first_promise=_first_promise, + value=value, + on_complete=on_complete + ).then( + on_complete=ProgressReference( + message=message, + mgr=mgr, + completion=calc_percent + ) + ) + return c._first_promise + + def fail(self, e): + super(Completion, self).fail(e) + if self._progress_reference: + self._progress_reference.fail() + @property def result(self): - # type: () -> T """ - Return the result of the operation that we were waited - for. Only valid after calling Orchestrator.wait() on this + The result of the operation that we waited + for. Only valid after calling Orchestrator.process() on this completion. 
""" - raise NotImplementedError() + last = self._last_promise() + assert last._state == _Promise.FINISHED + return last._value def result_str(self): + """Force a string.""" if self.result is None: return '' return str(self.result) @@ -164,27 +480,24 @@ class _Completion(G): @property def exception(self): # type: () -> Optional[Exception] - """ - Holds an exception object. - """ - try: - return self.__exception - except AttributeError: - return None - - @exception.setter - def exception(self, value): - self.__exception = value + return self._last_promise()._exception @property - def is_read(self): + def has_result(self): # type: () -> bool - raise NotImplementedError() + """ + Has the operation already a result? - @property - def is_complete(self): - # type: () -> bool - raise NotImplementedError() + For Write operations, it can already have a + result, if the orchestrator's configuration is + persistently written. Typically this would + indicate that an update had been written to + a manifest, but that the update had not + necessarily been pushed out to the cluster. + + :return: + """ + return self._last_promise()._state == _Promise.FINISHED @property def is_errored(self): @@ -196,13 +509,28 @@ class _Completion(G): return self.exception is not None @property - def should_wait(self): + def needs_result(self): # type: () -> bool - raise NotImplementedError() + """ + Could the external operation be deemed as complete, + or should we wait? + We must wait for a read operation only if it is not complete. + """ + return not self.is_errored and not self.has_result + + @property + def is_finished(self): + # type: () -> bool + """ + Could the external operation be deemed as complete, + or should we wait? + We must wait for a read operation only if it is not complete. + """ + return self.is_errored or (self.has_result) def raise_if_exception(c): - # type: (_Completion) -> None + # type: (Completion) -> None """ :raises OrchestratorError: Some user error or a config error. 
:raises Exception: Some internal error @@ -233,30 +561,7 @@ def raise_if_exception(c): raise e -class ReadCompletion(_Completion): - """ - ``Orchestrator`` implementations should inherit from this - class to implement their own handles to operations in progress, and - return an instance of their subclass from calls into methods. - """ - - def __init__(self): - pass - - @property - def is_read(self): - return True - - @property - def should_wait(self): - """Could the external operation be deemed as complete, - or should we wait? - We must wait for a read operation only if it is not complete. - """ - return not self.is_complete - - -class TrivialReadCompletion(ReadCompletion): +class TrivialReadCompletion(Completion[T]): """ This is the trivial completion simply wrapping a result. """ @@ -264,72 +569,6 @@ class TrivialReadCompletion(ReadCompletion): super(TrivialReadCompletion, self).__init__() self._result = result - @property - def result(self): - return self._result - - @property - def is_complete(self): - return True - - -class WriteCompletion(_Completion): - """ - ``Orchestrator`` implementations should inherit from this - class to implement their own handles to operations in progress, and - return an instance of their subclass from calls into methods. - """ - - def __init__(self): - self.progress_id = str(uuid.uuid4()) - - #: if a orchestrator module can provide a more detailed - #: progress information, it needs to also call ``progress.update()``. - self.progress = 0.5 - - def __str__(self): - """ - ``__str__()`` is used for determining the message for progress events. - """ - return super(WriteCompletion, self).__str__() - - @property - def is_persistent(self): - # type: () -> bool - """ - Has the operation updated the orchestrator's configuration - persistently? Typically this would indicate that an update - had been written to a manifest, but that the update - had not necessarily been pushed out to the cluster. 
- """ - raise NotImplementedError() - - @property - def is_effective(self): - """ - Has the operation taken effect on the cluster? For example, - if we were adding a service, has it come up and appeared - in Ceph's cluster maps? - """ - raise NotImplementedError() - - @property - def is_complete(self): - return self.is_errored or (self.is_persistent and self.is_effective) - - @property - def is_read(self): - return False - - @property - def should_wait(self): - """Could the external operation be deemed as complete, - or should we wait? - We must wait for a write operation only if we know - it is not persistent yet. - """ - return not self.is_persistent - def _hide_in_features(f): f._hide_in_features = True @@ -397,7 +636,7 @@ class Orchestrator(object): @_hide_in_features def process(self, completions): - # type: (List[_Completion]) -> None + # type: (List[Completion]) -> None """ Given a list of Completion instances, process any which are incomplete. @@ -441,7 +680,7 @@ class Orchestrator(object): return features def add_host(self, host): - # type: (str) -> WriteCompletion + # type: (str) -> Completion """ Add a host to the orchestrator inventory. @@ -450,7 +689,7 @@ class Orchestrator(object): raise NotImplementedError() def remove_host(self, host): - # type: (str) -> WriteCompletion + # type: (str) -> Completion """ Remove a host from the orchestrator inventory. @@ -459,7 +698,7 @@ class Orchestrator(object): raise NotImplementedError() def get_hosts(self): - # type: () -> ReadCompletion[List[InventoryNode]] + # type: () -> Completion[List[InventoryNode]] """ Report the hosts in the cluster. @@ -484,7 +723,7 @@ class Orchestrator(object): return NotImplementedError() def get_inventory(self, node_filter=None, refresh=False): - # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]] + # type: (InventoryFilter, bool) -> Completion[List[InventoryNode]] """ Returns something that was created by `ceph-volume inventory`. 
@@ -493,7 +732,7 @@ class Orchestrator(object): raise NotImplementedError() def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): - # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]] + # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]] """ Describe a service (of any kind) that is already configured in the orchestrator. For example, when viewing an OSD in the dashboard @@ -508,7 +747,7 @@ class Orchestrator(object): raise NotImplementedError() def service_action(self, action, service_type, service_name=None, service_id=None): - # type: (str, str, str, str) -> WriteCompletion + # type: (str, str, str, str) -> Completion """ Perform an action (start/stop/reload) on a service. @@ -523,15 +762,15 @@ class Orchestrator(object): :param service_type: e.g. "mds", "rgw", ... :param service_name: name of logical service ("cephfs", "us-east", ...) :param service_id: service daemon instance (usually a short hostname) - :rtype: WriteCompletion + :rtype: Completion """ - assert action in ["start", "stop", "reload", "restart", "redeploy"] - assert service_name or service_id - assert not (service_name and service_id) + #assert action in ["start", "stop", "reload, "restart", "redeploy"] + #assert service_name or service_id + #assert not (service_name and service_id) raise NotImplementedError() - def create_osds(self, drive_group, all_hosts): - # type: (DriveGroupSpec, List[str]) -> WriteCompletion + def create_osds(self, drive_group): + # type: (DriveGroupSpec) -> Completion """ Create one or more OSDs within a single Drive Group. 
@@ -548,8 +787,8 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_osds(self, osd_ids, destroy=False): - # type: (List[str], bool) -> WriteCompletion + def remove_osds(self, osd_ids): + # type: (List[str]) -> Completion """ :param osd_ids: list of OSD IDs :param destroy: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` @@ -571,7 +810,7 @@ class Orchestrator(object): raise NotImplementedError() def update_mgrs(self, num, hosts): - # type: (int, List[str]) -> WriteCompletion + # type: (int, List[str]) -> Completion """ Update the number of cluster managers. @@ -581,7 +820,7 @@ class Orchestrator(object): raise NotImplementedError() def update_mons(self, num, hosts): - # type: (int, List[Tuple[str,str]]) -> WriteCompletion + # type: (int, List[Tuple[str,str]]) -> Completion """ Update the number of cluster monitors. @@ -591,17 +830,17 @@ class Orchestrator(object): raise NotImplementedError() def add_mds(self, spec): - # type: (StatelessServiceSpec) -> WriteCompletion + # type: (StatelessServiceSpec) -> Completion """Create a new MDS cluster""" raise NotImplementedError() def remove_mds(self, name): - # type: (str) -> WriteCompletion + # type: (str) -> Completion """Remove an MDS cluster""" raise NotImplementedError() def update_mds(self, spec): - # type: (StatelessServiceSpec) -> WriteCompletion + # type: (StatelessServiceSpec) -> Completion """ Update / redeploy existing MDS cluster Like for example changing the number of service instances. 
@@ -627,17 +866,17 @@ class Orchestrator(object): raise NotImplementedError() def add_nfs(self, spec): - # type: (NFSServiceSpec) -> WriteCompletion + # type: (NFSServiceSpec) -> Completion """Create a new MDS cluster""" raise NotImplementedError() def remove_nfs(self, name): - # type: (str) -> WriteCompletion + # type: (str) -> Completion """Remove a NFS cluster""" raise NotImplementedError() def update_nfs(self, spec): - # type: (NFSServiceSpec) -> WriteCompletion + # type: (NFSServiceSpec) -> Completion """ Update / redeploy existing NFS cluster Like for example changing the number of service instances. @@ -645,17 +884,17 @@ class Orchestrator(object): raise NotImplementedError() def add_rgw(self, spec): - # type: (RGWSpec) -> WriteCompletion + # type: (RGWSpec) -> Completion """Create a new MDS zone""" raise NotImplementedError() def remove_rgw(self, zone): - # type: (str) -> WriteCompletion + # type: (str) -> Completion """Remove a RGW zone""" raise NotImplementedError() def update_rgw(self, spec): - # type: (StatelessServiceSpec) -> WriteCompletion + # type: (StatelessServiceSpec) -> Completion """ Update / redeploy existing RGW zone Like for example changing the number of service instances. @@ -664,12 +903,12 @@ class Orchestrator(object): @_hide_in_features def upgrade_start(self, upgrade_spec): - # type: (UpgradeSpec) -> WriteCompletion + # type: (UpgradeSpec) -> Completion raise NotImplementedError() @_hide_in_features def upgrade_status(self): - # type: () -> ReadCompletion[UpgradeStatusSpec] + # type: () -> Completion[UpgradeStatusSpec] """ If an upgrade is currently underway, report on where we are in the process, or if some error has occurred. 
@@ -680,7 +919,7 @@ class Orchestrator(object): @_hide_in_features def upgrade_available(self): - # type: () -> ReadCompletion[List[str]] + # type: () -> Completion[List[str]] """ Report on what versions are available to upgrade to @@ -1017,6 +1256,11 @@ class InventoryNode(object): def __repr__(self): return "({name})".format(name=self.name) + @staticmethod + def get_host_names(nodes): + # type: (List[InventoryNode]) -> List[str] + return [node.name for node in nodes] + class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev'])): """ @@ -1037,7 +1281,6 @@ def _mk_orch_methods(cls): def shim(method_name): def inner(self, *args, **kwargs): completion = self._oremote(method_name, args, kwargs) - self._update_completion_progress(completion, 0) return completion return inner @@ -1099,25 +1342,9 @@ class OrchestratorClientMixin(Orchestrator): mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs)) return mgr.remote(o, meth, *args, **kwargs) - def _update_completion_progress(self, completion, force_progress=None): - # type: (WriteCompletion, Optional[float]) -> None - try: - progress = force_progress if force_progress is not None else completion.progress - if completion.is_complete: - self.remote("progress", "complete", completion.progress_id) - else: - self.remote("progress", "update", completion.progress_id, str(completion), progress, - [("origin", "orchestrator")]) - except AttributeError: - # No WriteCompletion. Ignore. - pass - except ImportError: - # If the progress module is disabled that's fine, - # they just won't see the output. - pass def _orchestrator_wait(self, completions): - # type: (List[_Completion]) -> None + # type: (List[Completion]) -> None """ Wait for completions to complete (reads) or become persistent (writes). @@ -1129,18 +1356,14 @@ class OrchestratorClientMixin(Orchestrator): :raises RuntimeError: something went wrong while calling the process method. 
:raises ImportError: no `orchestrator_cli` module or backend not found. """ - for c in completions: - self._update_completion_progress(c) - while any(not c.is_complete for c in completions): - self.wait(completions) + while any(not c.has_result for c in completions): + self.process(completions) self.__get_mgr().log.info("Operations pending: %s", - sum(1 for c in completions if not c.is_complete)) - if any(c.should_wait for c in completions): + sum(1 for c in completions if not c.has_result)) + if any(c.needs_result for c in completions): time.sleep(1) else: break - for c in completions: - self._update_completion_progress(c) class OutdatableData(object): diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py index 49b62f78356af..3765b4201f79e 100644 --- a/src/pybind/mgr/orchestrator_cli/module.py +++ b/src/pybind/mgr/orchestrator_cli/module.py @@ -339,19 +339,7 @@ Usage: else: return HandleCommandResult(-errno.EINVAL, stderr=usage) - # TODO: Remove this and make the orchestrator composable - # Like a future or so. 
- host_completion = self.get_hosts() - self._orchestrator_wait([host_completion]) - orchestrator.raise_if_exception(host_completion) - all_hosts = [h.name for h in host_completion.result] - - try: - drive_group.validate(all_hosts) - except DriveGroupValidationError as e: - return HandleCommandResult(-errno.EINVAL, stderr=str(e)) - - completion = self.create_osds(drive_group, all_hosts) + completion = self.create_osds(drive_group) self._orchestrator_wait([completion]) orchestrator.raise_if_exception(completion) self.log.warning(str(completion.result)) diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index d95f1a63f84cd..41c4c442d01ec 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -6,7 +6,7 @@ import uuid from ceph.deployment import inventory try: - from typing import List, Dict + from typing import List, Dict, Optional, Callable from ceph.deployment.drive_group import DriveGroupSpec except ImportError: pass # just for type checking @@ -34,108 +34,34 @@ import orchestrator from .rook_cluster import RookCluster -class RookCompletionMixin(object): - # hacky global - all_completions = [] - - def __init__(self, message): - super(RookCompletionMixin, self).__init__() - self.message = message - - # Result of k8s API call, this is set if executed==True - self._result = None - - all_completions.append(self) - - - @property - def result(self): - return self._result - - def __str__(self): - return self.message - - -class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion): - """ - All reads are simply API calls: avoid spawning - huge numbers of threads by just running them - inline when someone calls wait() - """ - - def __init__(self, cb): - super(RookReadCompletion, self).__init__("") - self.cb = cb - self._complete = False - - @property - def has_result(self): - return self._complete - - def execute(self): - try: - self._result = self.cb() - except Exception as e: - self.exception = e - finally: - 
self._complete = True - - -class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion): +class RookWriteCompletion(orchestrator.Completion): """ Writes are a two-phase thing, firstly sending the write to the k8s API (fast) and then waiting for the corresponding change to appear in the Ceph cluster (slow) """ - # XXX kubernetes bindings call_api already usefully has - # a completion= param that uses threads. Maybe just - # use that? - def __init__(self, execute_cb, complete_cb, message): - super(RookWriteCompletion, self).__init__(message) - self.execute_cb = execute_cb - self.complete_cb = complete_cb - - # Executed means I executed my k8s API call, it may or may - # not have succeeded - self.executed = False - - # Effective means, Rook finished applying the changes - self.effective = False - - self.id = str(uuid.uuid4()) - - @property - - @property - def is_effective(self): - return self.effective - - def execute(self): - try: - if not self.executed: - self._result = self.execute_cb() - self.executed = True - - if not self.effective: - # TODO: check self.result for API errors - if self.complete_cb is None: - self.effective = True - else: - self.effective = self.complete_cb() - except Exception as e: - self.exception = e + def __init__(self, message): + self.progress_reference = orchestrator.ProgressReference( + message=message + ) + super(RookWriteCompletion, self).__init__() def deferred_read(f): + # type: (Callable) -> Callable[..., orchestrator.Completion] """ Decorator to make RookOrchestrator methods return a completion object that executes themselves. 
""" @functools.wraps(f) - def wrapper(*args, **kwargs): - return RookReadCompletion(lambda: f(*args, **kwargs)) + def wrapper(self, *args, **kwargs): + c = orchestrator.Completion() + c.then( + lambda: f(*args, **kwargs) + ) + return c return wrapper @@ -165,27 +91,31 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ] def process(self, completions): - if completions: - self.log.info("wait: completions={0}".format(completions)) + pass + + def process_promises(self, promises): + # type: (List[RookPromise]) -> None + + + if promises: + self.log.info("wait: promises={0}".format(promises)) # Synchronously call the K8s API - for c in completions: - if not isinstance(c, RookReadCompletion) and \ - not isinstance(c, RookWriteCompletion): + for p in promises: + if not isinstance(p, RookPromise): raise TypeError( "wait() requires list of completions, not {0}".format( - c.__class__ + p.__class__ )) - if c.is_complete: + if not p.needs_result: continue - c.execute() + new_promise = p.execute() if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError): self.log.exception("Completion {0} threw an exception:".format( c.message )) - c.exception = e c._complete = True @@ -221,6 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._shutdown = threading.Event() + self.all_promises = list() # type: List[RookPromise] + def shutdown(self): self._shutdown.set() @@ -276,9 +208,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): # in case we had a caller that wait()'ed on them long enough # to get persistence but not long enough to get completion - self.wait(RookCompletionMixin.all_completions) - RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if - not c.is_finished] + self.process(self.all_promises) + self.all_promises = [p for p in self.all_promises if not p.completion.is_finished] self._shutdown.wait(5) @@ -418,30 +349,39 @@ class RookOrchestrator(MgrModule, 
orchestrator.Orchestrator): lambda: self.rook_cluster.update_nfs_count(spec.name, num), None, "Updating NFS server count in {0} to {1}".format(spec.name, num)) - def create_osds(self, drive_group, all_hosts): + def create_osds(self, drive_group, _): # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion - assert len(drive_group.hosts(all_hosts)) == 1 targets = [] if drive_group.data_devices: targets += drive_group.data_devices.paths if drive_group.data_directories: targets += drive_group.data_directories - if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(drive_group.hosts(all_hosts))) + p = orchestrator.ProgressReference( + "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern), + targets)) - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") + def execute(all_hosts): + p.effective_when( + lambda hosts: has_osds + ) - def execute(): + assert len(drive_group.hosts(all_hosts)) == 1 + + if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]): + raise RuntimeError("Node '{0}' is not in the Kubernetes " + "cluster".format(drive_group.hosts(all_hosts))) + + # Validate whether cluster CRD can accept individual OSD + # creations (i.e. 
not useAllDevices) + if not self.rook_cluster.can_create_osd(): + raise RuntimeError("Rook cluster configuration does not " + "support OSD creation.") return self.rook_cluster.add_osds(drive_group, all_hosts) - def is_complete(): + @deferred_read + def has_osds(all_hosts): # Find OSD pods on this host pod_osd_ids = set() pods = self._k8s.list_namespaced_pod(self._rook_env.namespace, @@ -471,7 +411,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return found is not None - return RookWriteCompletion(execute, is_complete, - "Creating OSD on {0}:{1}".format( - drive_group.hosts(all_hosts)[0], targets - )) + + c = self.get_hosts().then(execute) + c.progress_reference = p + return c + diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py index 5a5bfb08bf51d..dbd33e7d6333b 100644 --- a/src/pybind/mgr/ssh/module.py +++ b/src/pybind/mgr/ssh/module.py @@ -79,14 +79,14 @@ class SSHCompletionmMixin(object): class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion): @property - def is_complete(self): + def has_result(self): return all(map(lambda r: r.ready(), self._result)) class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion): @property - def is_persistent(self): + def has_result(self): return all(map(lambda r: r.ready(), self._result)) @property @@ -113,7 +113,7 @@ class SSHWriteCompletionReady(SSHWriteCompletion): return self._result @property - def is_persistent(self): + def has_result(self): return True @property diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index c9d8ed7d756fa..ce41ce89ab7fa 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -19,7 +19,7 @@ import orchestrator class TestCompletionMixin(object): - all_completions = [] # Hacky global + all_completions = [] # type: orchestrator.Completion def __init__(self, cb, message, *args, **kwargs): super(TestCompletionMixin, 
self).__init__(*args, **kwargs) @@ -37,7 +37,7 @@ class TestCompletionMixin(object): return self._result @property - def is_complete(self): + def has_result(self): return self._complete def execute(self): @@ -64,7 +64,7 @@ class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion): super(TestWriteCompletion, self).__init__(cb, message) @property - def is_persistent(self): + def has_result(self): return (not self.is_errored) and self.executed @property @@ -114,7 +114,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): c.__class__ )) - if not c.has_result: + if c.needs_result: c.execute() @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w') @@ -153,7 +153,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): self.wait(TestCompletionMixin.all_completions) TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if - not c.is_complete] + c.is_finished] self._shutdown.wait(5) diff --git a/src/pybind/mgr/tests/test_orchestrator.py b/src/pybind/mgr/tests/test_orchestrator.py index ad1493c5ddc29..ff4d71ca0dc00 100644 --- a/src/pybind/mgr/tests/test_orchestrator.py +++ b/src/pybind/mgr/tests/test_orchestrator.py @@ -1,10 +1,11 @@ from __future__ import absolute_import import json +from unittest.mock import MagicMock import pytest from ceph.deployment import inventory -from orchestrator import ReadCompletion, raise_if_exception, RGWSpec +from orchestrator import raise_if_exception, RGWSpec, Completion, ProgressReference from orchestrator import InventoryNode, ServiceDescription from orchestrator import OrchestratorValidationError from orchestrator import parse_host_specs @@ -87,8 +88,8 @@ def test_service_description(): def test_raise(): - c = ReadCompletion() - c.exception = ZeroDivisionError() + c = Completion() + c._exception = ZeroDivisionError() with pytest.raises(ZeroDivisionError): raise_if_exception(c) @@ -107,3 +108,121 @@ def test_rgwspec(): example = 
json.loads(test_rgwspec.__doc__.strip()) spec = RGWSpec.from_json(example) assert spec.validate_add() is None + + +def test_promise(): + p = Completion(value=3) + p.finalize() + assert p.result == 3 + + +def test_promise_then(): + p = Completion(value=3).then(lambda three: three + 1) + p._first_promise.finalize() + assert p.result == 4 + + +def test_promise_mondatic_then(): + p = Completion(value=3) + p.then(lambda three: Completion(value=three + 1)) + p._first_promise.finalize() + assert p.result == 4 + + +def some_complex_completion(): + c = Completion(value=3).then( + lambda three: Completion(value=three + 1).then( + lambda four: four + 1)) + return c._first_promise + +def test_promise_mondatic_then_combined(): + p = some_complex_completion() + p._first_promise.finalize() + assert p.result == 5 + + +def test_promise_flat(): + p = Completion() + p.then(lambda r1: Completion(value=r1 + ' there').then( + lambda r11: r11 + '!')) + p.finalize('hello') + assert p.result == 'hello there!' + + +def test_side_effect(): + foo = {'x': 1} + + def run(x): + foo['x'] = x + + foo['x'] = 1 + Completion(value=3).then(run)._first_promise.finalize() + assert foo['x'] == 3 + + +def test_progress(): + c = some_complex_completion() + mgr = MagicMock() + mgr.process = lambda cs: [c.finalize(None) for c in cs] + + progress_val = 0.75 + c._last_promise().then( + on_complete=ProgressReference(message='hello world', + mgr=mgr, + completion=lambda: Completion( + on_complete=lambda _: progress_val)) + ) + mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, ['orchestrator']) + + c.finalize() + mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, ['orchestrator']) + + c.progress_reference.update() + mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, ['orchestrator']) + assert not c.progress_reference.effective + + 
progress_val = 1 + c.progress_reference.update() + assert c.progress_reference.effective + mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id) + + +def test_with_progress(): + mgr = MagicMock() + mgr.process = lambda cs: [c.finalize(None) for c in cs] + + def execute(y): + return str(y) + + def run(x): + def two(_): + return execute(x * 2) + + return Completion.with_progress( + message='message', + on_complete=two, + mgr=mgr + + ) + c = Completion(on_complete=lambda x: x * 10).then(run)._first_promise + c.finalize(2) + assert c.result == '40' + c.progress_reference.update() + assert c.progress_reference.effective + + +def test_exception(): + + def run(x): + raise KeyError(x) + + c = Completion(value=3).then(run)._first_promise + c.finalize() + + assert isinstance(c.exception, KeyError) + + +def test_fail(): + c = Completion().then(lambda _: 3) + c._first_promise.fail(KeyError()) + assert isinstance(c.exception, KeyError) -- 2.39.5