]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/orch: replace Completion with OrchResult(Generic[T])
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 8 Feb 2021 00:01:08 +0000 (01:01 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Mon, 1 Mar 2021 12:38:10 +0000 (13:38 +0100)
Greatly simplify the orchestrator interface

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/orchestrator/__init__.py
src/pybind/mgr/orchestrator/_interface.py

index 374d68d32562b2096ba723e87fcbb387f599c31b..3842a84a3c384a7e33a6f0cc0f4ed4155318d376 100644 (file)
@@ -4,7 +4,7 @@ from .module import OrchestratorCli
 
 # usage: E.g. `from orchestrator import StatelessServiceSpec`
 from ._interface import \
-    Completion, TrivialReadCompletion, raise_if_exception, ProgressReference, pretty_print, _Promise, \
+    OrchResult, raise_if_exception, \
     CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \
     Orchestrator, OrchestratorClientMixin, \
     OrchestratorValidationError, OrchestratorError, NoOrchestrator, \
index 798f70f83c35ae7907adc0c331528f1592102e78..353b8e77651189591d71f269bb2a6310a3e69535 100644 (file)
@@ -12,12 +12,10 @@ import errno
 import logging
 import pickle
 import re
-import time
-import uuid
 
 from collections import namedtuple, OrderedDict
 from contextlib import contextmanager
-from functools import wraps
+from functools import wraps, reduce
 
 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
     Sequence, Dict, cast
@@ -115,6 +113,22 @@ def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
     return cast(FuncT, wrapper_copy)
 
 
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+    """
+    Decorator to make Orchestrator methods return
+    an OrchResult.
+    """
+
+    @wraps(f)
+    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+        try:
+            return OrchResult(f(*args, **kwargs))
+        except Exception as e:
+            return OrchResult(None, exception=e)
+
+    return cast(Callable[..., OrchResult[T]], wrapper)
+
+
 class InnerCliCommandCallable(Protocol):
     def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
         ...
@@ -156,57 +170,30 @@ class CLICommandMeta(type):
         cls.handle_command = handle_command
 
 
-def _no_result() -> None:
-    return object()  # type: ignore
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
+    """
+    Stores a result and an exception. Mainly to circumvent the
+    MgrModule.remote() method that hides all exceptions and for
+    handling different sub-interpreters.
     """
-    A completion may need multiple promises to be fulfilled. `_Promise` is one
-    step.
 
-    Typically ``Orchestrator`` implementations inherit from this class to
-    build their own way of finishing a step to fulfil a future.
+    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+        self.result = result
+        self.serialized_exception: Optional[bytes] = None
+        self.exception_str: str = ''
+        self.set_exception(exception)
 
-    They are not exposed in the orchestrator interface and can be seen as a
-    helper to build orchestrator modules.
-    """
-    INITIALIZED = 1  # We have a parent completion and a next completion
-    RUNNING = 2
-    FINISHED = 3  # we have a final result
+    __slots__ = 'result', 'serialized_exception', 'exception_str'
 
-    NO_RESULT: None = _no_result()  # type: ignore
-    ASYNC_RESULT = object()
+    def set_exception(self, e: Optional[Exception]) -> None:
+        if e is None:
+            self.serialized_exception = None
+            self.exception_str = ''
+            return
 
-    def __init__(self,
-                 _first_promise: Optional["_Promise"] = None,
-                 value: Optional[Any] = NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ):
-        self._on_complete_ = on_complete
-        self._name = name
-        self._next_promise: Optional[_Promise] = None
-
-        self._state = self.INITIALIZED
-        self._exception: Optional[Exception] = None
-
-        # Value of this _Promise. may be an intermediate result.
-        self._value = value
-
-        # _Promise is not a continuation monad, as `_result` is of type
-        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
-        self._first_promise: '_Promise' = _first_promise or self
-
-    @property
-    def _exception(self) -> Optional[Exception]:
-        return getattr(self, '_exception_', None)
-
-    @_exception.setter
-    def _exception(self, e: Exception) -> None:
-        self._exception_ = e
+        self.exception_str = f'{type(e)}: {str(e)}'
         try:
-            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+            self.serialized_exception = pickle.dumps(e)
         except pickle.PicklingError:
             logger.error(f"failed to pickle {e}")
             if isinstance(e, Exception):
@@ -214,365 +201,7 @@ class _Promise(object):
             else:
                 e = Exception(str(e))
             # degenerate to a plain Exception
-            self._serialized_exception_ = pickle.dumps(e)
-
-    @property
-    def _serialized_exception(self) -> Optional[bytes]:
-        return getattr(self, '_serialized_exception_', None)
-
-    @property
-    def _on_complete(self) -> Optional[Callable]:
-        # https://github.com/python/mypy/issues/4125
-        return self._on_complete_
-
-    @_on_complete.setter
-    def _on_complete(self, val: Optional[Callable]) -> None:
-        self._on_complete_ = val
-
-    def __repr__(self) -> str:
-        name = self._name or getattr(self._on_complete, '__name__',
-                                     '??') if self._on_complete else 'None'
-        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
-        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
-            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
-                next, '_progress_reference', 'NA'), repr(self._next_promise)
-        )
-
-    def pretty_print_1(self) -> str:
-        if self._name:
-            name = self._name
-        elif self._on_complete is None:
-            name = 'lambda x: x'
-        elif hasattr(self._on_complete, '__name__'):
-            name = getattr(self._on_complete, '__name__')
-        else:
-            name = self._on_complete.__class__.__name__
-        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
-        prefix = {
-            self.INITIALIZED: '      ',
-            self.RUNNING:     '   >>>',  # noqa: E241
-            self.FINISHED:    '(done)',  # noqa: E241
-        }[self._state]
-        return '{} {}({}),'.format(prefix, name, val)
-
-    def then(self: Any, on_complete: Callable) -> Any:
-        """
-        Call ``on_complete`` as soon as this promise is finalized.
-        """
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        if self._next_promise is not None:
-            return self._next_promise.then(on_complete)
-
-        if self._on_complete is not None:
-            self._set_next_promise(self.__class__(
-                _first_promise=self._first_promise,
-                on_complete=on_complete
-            ))
-            return self._next_promise
-
-        else:
-            self._on_complete = on_complete
-            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
-            return self._next_promise
-
-    def _set_next_promise(self, next: '_Promise') -> None:
-        assert self is not next
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        self._next_promise = next
-        assert self._next_promise is not None
-        for p in iter(self._next_promise):
-            p._first_promise = self._first_promise
-
-    def _finalize(self, value: Optional[T] = NO_RESULT) -> None:
-        """
-        Sets this promise to complete.
-
-        Orchestrators may choose to use this helper function.
-
-        :param value: new value.
-        """
-        if self._state not in (self.INITIALIZED, self.RUNNING):
-            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
-        self._state = self.RUNNING
-
-        if value is not self.NO_RESULT:
-            self._value = value
-        assert self._value is not self.NO_RESULT, repr(self)
-
-        if self._on_complete:
-            try:
-                next_result = self._on_complete(self._value)
-            except Exception as e:
-                self.fail(e)
-                return
-        else:
-            next_result = self._value
-
-        if isinstance(next_result, _Promise):
-            # hack: _Promise is not a continuation monad.
-            next_result = next_result._first_promise  # type: ignore
-            assert next_result not in self, repr(self._first_promise) + repr(next_result)
-            assert self not in next_result
-            next_result._append_promise(self._next_promise)
-            self._set_next_promise(next_result)
-            assert self._next_promise
-            if self._next_promise._value is self.NO_RESULT:
-                self._next_promise._value = self._value
-            self.propagate_to_next()
-        elif next_result is not self.ASYNC_RESULT:
-            # simple map. simply forward
-            if self._next_promise:
-                self._next_promise._value = next_result
-            else:
-                # Hack: next_result is of type U, _value is of type T
-                self._value = next_result  # type: ignore
-            self.propagate_to_next()
-        else:
-            # asynchronous promise
-            pass
-
-    def propagate_to_next(self) -> None:
-        self._state = self.FINISHED
-        logger.debug('finalized {}'.format(repr(self)))
-        if self._next_promise:
-            self._next_promise._finalize()
-
-    def fail(self, e: Exception) -> None:
-        """
-        Sets the whole completion to be faild with this exception and end the
-        evaluation.
-        """
-        if self._state == self.FINISHED:
-            raise ValueError(
-                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-        logger.exception('_Promise failed')
-        self._exception = e
-        self._value = f'_exception: {e}'
-        if self._next_promise:
-            self._next_promise.fail(e)
-        self._state = self.FINISHED
-
-    def __contains__(self, item: '_Promise') -> bool:
-        return any(item is p for p in iter(self._first_promise))
-
-    def __iter__(self) -> Iterator['_Promise']:
-        yield self
-        elem = self._next_promise
-        while elem is not None:
-            yield elem
-            elem = elem._next_promise
-
-    def _append_promise(self, other: Optional['_Promise']) -> None:
-        if other is not None:
-            assert self not in other
-            assert other not in self
-            self._last_promise()._set_next_promise(other)
-
-    def _last_promise(self) -> '_Promise':
-        return list(iter(self))[-1]
-
-
-class ProgressReference(object):
-    def __init__(self,
-                 message: str,
-                 mgr: Any,
-                 completion: Optional[Callable[[], 'Completion']] = None
-                 ):
-        """
-        ProgressReference can be used within Completions::
-
-            +---------------+      +---------------------------------+
-            |               | then |                                 |
-            | My Completion | +--> | on_complete=ProgressReference() |
-            |               |      |                                 |
-            +---------------+      +---------------------------------+
-
-        See :func:`Completion.with_progress` for an easy way to create
-        a progress reference
-
-        """
-        super(ProgressReference, self).__init__()
-        self.progress_id = str(uuid.uuid4())
-        self.message = message
-        self.mgr = mgr
-
-        #: The completion can already have a result, before the write
-        #: operation is effective. progress == 1 means, the services are
-        #: created / removed.
-        self.completion: Optional[Callable[[], Completion]] = completion
-
-        #: if a orchestrator module can provide a more detailed
-        #: progress information, it needs to also call ``progress.update()``.
-        self.progress = 0.0
-
-        self._completion_has_result = False
-        self.mgr.all_progress_references.append(self)
-
-    def __str__(self) -> str:
-        """
-        ``__str__()`` is used for determining the message for progress events.
-        """
-        return self.message or super(ProgressReference, self).__str__()
-
-    def __call__(self, arg: T) -> T:
-        self._completion_has_result = True
-        self.progress = 1.0
-        return arg
-
-    @property
-    def progress(self) -> float:
-        return self._progress
-
-    @progress.setter
-    def progress(self, progress: float) -> None:
-        assert progress <= 1.0
-        self._progress = progress
-        try:
-            if self.effective:
-                self.mgr.remote("progress", "complete", self.progress_id)
-                self.mgr.all_progress_references = [
-                    p for p in self.mgr.all_progress_references if p is not self]
-            else:
-                self.mgr.remote("progress", "update", self.progress_id, self.message,
-                                progress,
-                                [("origin", "orchestrator")])
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
-
-    @property
-    def effective(self) -> bool:
-        return self.progress == 1 and self._completion_has_result
-
-    def update(self) -> None:
-        def progress_run(progress: float) -> None:
-            self.progress = progress
-        if self.completion:
-            c = self.completion().then(progress_run)
-            self.mgr.process([c._first_promise])
-        else:
-            self.progress = 1
-
-    def fail(self) -> None:
-        self._completion_has_result = True
-        self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
-    """
-    Combines multiple promises into one overall operation.
-
-    Completions are composable by being able to
-    call one completion from another completion. I.e. making them re-usable
-    using Promises E.g.::
-
-        >>> #doctest: +SKIP
-        ... return Orchestrator().get_hosts().then(self._create_osd)
-
-    where ``get_hosts`` returns a Completion of list of hosts and
-    ``_create_osd`` takes a list of hosts.
-
-    The concept behind this is to store the computation steps
-    explicit and then explicitly evaluate the chain:
-
-        >>> #doctest: +SKIP
-        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
-        ... p.finalize(2)
-        ... assert p.result = "4"
-
-    or graphically::
-
-        +---------------+      +-----------------+
-        |               | then |                 |
-        | lambda x: x*x | +--> | lambda x: str(x)|
-        |               |      |                 |
-        +---------------+      +-----------------+
-
-    """
-
-    def __init__(self,
-                 _first_promise: Optional["Completion"] = None,
-                 value: Any = _Promise.NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ) -> None:
-        super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self) -> Optional[ProgressReference]:
-        if hasattr(self._on_complete, 'progress_id'):
-            return self._on_complete  # type: ignore
-        return None
-
-    @property
-    def progress_reference(self) -> Optional[ProgressReference]:
-        """
-        ProgressReference. Marks this completion
-        as a write completeion.
-        """
-
-        references = [c._progress_reference for c in iter(  # type: ignore
-            self) if c._progress_reference is not None]  # type: ignore
-        if references:
-            assert len(references) == 1
-            return references[0]
-        return None
-
-    @classmethod
-    def with_progress(cls: Any,
-                      message: str,
-                      mgr: Any,
-                      _first_promise: Optional["Completion"] = None,
-                      value: Any = _Promise.NO_RESULT,
-                      on_complete: Optional[Callable] = None,
-                      calc_percent: Optional[Callable[[], Any]] = None
-                      ) -> Any:
-
-        c = cls(
-            _first_promise=_first_promise,
-            value=value,
-            on_complete=on_complete
-        ).add_progress(message, mgr, calc_percent)
-
-        return c._first_promise
-
-    def add_progress(self,
-                     message: str,
-                     mgr: Any,
-                     calc_percent: Optional[Callable[[], Any]] = None
-                     ) -> Any:
-        return self.then(
-            on_complete=ProgressReference(
-                message=message,
-                mgr=mgr,
-                completion=calc_percent
-            )
-        )
-
-    def fail(self, e: Exception) -> None:
-        super(Completion, self).fail(e)
-        if self._progress_reference:
-            self._progress_reference.fail()
-
-    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT) -> None:
-        if self._first_promise._state == self.INITIALIZED:
-            self._first_promise._finalize(result)
-
-    @property
-    def result(self) -> T:
-        """
-        The result of the operation that we were waited
-        for.  Only valid after calling Orchestrator.process() on this
-        completion.
-        """
-        last = self._last_promise()
-        assert last._state == _Promise.FINISHED
-        return cast(T, last._value)
+            self.serialized_exception = pickle.dumps(e)
 
     def result_str(self) -> str:
         """Force a string."""
@@ -582,88 +211,19 @@ class Completion(_Promise, Generic[T]):
             return '\n'.join(str(x) for x in self.result)
         return str(self.result)
 
-    @property
-    def exception(self) -> Optional[Exception]:
-        return self._last_promise()._exception
-
-    @property
-    def serialized_exception(self) -> Optional[bytes]:
-        return self._last_promise()._serialized_exception
-
-    @property
-    def has_result(self) -> bool:
-        """
-        Has the operation already a result?
-
-        For Write operations, it can already have a
-        result, if the orchestrator's configuration is
-        persistently written. Typically this would
-        indicate that an update had been written to
-        a manifest, but that the update had not
-        necessarily been pushed out to the cluster.
-
-        :return:
-        """
-        return self._last_promise()._state == _Promise.FINISHED
-
-    @property
-    def is_errored(self) -> bool:
-        """
-        Has the completion failed. Default implementation looks for
-        self.exception. Can be overwritten.
-        """
-        return self.exception is not None
-
-    @property
-    def needs_result(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return not self.is_errored and not self.has_result
 
-    @property
-    def is_finished(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return self.is_errored or (self.has_result)
-
-    def pretty_print(self) -> str:
-
-        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
-        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions: Sequence[Completion]) -> str:
-    return ', '.join(c.pretty_print() for c in completions)
-
-
-def raise_if_exception(c: Completion) -> None:
+def raise_if_exception(c: OrchResult[T]) -> T:
     """
-    :raises OrchestratorError: Some user error or a config error.
-    :raises Exception: Some internal error
+    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
     """
     if c.serialized_exception is not None:
         try:
             e = pickle.loads(c.serialized_exception)
         except (KeyError, AttributeError):
-            raise Exception('{}: {}'.format(type(c.exception), c.exception))
+            raise Exception(c.exception_str)
         raise e
-
-
-class TrivialReadCompletion(Completion[T]):
-    """
-    This is the trivial completion simply wrapping a result.
-    """
-
-    def __init__(self, result: T):
-        super(TrivialReadCompletion, self).__init__()
-        if result:
-            self.finalize(result)
+    assert c.result is not None, 'OrchResult should either have an exception or a result'
+    return c.result
 
 
 def _hide_in_features(f: FuncT) -> FuncT:
@@ -732,21 +292,6 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    @_hide_in_features
-    def process(self, completions: List[Completion]) -> None:
-        """
-        Given a list of Completion instances, process any which are
-        incomplete.
-
-        Callers should inspect the detail of each completion to identify
-        partial completion/progress information, and present that information
-        to the user.
-
-        This method should not block, as this would make it slow to query
-        a status, while other long running operations are in progress.
-        """
-        raise NotImplementedError()
-
     @_hide_in_features
     def get_feature_set(self) -> Dict[str, dict]:
         """Describes which methods this orchestrator implements
@@ -790,7 +335,7 @@ class Orchestrator(object):
     def resume(self) -> None:
         raise NotImplementedError()
 
-    def add_host(self, host_spec: HostSpec) -> Completion[str]:
+    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
         """
         Add a host to the orchestrator inventory.
 
@@ -798,7 +343,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_host(self, host: str) -> Completion[str]:
+    def remove_host(self, host: str) -> OrchResult[str]:
         """
         Remove a host from the orchestrator inventory.
 
@@ -806,7 +351,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
+    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
         """
         Update a host's address
 
@@ -815,7 +360,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def get_hosts(self) -> Completion[List[HostSpec]]:
+    def get_hosts(self) -> OrchResult[List[HostSpec]]:
         """
         Report the hosts in the cluster.
 
@@ -823,19 +368,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def add_host_label(self, host: str, label: str) -> Completion[str]:
+    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host: str, label: str) -> Completion[str]:
+    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Remove a host label
         """
         raise NotImplementedError()
 
-    def host_ok_to_stop(self, hostname: str) -> Completion:
+    def host_ok_to_stop(self, hostname: str) -> OrchResult:
         """
         Check if the specified host can be safely stopped without reducing availability
 
@@ -843,19 +388,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def enter_host_maintenance(self, hostname: str, force: bool = False) -> Completion:
+    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
         """
         Place a host in maintenance, stopping daemons and disabling it's systemd target
         """
         raise NotImplementedError()
 
-    def exit_host_maintenance(self, hostname: str) -> Completion:
+    def exit_host_maintenance(self, hostname: str) -> OrchResult:
         """
         Return a host from maintenance, restarting the clusters systemd target
         """
         raise NotImplementedError()
 
-    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
+    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -863,7 +408,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -877,7 +422,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
+    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
@@ -886,11 +431,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply(self, specs: Sequence["GenericSpec"]) -> Completion[List[str]]:
+    @handle_orch_error
+    def apply(self, specs: Sequence["GenericSpec"]) -> List[str]:
         """
         Applies any spec
         """
-        fns: Dict[str, Callable] = {
+        fns: Dict[str, Callable[..., OrchResult[str]]] = {
             'alertmanager': self.apply_alertmanager,
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
@@ -900,7 +446,7 @@ class Orchestrator(object):
             'mon': self.apply_mon,
             'nfs': self.apply_nfs,
             'node-exporter': self.apply_node_exporter,
-            'osd': lambda dg: self.apply_drivegroups([dg]),
+            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
             'prometheus': self.apply_prometheus,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
@@ -909,29 +455,20 @@ class Orchestrator(object):
             'cephadm-exporter': self.apply_cephadm_exporter,
         }
 
-        def merge(ls: Union[List[T], T], r: Union[List[T], T]) -> List[T]:
-            if isinstance(ls, list):
-                return ls + [r]  # type: ignore
-            return [ls, r]  # type: ignore
-
-        spec, *specs = specs
-
-        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-        completion = fn(spec)
-        for s in specs:
-            def next(ls: list) -> Any:
-                fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-                return fn(s).then(lambda r: merge(ls, r))
-            completion = completion.then(next)
-        return completion
+        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
+            l_res = raise_if_exception(l)
+            r_res = raise_if_exception(r)
+            l_res.append(r_res)
+            return OrchResult(l_res)
+        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
 
-    def plan(self, spec: Sequence["GenericSpec"]) -> Completion[List]:
+    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
         """
         Plan (Dry-run, Preview) a List of Specs.
         """
         raise NotImplementedError()
 
-    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
+    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
         """
         Remove specific daemon(s).
 
@@ -939,7 +476,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name: str) -> Completion[str]:
+    def remove_service(self, service_name: str) -> OrchResult[str]:
         """
         Remove a service (a collection of daemons).
 
@@ -947,7 +484,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
+    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
@@ -955,24 +492,24 @@ class Orchestrator(object):
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param daemon_name: name of daemon
         :param image: Container image when redeploying that daemon
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
+    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -983,7 +520,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
         """ Update OSD cluster """
         raise NotImplementedError()
 
@@ -997,13 +534,13 @@ class Orchestrator(object):
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ) -> Completion[str]:
+                         ) -> OrchResult[str]:
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> Completion[str]:
+                    force: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
@@ -1013,19 +550,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
         """
         TODO
         """
         raise NotImplementedError()
 
-    def remove_osds_status(self) -> Completion:
+    def remove_osds_status(self) -> OrchResult:
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
-    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
+    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
@@ -1035,134 +572,134 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def zap_device(self, host: str, path: str) -> Completion[str]:
+    def zap_device(self, host: str, path: str) -> OrchResult[str]:
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
-    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_mon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create mon daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mon cluster"""
         raise NotImplementedError()
 
-    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_mgr(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create mgr daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_mds(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create MDS daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
-    def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
+    def add_rgw(self, spec: RGWSpec) -> OrchResult[List[str]]:
         """Create RGW daemon(s)"""
         raise NotImplementedError()
 
-    def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
+    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
         """Update RGW cluster"""
         raise NotImplementedError()
 
-    def apply_ha_rgw(self, spec: HA_RGWSpec) -> Completion[str]:
+    def apply_ha_rgw(self, spec: HA_RGWSpec) -> OrchResult[str]:
         """Update ha-rgw daemons"""
         raise NotImplementedError()
 
-    def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create rbd-mirror daemon(s)"""
         raise NotImplementedError()
 
-    def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
-    def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
+    def add_nfs(self, spec: NFSServiceSpec) -> OrchResult[List[str]]:
         """Create NFS daemon(s)"""
         raise NotImplementedError()
 
-    def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
+    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
         """Update NFS cluster"""
         raise NotImplementedError()
 
-    def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
+    def add_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[List[str]]:
         """Create iscsi daemon(s)"""
         raise NotImplementedError()
 
-    def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
+    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
         """Update iscsi cluster"""
         raise NotImplementedError()
 
-    def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_prometheus(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create new prometheus daemon"""
         raise NotImplementedError()
 
-    def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
 
-    def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_node_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create a new Node-Exporter service"""
         raise NotImplementedError()
 
-    def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
-    def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_crash(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create a new crash service"""
         raise NotImplementedError()
 
-    def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
-    def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_grafana(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create a new grafana service"""
         raise NotImplementedError()
 
-    def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a grafana service"""
         raise NotImplementedError()
 
-    def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_alertmanager(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create a new AlertManager service"""
         raise NotImplementedError()
 
-    def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
-    def add_cephadm_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
+    def add_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]:
         """Create a new cephadm exporter daemon"""
         raise NotImplementedError()
 
-    def apply_cephadm_exporter(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update an existing cephadm exporter daemon"""
         raise NotImplementedError()
 
-    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_pause(self) -> Completion[str]:
+    def upgrade_pause(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_resume(self) -> Completion[str]:
+    def upgrade_resume(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_stop(self) -> Completion[str]:
+    def upgrade_stop(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
+    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -1172,7 +709,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def upgrade_available(self) -> Completion:
+    def upgrade_available(self) -> OrchResult:
         """
         Report on what versions are available to upgrade to
 
@@ -1778,7 +1315,6 @@ class OrchestratorClientMixin(Orchestrator):
     >>> class MyModule(OrchestratorClientMixin):
     ...    def func(self):
     ...        completion = self.add_host('somehost')  # calls `_oremote()`
-    ...        self._orchestrator_wait([completion])
     ...        self.log.debug(completion.result)
 
     .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
@@ -1835,24 +1371,3 @@ class OrchestratorClientMixin(Orchestrator):
             if meth not in f_set or not f_set[meth]['available']:
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
-
-    def _orchestrator_wait(self, completions: List[Completion]) -> None:
-        """
-        Wait for completions to complete (reads) or
-        become persistent (writes).
-
-        Waits for writes to be *persistent* but not *effective*.
-
-        :param completions: List of Completions
-        :raises NoOrchestrator:
-        :raises RuntimeError: something went wrong while calling the process method.
-        :raises ImportError: no `orchestrator` module or backend not found.
-        """
-        while any(not c.has_result for c in completions):
-            self.process(completions)
-            self.__get_mgr().log.info("Operations pending: %s",
-                                      sum(1 for c in completions if not c.has_result))
-            if any(c.needs_result for c in completions):
-                time.sleep(1)
-            else:
-                break