try:
from ceph.deployment.drive_group import DriveGroupSpec
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, Type
-
except ImportError:
pass
- #T, G = object, object
-T = TypeVar('T')
-U = TypeVar('U')
-V = TypeVar('V')
-G = Generic[T]
-Promises = TypeVar('Promises', bound='_Promise')
-Completions = TypeVar('Completions', bound='Completion')
+logger = logging.getLogger(__name__)
return object()
-class _Promise(Generic[T]):
+class _Promise(object):
"""
A completion may need multiple promises to be fulfilled. `_Promise` is one
step.
NO_RESULT = _no_result() # type: None
def __init__(self,
- _first_promise=None, # type: Optional["_Promise[V]"]
- value=NO_RESULT, # type: Optional[T]
- on_complete=None # type: Optional[Callable[[T], Union[U, _Promise[U]]]]
+ _first_promise=None, # type: Optional["_Promise"]
+ value=NO_RESULT, # type: Optional
+ on_complete=None # type: Optional[Callable]
):
self._on_complete = on_complete
- self._next_promise = None # type: Optional[_Promise[U]]
+ self._next_promise = None # type: Optional[_Promise]
self._state = self.INITIALIZED
self._exception = None # type: Optional[Exception]
)
def then(self, on_complete):
- # type: (Promises, Callable[[T], Union[U, _Promise[U]]]) -> Promises[U]
+ # type: (Any, Callable) -> Any
"""
Call ``on_complete`` as soon as this promise is finalized.
"""
return self._next_promise
def _set_next_promise(self, next):
- # type: (_Promise[U]) -> None
+ # type: (_Promise) -> None
assert self is not next
assert self._state is self.INITIALIZED
p._first_promise = self._first_promise
def finalize(self, value=NO_RESULT):
- # type: (Optional[T]) -> None
"""
Sets this promise to complete.
def __init__(self,
message, # type: str
mgr,
- completion=None # type: Optional[Callable[[], Completion[float]]]
+ completion=None # type: Optional[Callable[[], Completion]]
):
"""
ProgressReference can be used within Completions::
#: The completion can already have a result, before the write
#: operation is effective. progress == 1 means, the services are
#: created / removed.
- self.completion = completion # type: Optional[Callable[[], Completion[float]]]
+ self.completion = completion # type: Optional[Callable[[], Completion]]
#: if a orchestrator module can provide a more detailed
#: progress information, it needs to also call ``progress.update()``.
self.progress = 1
-class Completion(_Promise[T]):
+class Completion(_Promise):
"""
Combines multiple promises into one overall operation.
"""
def __init__(self,
- _first_promise=None, # type: Optional["Completion[V]"]
- value=_Promise.NO_RESULT, # type: Optional[T]
- on_complete=None # type: Optional[Callable[[T], Union[U, Completion[U]]]]
+ _first_promise=None, # type: Optional["Completion"]
+ value=_Promise.NO_RESULT, # type: Any
+ on_complete=None # type: Optional[Callable]
):
super(Completion, self).__init__(_first_promise, value, on_complete)
return None
@classmethod
- def with_progress(cls, # type: Completions[T]
+ def with_progress(cls, # type: Any
message, # type: str
mgr,
- _first_promise=None, # type: Optional["Completions[V]"]
- value=_Promise.NO_RESULT, # type: Optional[T]
- on_complete=None, # type: Optional[Callable[[T], Union[U, Completions[U]]]]
- calc_percent=None # type: Optional[Callable[[], Completions[float]]]
+ _first_promise=None, # type: Optional["Completions"]
+ value=_Promise.NO_RESULT, # type: Any
+ on_complete=None, # type: Optional[Callable]
+ calc_percent=None # type: Optional[Callable[[], Any]]
):
- # type: (...) -> Completions[T]
+ # type: (...) -> Any
c = cls(
_first_promise=_first_promise,
raise e
-class TrivialReadCompletion(Completion[T]):
+class TrivialReadCompletion(Completion):
"""
This is the trivial completion simply wrapping a result.
"""
raise NotImplementedError()
def get_hosts(self):
- # type: () -> Completion[List[InventoryNode]]
+ # type: () -> Completion
"""
Report the hosts in the cluster.
return NotImplementedError()
def get_inventory(self, node_filter=None, refresh=False):
- # type: (InventoryFilter, bool) -> Completion[List[InventoryNode]]
+ # type: (InventoryFilter, bool) -> Completion
"""
Returns something that was created by `ceph-volume inventory`.
raise NotImplementedError()
def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
- # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]]
+ # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
@_hide_in_features
def upgrade_status(self):
- # type: () -> Completion[UpgradeStatusSpec]
+ # type: () -> Completion
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
@_hide_in_features
def upgrade_available(self):
- # type: () -> Completion[List[str]]
+ # type: () -> Completion
"""
Report on what versions are available to upgrade to
except ImportError:
pass # just for type checking
-T = TypeVar('T')
import six
import os
# multiple bootstrapping / initialization
-class AsyncCompletion(orchestrator.Completion[T]):
- def __init__(self, *args, many=False, **kwargs):
- self.__on_complete = None # type: Callable[[T], Any]
- self.many = many
+class AsyncCompletion(orchestrator.Completion):
+ def __init__(self, *args, **kwargs):
+ self.__on_complete = None # type: Callable
+ self.many = kwargs.pop('many')
super(AsyncCompletion, self).__init__(*args, **kwargs)
def propagate_to_next(self):
@property
def _on_complete(self):
- # type: () -> Optional[Callable[[T], Any]]
+ # type: () -> Optional[Callable]
if self.__on_complete is None:
return None
@_on_complete.setter
def _on_complete(self, inner):
- # type: (Callable[[T], Any]) -> None
+ # type: (Callable) -> None
self.__on_complete = inner
def async_completion(f):
- # type: (Callable[..., T]) -> Callable[..., AsyncCompletion[T]]
+ # type: (Callable) -> Callable[..., AsyncCompletion]
return ssh_completion()(f)
def async_map_completion(f):
- # type: (Callable[..., T]) -> Callable[..., AsyncCompletion[T]]
+ # type: (Callable) -> Callable[..., AsyncCompletion]
return ssh_completion(many=True)(f)
def trivial_completion(f):
- # type: (Callable[..., T]) -> Callable[..., orchestrator.Completion[T]]
+ # type: (Callable) -> Callable[..., orchestrator.Completion]
return ssh_completion(cls=orchestrator.Completion)(f)