]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/orchestrator: Introduce composable completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 26 Aug 2019 12:00:32 +0000 (14:00 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:35:24 +0000 (13:35 +0100)
Also:

* unify Read and Write completions
* Distinguish should_wait and is_finished
* Removed `all_hosts` parameter to `osd_create`

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/ansible/module.py
src/pybind/mgr/deepsea/module.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/orchestrator_cli/module.py
src/pybind/mgr/rook/module.py
src/pybind/mgr/ssh/module.py
src/pybind/mgr/test_orchestrator/module.py
src/pybind/mgr/tests/test_orchestrator.py

index 344d0be5f4a28267fee69f7cfe5fa01ee237efa0..0498625a596a94fdeac613e738972f690f31869b 100644 (file)
@@ -91,7 +91,7 @@ class AnsibleReadOperation(orchestrator.ReadCompletion):
         return "Playbook {playbook_name}".format(playbook_name=self.playbook)
 
     @property
-    def is_complete(self):
+    def has_result(self):
         return self._is_complete
 
     @property
@@ -306,7 +306,7 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion):
         raise NotImplementedError()
 
     @property
-    def is_persistent(self):
+    def has_result(self):
         """
         Has the operation updated the orchestrator's configuration
         persistently?  Typically this would indicate that an update
index b2e1b2e283b80abe4bfa6b6c06be0c3124519e4b..10aef36775f970d78d2febad7adf5c7b1bd4ed7c 100644 (file)
@@ -40,7 +40,7 @@ class DeepSeaReadCompletion(orchestrator.ReadCompletion):
         return self._result
 
     @property
-    def is_complete(self):
+    def has_result(self):
         return self._complete
 
 
index d2df4bc78bd74b061819b6836533d9508a8fb4d3..1ec3959c11cb0551cca0b70f969aa3092c745c82 100644 (file)
@@ -4,6 +4,8 @@ ceph-mgr orchestrator interface
 
 Please see the ceph-mgr module developer's guide for more information.
 """
+import functools
+import logging
 import sys
 import time
 from collections import namedtuple
@@ -24,12 +26,19 @@ from mgr_util import format_bytes
 
 try:
     from ceph.deployment.drive_group import DriveGroupSpec
-    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator
+    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, Type
 
-    T = TypeVar('T')
-    G = Generic[T]
 except ImportError:
-    T, G = object, object
+    pass
+    #T, G = object, object
+
+T = TypeVar('T')
+U = TypeVar('U')
+V = TypeVar('V')
+G = Generic[T]
+Promises = TypeVar('Promises', bound='_Promise')
+Completions = TypeVar('Completions', bound='Completion')
+
 
 
 def parse_host_specs(host, require_network=True):
@@ -136,27 +145,334 @@ def handle_exception(prefix, cmd_args, desc, perm, func):
 
     return CLICommand(prefix, cmd_args, desc, perm)(wrapper)
 
+
 def _cli_command(perm):
     def inner_cli_command(prefix, cmd_args="", desc=""):
         return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
     return inner_cli_command
 
+
 _cli_read_command = _cli_command('r')
 _cli_write_command = _cli_command('rw')
 
 
-class _Completion(G):
+def _no_result():
+    return object()
+
+
+class _Promise(Generic[T]):
+    """
+    A completion may need multiple promises to be fulfilled. `_Promise` is one
+    step.
+
+    Typically ``Orchestrator`` implementations inherit from this class to
+    build their own way of finishing a step to fulfil a future.
+
+    They are not exposed in the orchestrator interface and can be seen as a
+    helper to build orchestrator modules.
+    """
+    INITIALIZED = 1  # We have a parent completion and a next completion
+    FINISHED = 2  # we have a final result
+
+    NO_RESULT = _no_result()  # type: None
+
+    def __init__(self,
+                 _first_promise=None,  # type: Optional["_Promise[V]"]
+                 value=NO_RESULT,  # type: Optional[T]
+                 on_complete=None    # type: Optional[Callable[[T], Union[U, _Promise[U]]]]
+                 ):
+        self._on_complete = on_complete
+        self._next_promise = None  # type: Optional[_Promise[U]]
+
+        self._state = self.INITIALIZED
+        self._exception = None  # type: Optional[Exception]
+
+        # Value of this _Promise. may be an intermediate result.
+        self._value = value
+
+        # _Promise is not a continuation monad, as `_result` is of type
+        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
+        self._first_promise = _first_promise or self  # type: 'Completion'
+
+    def __repr__(self):
+        name = getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
+        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
+        return '{}(_s={}, val={}, id={}, name={}, pr={}, _next={})'.format(
+            self.__class__, self._state, val, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
+        )
+
+    def then(self, on_complete):
+        # type: (Promises, Callable[[T], Union[U, _Promise[U]]]) -> Promises[U]
+        """
+        Call ``on_complete`` as soon as this promise is finalized.
+        """
+        assert self._state is self.INITIALIZED
+        if self._on_complete is not None:
+            assert self._next_promise is None
+            self._set_next_promise(self.__class__(
+                _first_promise=self._first_promise,
+                on_complete=on_complete
+            ))
+            return self._next_promise
+
+        else:
+            self._on_complete = on_complete
+            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
+            return self._next_promise
+
+    def _set_next_promise(self, next):
+        # type: (_Promise[U]) -> None
+        assert self is not next
+        assert self._state is self.INITIALIZED
+
+        self._next_promise = next
+        assert self._next_promise is not None
+        for p in iter(self._next_promise):
+            p._first_promise = self._first_promise
+
+    def finalize(self, value=NO_RESULT):
+        # type: (Optional[T]) -> None
+        """
+        Sets this promise to complete.
+
+        Orchestrators may choose to use this helper function.
+
+        :param value: new value.
+        """
+        assert self._state is self.INITIALIZED
+
+        if value is not self.NO_RESULT:
+            self._value = value
+        assert self._value is not self.NO_RESULT
+
+        if self._on_complete:
+            try:
+                next_result = self._on_complete(self._value)
+            except Exception as e:
+                self.fail(e)
+                return
+        else:
+            next_result = self._value
+
+        if isinstance(next_result, _Promise):
+            # hack: _Promise is not a continuation monad.
+            next_result = next_result._first_promise  # type: ignore
+            assert next_result not in self, repr(self._first_promise) + repr(next_result)
+            assert self not in next_result
+            next_result._append_promise(self._next_promise)
+            self._set_next_promise(next_result)
+            if self._next_promise._value is self.NO_RESULT:
+                self._next_promise._value = self._value
+        else:
+            # simple map. simply forward
+            if self._next_promise:
+                self._next_promise._value = next_result
+            else:
+                # Hack: next_result is of type U, _value is of type T
+                self._value = next_result  # type: ignore
+        self._state = self.FINISHED
+        logger.debug('finalized {}'.format(repr(self)))
+        self.propagate_to_next()
+
+    def propagate_to_next(self):
+        assert self._state is self.FINISHED
+        if self._next_promise:
+            self._next_promise.finalize()
+
+    def fail(self, e):
+        # type: (Exception) -> None
+        """
+        Sets the whole completion to be faild with this exception and end the
+        evaluation.
+        """
+        assert self._state is self.INITIALIZED
+        logger.exception('_Promise failed')
+        self._exception = e
+        self._value = 'exception'
+        if self._next_promise:
+            self._next_promise.fail(e)
+        self._state = self.FINISHED
+
+    def __contains__(self, item):
+        return any(item is p for p in iter(self._first_promise))
+
+    def __iter__(self):
+        yield self
+        elem = self._next_promise
+        while elem is not None:
+            yield elem
+            elem = elem._next_promise
+
+    def _append_promise(self, other):
+        if other is not None:
+            assert self not in other
+            assert other not in self
+            self._last_promise()._set_next_promise(other)
+
+    def _last_promise(self):
+        # type: () -> _Promise
+        return list(iter(self))[-1]
+
+
+class ProgressReference(object):
+    def __init__(self,
+                 message,  # type: str
+                 mgr,
+                 completion=None  # type: Optional[Callable[[], Completion[float]]]
+                ):
+        """
+        ProgressReference can be used within Completions:
+
+        +---------------+      +---------------------------------+
+        |               | then |                                 |
+        | My Completion | +--> | on_complete=ProgressReference() |
+        |               |      |                                 |
+        +---------------+      +---------------------------------+
+
+        """
+        super(ProgressReference, self).__init__()
+        self.progress_id = str(uuid.uuid4())
+        self.message = message
+        self.mgr = mgr
+
+        #: The completion can already have a result, before the write
+        #: operation is effective. progress == 1 means, the services are
+        #: created / removed.
+        self.completion = completion  # type: Optional[Callable[[], Completion[float]]]
+
+        #: if a orchestrator module can provide a more detailed
+        #: progress information, it needs to also call ``progress.update()``.
+        self.progress = 0.0
+
+        self._completion_has_result = False
+        self.mgr.all_progress_references.append(self)
+
+    def __str__(self):
+        """
+        ``__str__()`` is used for determining the message for progress events.
+        """
+        return self.message or super(ProgressReference, self).__str__()
+
+    def __call__(self, arg):
+        self._completion_has_result = True
+        if self.progress == 0.0:
+            self.progress = 0.5
+        return arg
+
+    @property
+    def progress(self):
+        return self._progress
+
+    @progress.setter
+    def progress(self, progress):
+        self._progress = progress
+        try:
+            if self.effective:
+                self.mgr.remote("progress", "complete", self.progress_id)
+                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
+            else:
+                self.mgr.remote("progress", "update", self.progress_id, self.message,
+                                progress,
+                                [("origin", "orchestrator")])
+        except ImportError:
+            # If the progress module is disabled that's fine,
+            # they just won't see the output.
+            pass
+
+    @property
+    def effective(self):
+        return self.progress == 1 and self._completion_has_result
+
+    def update(self):
+        def run(progress):
+            self.progress = progress
+        if self.completion:
+            c = self.completion().then(run)
+            self.mgr.process([c._first_promise])
+        else:
+            self.progress = 1
+
+    def fail(self):
+        self._completion_has_result = True
+        self.progress = 1
+
+class Completion(_Promise[T]):
+    """
+    Combines multiple promises into one overall operation.
+
+    :ivar exception: Holds an exception object, if the completion errored.
+
+    """
+    def __init__(self,
+                 _first_promise=None,  # type: Optional["Completion[V]"]
+                 value=_Promise.NO_RESULT,  # type: Optional[T]
+                 on_complete=None  # type: Optional[Callable[[T], Union[U, Completion[U]]]]
+                 ):
+        super(Completion, self).__init__(_first_promise, value, on_complete)
+
+    @property
+    def _progress_reference(self):
+        # type: () -> Optional[ProgressReference]
+        if hasattr(self._on_complete, 'progress_id'):
+            return self._on_complete
+        return None
+
+    @property
+    def progress_reference(self):
+        # type: () -> Optional[ProgressReference]
+        """
+        ProgressReference. Marks this completion
+        as a write completeion.
+        """
+
+        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
+        if references:
+            assert len(references) == 1
+            return references[0]
+        return None
+
+    @classmethod
+    def with_progress(cls,  # type: Completions[T]
+                      message,  # type: str
+                      mgr,
+                      _first_promise=None,  # type: Optional["Completions[V]"]
+                      value=_Promise.NO_RESULT,  # type: Optional[T]
+                      on_complete=None,  # type: Optional[Callable[[T], Union[U, Completions[U]]]]
+                      calc_percent=None  # type: Optional[Callable[[], Completions[float]]]
+                      ):
+        # type: (...) -> Completions[T]
+
+        c = cls(
+            _first_promise=_first_promise,
+            value=value,
+            on_complete=on_complete
+        ).then(
+            on_complete=ProgressReference(
+                message=message,
+                mgr=mgr,
+                completion=calc_percent
+            )
+        )
+        return c._first_promise
+
+    def fail(self, e):
+        super(Completion, self).fail(e)
+        if self._progress_reference:
+            self._progress_reference.fail()
+
     @property
     def result(self):
-        # type: () -> T
         """
-        Return the result of the operation that we were waited
-        for.  Only valid after calling Orchestrator.wait() on this
+        The result of the operation that we were waited
+        for.  Only valid after calling Orchestrator.process() on this
         completion.
         """
-        raise NotImplementedError()
+        last = self._last_promise()
+        assert last._state == _Promise.FINISHED
+        return last._value
 
     def result_str(self):
+        """Force a string."""
         if self.result is None:
             return ''
         return str(self.result)
@@ -164,27 +480,24 @@ class _Completion(G):
     @property
     def exception(self):
         # type: () -> Optional[Exception]
-        """
-        Holds an exception object.
-        """
-        try:
-            return self.__exception
-        except AttributeError:
-            return None
-
-    @exception.setter
-    def exception(self, value):
-        self.__exception = value
+        return self._last_promise()._exception
 
     @property
-    def is_read(self):
+    def has_result(self):
         # type: () -> bool
-        raise NotImplementedError()
+        """
+        Has the operation already a result?
 
-    @property
-    def is_complete(self):
-        # type: () -> bool
-        raise NotImplementedError()
+        For Write operations, it can already have a
+        result, if the orchestrator's configuration is
+        persistently written. Typically this would
+        indicate that an update had been written to
+        a manifest, but that the update had not
+        necessarily been pushed out to the cluster.
+
+        :return:
+        """
+        return self._last_promise()._state == _Promise.FINISHED
 
     @property
     def is_errored(self):
@@ -196,13 +509,28 @@ class _Completion(G):
         return self.exception is not None
 
     @property
-    def should_wait(self):
+    def needs_result(self):
         # type: () -> bool
-        raise NotImplementedError()
+        """
+        Could the external operation be deemed as complete,
+        or should we wait?
+        We must wait for a read operation only if it is not complete.
+        """
+        return not self.is_errored and not self.has_result
+
+    @property
+    def is_finished(self):
+        # type: () -> bool
+        """
+        Could the external operation be deemed as complete,
+        or should we wait?
+        We must wait for a read operation only if it is not complete.
+        """
+        return self.is_errored or (self.has_result)
 
 
 def raise_if_exception(c):
-    # type: (_Completion) -> None
+    # type: (Completion) -> None
     """
     :raises OrchestratorError: Some user error or a config error.
     :raises Exception: Some internal error
@@ -233,30 +561,7 @@ def raise_if_exception(c):
         raise e
 
 
-class ReadCompletion(_Completion):
-    """
-    ``Orchestrator`` implementations should inherit from this
-    class to implement their own handles to operations in progress, and
-    return an instance of their subclass from calls into methods.
-    """
-
-    def __init__(self):
-        pass
-
-    @property
-    def is_read(self):
-        return True
-
-    @property
-    def should_wait(self):
-        """Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return not self.is_complete
-
-
-class TrivialReadCompletion(ReadCompletion):
+class TrivialReadCompletion(Completion[T]):
     """
     This is the trivial completion simply wrapping a result.
     """
@@ -264,72 +569,6 @@ class TrivialReadCompletion(ReadCompletion):
         super(TrivialReadCompletion, self).__init__()
         self._result = result
 
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_complete(self):
-        return True
-
-
-class WriteCompletion(_Completion):
-    """
-    ``Orchestrator`` implementations should inherit from this
-    class to implement their own handles to operations in progress, and
-    return an instance of their subclass from calls into methods.
-    """
-
-    def __init__(self):
-        self.progress_id = str(uuid.uuid4())
-
-        #: if a orchestrator module can provide a more detailed
-        #: progress information, it needs to also call ``progress.update()``.
-        self.progress = 0.5
-
-    def __str__(self):
-        """
-        ``__str__()`` is used for determining the message for progress events.
-        """
-        return super(WriteCompletion, self).__str__()
-
-    @property
-    def is_persistent(self):
-        # type: () -> bool
-        """
-        Has the operation updated the orchestrator's configuration
-        persistently?  Typically this would indicate that an update
-        had been written to a manifest, but that the update
-        had not necessarily been pushed out to the cluster.
-        """
-        raise NotImplementedError()
-
-    @property
-    def is_effective(self):
-        """
-        Has the operation taken effect on the cluster?  For example,
-        if we were adding a service, has it come up and appeared
-        in Ceph's cluster maps?
-        """
-        raise NotImplementedError()
-
-    @property
-    def is_complete(self):
-        return self.is_errored or (self.is_persistent and self.is_effective)
-
-    @property
-    def is_read(self):
-        return False
-
-    @property
-    def should_wait(self):
-        """Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a write operation only if we know
-        it is not persistent yet.
-        """
-        return not self.is_persistent
-
 
 def _hide_in_features(f):
     f._hide_in_features = True
@@ -397,7 +636,7 @@ class Orchestrator(object):
 
     @_hide_in_features
     def process(self, completions):
-        # type: (List[_Completion]) -> None
+        # type: (List[Completion]) -> None
         """
         Given a list of Completion instances, process any which are
         incomplete.
@@ -441,7 +680,7 @@ class Orchestrator(object):
         return features
 
     def add_host(self, host):
-        # type: (str) -> WriteCompletion
+        # type: (str) -> Completion
         """
         Add a host to the orchestrator inventory.
 
@@ -450,7 +689,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def remove_host(self, host):
-        # type: (str) -> WriteCompletion
+        # type: (str) -> Completion
         """
         Remove a host from the orchestrator inventory.
 
@@ -459,7 +698,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def get_hosts(self):
-        # type: () -> ReadCompletion[List[InventoryNode]]
+        # type: () -> Completion[List[InventoryNode]]
         """
         Report the hosts in the cluster.
 
@@ -484,7 +723,7 @@ class Orchestrator(object):
         return NotImplementedError()
 
     def get_inventory(self, node_filter=None, refresh=False):
-        # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]]
+        # type: (InventoryFilter, bool) -> Completion[List[InventoryNode]]
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -493,7 +732,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
-        # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]]
+        # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]]
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -508,7 +747,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def service_action(self, action, service_type, service_name=None, service_id=None):
-        # type: (str, str, str, str) -> WriteCompletion
+        # type: (str, str, str, str) -> Completion
         """
         Perform an action (start/stop/reload) on a service.
 
@@ -523,15 +762,15 @@ class Orchestrator(object):
         :param service_type: e.g. "mds", "rgw", ...
         :param service_name: name of logical service ("cephfs", "us-east", ...)
         :param service_id: service daemon instance (usually a short hostname)
-        :rtype: WriteCompletion
+        :rtype: Completion
         """
-        assert action in ["start", "stop", "reload", "restart", "redeploy"]
-        assert service_name or service_id
-        assert not (service_name and service_id)
+        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+        #assert service_name or service_id
+        #assert not (service_name and service_id)
         raise NotImplementedError()
 
-    def create_osds(self, drive_group, all_hosts):
-        # type: (DriveGroupSpec, List[str]) -> WriteCompletion
+    def create_osds(self, drive_group):
+        # type: (DriveGroupSpec) -> Completion
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -548,8 +787,8 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_osds(self, osd_ids, destroy=False):
-        # type: (List[str], bool) -> WriteCompletion
+    def remove_osds(self, osd_ids):
+        # type: (List[str]) -> Completion
         """
         :param osd_ids: list of OSD IDs
         :param destroy: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
@@ -571,7 +810,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def update_mgrs(self, num, hosts):
-        # type: (int, List[str]) -> WriteCompletion
+        # type: (int, List[str]) -> Completion
         """
         Update the number of cluster managers.
 
@@ -581,7 +820,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def update_mons(self, num, hosts):
-        # type: (int, List[Tuple[str,str]]) -> WriteCompletion
+        # type: (int, List[Tuple[str,str]]) -> Completion
         """
         Update the number of cluster monitors.
 
@@ -591,17 +830,17 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def add_mds(self, spec):
-        # type: (StatelessServiceSpec) -> WriteCompletion
+        # type: (StatelessServiceSpec) -> Completion
         """Create a new MDS cluster"""
         raise NotImplementedError()
 
     def remove_mds(self, name):
-        # type: (str) -> WriteCompletion
+        # type: (str) -> Completion
         """Remove an MDS cluster"""
         raise NotImplementedError()
 
     def update_mds(self, spec):
-        # type: (StatelessServiceSpec) -> WriteCompletion
+        # type: (StatelessServiceSpec) -> Completion
         """
         Update / redeploy existing MDS cluster
         Like for example changing the number of service instances.
@@ -627,17 +866,17 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def add_nfs(self, spec):
-        # type: (NFSServiceSpec) -> WriteCompletion
+        # type: (NFSServiceSpec) -> Completion
         """Create a new MDS cluster"""
         raise NotImplementedError()
 
     def remove_nfs(self, name):
-        # type: (str) -> WriteCompletion
+        # type: (str) -> Completion
         """Remove a NFS cluster"""
         raise NotImplementedError()
 
     def update_nfs(self, spec):
-        # type: (NFSServiceSpec) -> WriteCompletion
+        # type: (NFSServiceSpec) -> Completion
         """
         Update / redeploy existing NFS cluster
         Like for example changing the number of service instances.
@@ -645,17 +884,17 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def add_rgw(self, spec):
-        # type: (RGWSpec) -> WriteCompletion
+        # type: (RGWSpec) -> Completion
         """Create a new MDS zone"""
         raise NotImplementedError()
 
     def remove_rgw(self, zone):
-        # type: (str) -> WriteCompletion
+        # type: (str) -> Completion
         """Remove a RGW zone"""
         raise NotImplementedError()
 
     def update_rgw(self, spec):
-        # type: (StatelessServiceSpec) -> WriteCompletion
+        # type: (StatelessServiceSpec) -> Completion
         """
         Update / redeploy existing RGW zone
         Like for example changing the number of service instances.
@@ -664,12 +903,12 @@ class Orchestrator(object):
 
     @_hide_in_features
     def upgrade_start(self, upgrade_spec):
-        # type: (UpgradeSpec) -> WriteCompletion
+        # type: (UpgradeSpec) -> Completion
         raise NotImplementedError()
 
     @_hide_in_features
     def upgrade_status(self):
-        # type: () -> ReadCompletion[UpgradeStatusSpec]
+        # type: () -> Completion[UpgradeStatusSpec]
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -680,7 +919,7 @@ class Orchestrator(object):
 
     @_hide_in_features
     def upgrade_available(self):
-        # type: () -> ReadCompletion[List[str]]
+        # type: () -> Completion[List[str]]
         """
         Report on what versions are available to upgrade to
 
@@ -1017,6 +1256,11 @@ class InventoryNode(object):
     def __repr__(self):
         return "<InventoryNode>({name})".format(name=self.name)
 
+    @staticmethod
+    def get_host_names(nodes):
+        # type: (List[InventoryNode]) -> List[str]
+        return [node.name for node in nodes]
+
 
 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev'])):
     """
@@ -1037,7 +1281,6 @@ def _mk_orch_methods(cls):
     def shim(method_name):
         def inner(self, *args, **kwargs):
             completion = self._oremote(method_name, args, kwargs)
-            self._update_completion_progress(completion, 0)
             return completion
         return inner
 
@@ -1099,25 +1342,9 @@ class OrchestratorClientMixin(Orchestrator):
         mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
         return mgr.remote(o, meth, *args, **kwargs)
 
-    def _update_completion_progress(self, completion, force_progress=None):
-        # type: (WriteCompletion, Optional[float]) -> None
-        try:
-            progress = force_progress if force_progress is not None else completion.progress
-            if completion.is_complete:
-                self.remote("progress", "complete", completion.progress_id)
-            else:
-                self.remote("progress", "update", completion.progress_id, str(completion), progress,
-                            [("origin", "orchestrator")])
-        except AttributeError:
-            # No WriteCompletion. Ignore.
-            pass
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
 
     def _orchestrator_wait(self, completions):
-        # type: (List[_Completion]) -> None
+        # type: (List[Completion]) -> None
         """
         Wait for completions to complete (reads) or
         become persistent (writes).
@@ -1129,18 +1356,14 @@ class OrchestratorClientMixin(Orchestrator):
         :raises RuntimeError: something went wrong while calling the process method.
         :raises ImportError: no `orchestrator_cli` module or backend not found.
         """
-        for c in completions:
-            self._update_completion_progress(c)
-        while any(not c.is_complete for c in completions):
-            self.wait(completions)
+        while any(not c.has_result for c in completions):
+            self.process(completions)
             self.__get_mgr().log.info("Operations pending: %s",
-                                      sum(1 for c in completions if not c.is_complete))
-            if any(c.should_wait for c in completions):
+                                      sum(1 for c in completions if not c.has_result))
+            if any(c.needs_result for c in completions):
                 time.sleep(1)
             else:
                 break
-        for c in completions:
-            self._update_completion_progress(c)
 
 
 class OutdatableData(object):
index 49b62f78356aff424ce3d8cae7f14a330236b80b..3765b4201f79ed64c317941864802e4054f0966e 100644 (file)
@@ -339,19 +339,7 @@ Usage:
         else:
             return HandleCommandResult(-errno.EINVAL, stderr=usage)
 
-        # TODO: Remove this and make the orchestrator composable
-        #   Like a future or so.
-        host_completion = self.get_hosts()
-        self._orchestrator_wait([host_completion])
-        orchestrator.raise_if_exception(host_completion)
-        all_hosts = [h.name for h in host_completion.result]
-
-        try:
-            drive_group.validate(all_hosts)
-        except DriveGroupValidationError as e:
-            return HandleCommandResult(-errno.EINVAL, stderr=str(e))
-
-        completion = self.create_osds(drive_group, all_hosts)
+        completion = self.create_osds(drive_group)
         self._orchestrator_wait([completion])
         orchestrator.raise_if_exception(completion)
         self.log.warning(str(completion.result))
index d95f1a63f84cd462e68ecda2c093419a90f76a48..41c4c442d01ec1b39595a914831a5e14d24012b6 100644 (file)
@@ -6,7 +6,7 @@ import uuid
 from ceph.deployment import inventory
 
 try:
-    from typing import List, Dict
+    from typing import List, Dict, Optional, Callable
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
@@ -34,108 +34,34 @@ import orchestrator
 from .rook_cluster import RookCluster
 
 
-class RookCompletionMixin(object):
-    # hacky global
-    all_completions = []
-
-    def __init__(self, message):
-        super(RookCompletionMixin, self).__init__()
-        self.message = message
-
-        # Result of k8s API call, this is set if executed==True
-        self._result = None
-
-        all_completions.append(self)
-
-
-    @property
-    def result(self):
-        return self._result
-
-    def __str__(self):
-        return self.message
-
-
-class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion):
-    """
-    All reads are simply API calls: avoid spawning
-    huge numbers of threads by just running them
-    inline when someone calls wait()
-    """
-
-    def __init__(self, cb):
-        super(RookReadCompletion, self).__init__("<read op>")
-        self.cb = cb
-        self._complete = False
-
-    @property
-    def has_result(self):
-        return self._complete
-
-    def execute(self):
-        try:
-            self._result = self.cb()
-        except Exception as e:
-            self.exception = e
-        finally:
-            self._complete = True
-
-
-class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion):
+class RookWriteCompletion(orchestrator.Completion):
     """
     Writes are a two-phase thing, firstly sending
     the write to the k8s API (fast) and then waiting
     for the corresponding change to appear in the
     Ceph cluster (slow)
     """
-    # XXX kubernetes bindings call_api already usefully has
-    # a completion= param that uses threads.  Maybe just
-    # use that?
-    def __init__(self, execute_cb, complete_cb, message):
-        super(RookWriteCompletion, self).__init__(message)
-        self.execute_cb = execute_cb
-        self.complete_cb = complete_cb
-
-        # Executed means I executed my k8s API call, it may or may
-        # not have succeeded
-        self.executed = False
-
-        # Effective means, Rook finished applying the changes
-        self.effective = False
-
-        self.id = str(uuid.uuid4())
-
-    @property
-
-    @property
-    def is_effective(self):
-        return self.effective
-
-    def execute(self):
-        try:
-            if not self.executed:
-                self._result = self.execute_cb()
-                self.executed = True
-
-            if not self.effective:
-                # TODO: check self.result for API errors
-                if self.complete_cb is None:
-                    self.effective = True
-                else:
-                    self.effective = self.complete_cb()
-        except Exception as e:
-            self.exception = e
+    def __init__(self, message):
+        self.progress_reference = orchestrator.ProgressReference(
+            message=message
+        )
+        super(RookWriteCompletion, self).__init__()
 
 
 def deferred_read(f):
+    # type: (Callable) -> Callable[..., orchestrator.Completion]
     """
     Decorator to make RookOrchestrator methods return
     a completion object that executes themselves.
     """
 
     @functools.wraps(f)
-    def wrapper(*args, **kwargs):
-        return RookReadCompletion(lambda: f(*args, **kwargs))
+    def wrapper(self, *args, **kwargs):
+        c = orchestrator.Completion()
+        c.then(
+            lambda: f(*args, **kwargs)
+        )
+        return c
 
     return wrapper
 
@@ -165,27 +91,31 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     ]
 
     def process(self, completions):
-        if completions:
-            self.log.info("wait: completions={0}".format(completions))
+        pass
+
+    def process_promises(self, promises):
+        # type: (List[RookPromise]) -> None
+
+
+        if promises:
+            self.log.info("wait: promises={0}".format(promises))
 
         # Synchronously call the K8s API
-        for c in completions:
-            if not isinstance(c, RookReadCompletion) and \
-                    not isinstance(c, RookWriteCompletion):
+        for p in promises:
+            if not isinstance(p, RookPromise):
                 raise TypeError(
                     "wait() requires list of completions, not {0}".format(
-                        c.__class__
+                        p.__class__
                     ))
 
-            if c.is_complete:
+            if not p.needs_result:
                 continue
 
-            c.execute()
+            new_promise = p.execute()
             if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
                 self.log.exception("Completion {0} threw an exception:".format(
                     c.message
                 ))
-                c.exception = e
                 c._complete = True
 
 
@@ -221,6 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
+        self.all_promises = list()  # type: List[RookPromise]
+
     def shutdown(self):
         self._shutdown.set()
 
@@ -276,9 +208,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            self.wait(RookCompletionMixin.all_completions)
-            RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if
-                                                   not c.is_finished]
+            self.process(self.all_promises)
+            self.all_promises = [p for p in self.all_promises if not p.completion.is_finished]
 
             self._shutdown.wait(5)
 
@@ -418,30 +349,39 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
                 "Updating NFS server count in {0} to {1}".format(spec.name, num))
 
-    def create_osds(self, drive_group, all_hosts):
+    def create_osds(self, drive_group, _):
         # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion
 
-        assert len(drive_group.hosts(all_hosts)) == 1
         targets = []
         if drive_group.data_devices:
             targets += drive_group.data_devices.paths
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
-            raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                               "cluster".format(drive_group.hosts(all_hosts)))
+        p = orchestrator.ProgressReference(
+            "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern),
+                                             targets))
 
-        # Validate whether cluster CRD can accept individual OSD
-        # creations (i.e. not useAllDevices)
-        if not self.rook_cluster.can_create_osd():
-            raise RuntimeError("Rook cluster configuration does not "
-                               "support OSD creation.")
+        def execute(all_hosts):
+            p.effective_when(
+                lambda hosts: has_osds
+            )
 
-        def execute():
+            assert len(drive_group.hosts(all_hosts)) == 1
+
+            if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
+                raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                                   "cluster".format(drive_group.hosts(all_hosts)))
+
+            # Validate whether cluster CRD can accept individual OSD
+            # creations (i.e. not useAllDevices)
+            if not self.rook_cluster.can_create_osd():
+                raise RuntimeError("Rook cluster configuration does not "
+                                   "support OSD creation.")
             return self.rook_cluster.add_osds(drive_group, all_hosts)
 
-        def is_complete():
+        @deferred_read
+        def has_osds(all_hosts):
             # Find OSD pods on this host
             pod_osd_ids = set()
             pods = self._k8s.list_namespaced_pod(self._rook_env.namespace,
@@ -471,7 +411,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return found is not None
 
-        return RookWriteCompletion(execute, is_complete,
-                                   "Creating OSD on {0}:{1}".format(
-                                       drive_group.hosts(all_hosts)[0], targets
-                                   ))
+
+        c = self.get_hosts().then(execute)
+        c.progress_reference = p
+        return c
+
index 5a5bfb08bf51d654340a5949db032e31d35ae9d4..dbd33e7d6333b01006da59ea715a8334ebf2d586 100644 (file)
@@ -79,14 +79,14 @@ class SSHCompletionmMixin(object):
 
 class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
     @property
-    def is_complete(self):
+    def has_result(self):
         return all(map(lambda r: r.ready(), self._result))
 
 
 class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
 
     @property
-    def is_persistent(self):
+    def has_result(self):
         return all(map(lambda r: r.ready(), self._result))
 
     @property
@@ -113,7 +113,7 @@ class SSHWriteCompletionReady(SSHWriteCompletion):
         return self._result
 
     @property
-    def is_persistent(self):
+    def has_result(self):
         return True
 
     @property
index c9d8ed7d756fae0af7b98e63a7d88ec4852c1008..ce41ce89ab7fab0241820c9581b5ced7a4006c82 100644 (file)
@@ -19,7 +19,7 @@ import orchestrator
 
 
 class TestCompletionMixin(object):
-    all_completions = []  # Hacky global
+    all_completions = []  # type: orchestrator.Completion
 
     def __init__(self, cb, message, *args, **kwargs):
         super(TestCompletionMixin, self).__init__(*args, **kwargs)
@@ -37,7 +37,7 @@ class TestCompletionMixin(object):
         return self._result
 
     @property
-    def is_complete(self):
+    def has_result(self):
         return self._complete
 
     def execute(self):
@@ -64,7 +64,7 @@ class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
         super(TestWriteCompletion, self).__init__(cb, message)
 
     @property
-    def is_persistent(self):
+    def has_result(self):
         return (not self.is_errored) and self.executed
 
     @property
@@ -114,7 +114,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
                         c.__class__
                     ))
 
-            if not c.has_result:
+            if c.needs_result:
                 c.execute()
 
     @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
@@ -153,7 +153,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             self.wait(TestCompletionMixin.all_completions)
             TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
-                                                   not c.is_complete]
+                                                   c.is_finished]
 
             self._shutdown.wait(5)
 
index ad1493c5ddc2937a58eeb2453802f73e201bf590..ff4d71ca0dc00060dc1b246dd5087401130f14b6 100644 (file)
@@ -1,10 +1,11 @@
 from __future__ import absolute_import
 import json
+from unittest.mock import MagicMock
 
 import pytest
 
 from ceph.deployment import inventory
-from orchestrator import ReadCompletion, raise_if_exception, RGWSpec
+from orchestrator import raise_if_exception, RGWSpec, Completion, ProgressReference
 from orchestrator import InventoryNode, ServiceDescription
 from orchestrator import OrchestratorValidationError
 from orchestrator import parse_host_specs
@@ -87,8 +88,8 @@ def test_service_description():
 
 
 def test_raise():
-    c = ReadCompletion()
-    c.exception = ZeroDivisionError()
+    c = Completion()
+    c._exception = ZeroDivisionError()
     with pytest.raises(ZeroDivisionError):
         raise_if_exception(c)
 
@@ -107,3 +108,121 @@ def test_rgwspec():
     example = json.loads(test_rgwspec.__doc__.strip())
     spec = RGWSpec.from_json(example)
     assert spec.validate_add() is None
+
+
+def test_promise():
+    p = Completion(value=3)
+    p.finalize()
+    assert p.result == 3
+
+
+def test_promise_then():
+    p = Completion(value=3).then(lambda three: three + 1)
+    p._first_promise.finalize()
+    assert p.result == 4
+
+
+def test_promise_mondatic_then():
+    p = Completion(value=3)
+    p.then(lambda three: Completion(value=three + 1))
+    p._first_promise.finalize()
+    assert p.result == 4
+
+
+def some_complex_completion():
+    c = Completion(value=3).then(
+        lambda three: Completion(value=three + 1).then(
+            lambda four: four + 1))
+    return c._first_promise
+
+def test_promise_mondatic_then_combined():
+    p = some_complex_completion()
+    p._first_promise.finalize()
+    assert p.result == 5
+
+
+def test_promise_flat():
+    p = Completion()
+    p.then(lambda r1: Completion(value=r1 + ' there').then(
+        lambda r11: r11 + '!'))
+    p.finalize('hello')
+    assert p.result == 'hello there!'
+
+
+def test_side_effect():
+    foo = {'x': 1}
+
+    def run(x):
+        foo['x'] = x
+
+    foo['x'] = 1
+    Completion(value=3).then(run)._first_promise.finalize()
+    assert foo['x'] == 3
+
+
+def test_progress():
+    c = some_complex_completion()
+    mgr = MagicMock()
+    mgr.process = lambda cs: [c.finalize(None) for c in cs]
+
+    progress_val = 0.75
+    c._last_promise().then(
+        on_complete=ProgressReference(message='hello world',
+                                      mgr=mgr,
+                                      completion=lambda: Completion(
+                                          on_complete=lambda _: progress_val))
+    )
+    mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.0, ['orchestrator'])
+
+    c.finalize()
+    mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', 0.5, ['orchestrator'])
+
+    c.progress_reference.update()
+    mgr.remote.assert_called_with('progress', 'update', c.progress_reference.progress_id, 'hello world', progress_val, ['orchestrator'])
+    assert not c.progress_reference.effective
+
+    progress_val = 1
+    c.progress_reference.update()
+    assert c.progress_reference.effective
+    mgr.remote.assert_called_with('progress', 'complete', c.progress_reference.progress_id)
+
+
+def test_with_progress():
+    mgr = MagicMock()
+    mgr.process = lambda cs: [c.finalize(None) for c in cs]
+
+    def execute(y):
+        return str(y)
+
+    def run(x):
+        def two(_):
+            return execute(x * 2)
+
+        return Completion.with_progress(
+            message='message',
+            on_complete=two,
+            mgr=mgr
+
+        )
+    c = Completion(on_complete=lambda x: x * 10).then(run)._first_promise
+    c.finalize(2)
+    assert c.result == '40'
+    c.progress_reference.update()
+    assert c.progress_reference.effective
+
+
+def test_exception():
+
+    def run(x):
+        raise KeyError(x)
+
+    c = Completion(value=3).then(run)._first_promise
+    c.finalize()
+
+    assert isinstance(c.exception, KeyError)
+
+
+def test_fail():
+    c = Completion().then(lambda _: 3)
+    c._first_promise.fail(KeyError())
+    assert isinstance(c.exception, KeyError)