import logging
import pickle
import re
-import time
-import uuid
from collections import namedtuple, OrderedDict
from contextlib import contextmanager
-from functools import wraps
+from functools import wraps, reduce
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
Sequence, Dict, cast
return cast(FuncT, wrapper_copy)
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+ """
+ Decorator to make Orchestrator methods return
+ an OrchResult.
+ """
+
+ @wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+ try:
+ return OrchResult(f(*args, **kwargs))
+ except Exception as e:
+ return OrchResult(None, exception=e)
+
+ return cast(Callable[..., OrchResult[T]], wrapper)
+
+
class InnerCliCommandCallable(Protocol):
def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
...
cls.handle_command = handle_command
-def _no_result() -> None:
- return object() # type: ignore
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
+ """
+ Stores a result and an exception. Mainly to circumvent the
+ MgrModule.remote() method that hides all exceptions and for
+ handling different sub-interpreters.
"""
- A completion may need multiple promises to be fulfilled. `_Promise` is one
- step.
- Typically ``Orchestrator`` implementations inherit from this class to
- build their own way of finishing a step to fulfil a future.
+ def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+ self.result = result
+ self.serialized_exception: Optional[bytes] = None
+ self.exception_str: str = ''
+ self.set_exception(exception)
- They are not exposed in the orchestrator interface and can be seen as a
- helper to build orchestrator modules.
- """
- INITIALIZED = 1 # We have a parent completion and a next completion
- RUNNING = 2
- FINISHED = 3 # we have a final result
+ __slots__ = 'result', 'serialized_exception', 'exception_str'
- NO_RESULT: None = _no_result() # type: ignore
- ASYNC_RESULT = object()
+ def set_exception(self, e: Optional[Exception]) -> None:
+ if e is None:
+ self.serialized_exception = None
+ self.exception_str = ''
+ return
- def __init__(self,
- _first_promise: Optional["_Promise"] = None,
- value: Optional[Any] = NO_RESULT,
- on_complete: Optional[Callable] = None,
- name: Optional[str] = None,
- ):
- self._on_complete_ = on_complete
- self._name = name
- self._next_promise: Optional[_Promise] = None
-
- self._state = self.INITIALIZED
- self._exception: Optional[Exception] = None
-
- # Value of this _Promise. may be an intermediate result.
- self._value = value
-
- # _Promise is not a continuation monad, as `_result` is of type
- # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
- self._first_promise: '_Promise' = _first_promise or self
-
- @property
- def _exception(self) -> Optional[Exception]:
- return getattr(self, '_exception_', None)
-
- @_exception.setter
- def _exception(self, e: Exception) -> None:
- self._exception_ = e
+ self.exception_str = f'{type(e)}: {str(e)}'
try:
- self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+ self.serialized_exception = pickle.dumps(e)
except pickle.PicklingError:
logger.error(f"failed to pickle {e}")
if isinstance(e, Exception):
else:
e = Exception(str(e))
# degenerate to a plain Exception
- self._serialized_exception_ = pickle.dumps(e)
-
- @property
- def _serialized_exception(self) -> Optional[bytes]:
- return getattr(self, '_serialized_exception_', None)
-
- @property
- def _on_complete(self) -> Optional[Callable]:
- # https://github.com/python/mypy/issues/4125
- return self._on_complete_
-
- @_on_complete.setter
- def _on_complete(self, val: Optional[Callable]) -> None:
- self._on_complete_ = val
-
- def __repr__(self) -> str:
- name = self._name or getattr(self._on_complete, '__name__',
- '??') if self._on_complete else 'None'
- val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
- return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
- self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
- next, '_progress_reference', 'NA'), repr(self._next_promise)
- )
-
- def pretty_print_1(self) -> str:
- if self._name:
- name = self._name
- elif self._on_complete is None:
- name = 'lambda x: x'
- elif hasattr(self._on_complete, '__name__'):
- name = getattr(self._on_complete, '__name__')
- else:
- name = self._on_complete.__class__.__name__
- val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
- prefix = {
- self.INITIALIZED: ' ',
- self.RUNNING: ' >>>', # noqa: E241
- self.FINISHED: '(done)', # noqa: E241
- }[self._state]
- return '{} {}({}),'.format(prefix, name, val)
-
- def then(self: Any, on_complete: Callable) -> Any:
- """
- Call ``on_complete`` as soon as this promise is finalized.
- """
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- if self._next_promise is not None:
- return self._next_promise.then(on_complete)
-
- if self._on_complete is not None:
- self._set_next_promise(self.__class__(
- _first_promise=self._first_promise,
- on_complete=on_complete
- ))
- return self._next_promise
-
- else:
- self._on_complete = on_complete
- self._set_next_promise(self.__class__(_first_promise=self._first_promise))
- return self._next_promise
-
- def _set_next_promise(self, next: '_Promise') -> None:
- assert self is not next
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- self._next_promise = next
- assert self._next_promise is not None
- for p in iter(self._next_promise):
- p._first_promise = self._first_promise
-
- def _finalize(self, value: Optional[T] = NO_RESULT) -> None:
- """
- Sets this promise to complete.
-
- Orchestrators may choose to use this helper function.
-
- :param value: new value.
- """
- if self._state not in (self.INITIALIZED, self.RUNNING):
- raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
- self._state = self.RUNNING
-
- if value is not self.NO_RESULT:
- self._value = value
- assert self._value is not self.NO_RESULT, repr(self)
-
- if self._on_complete:
- try:
- next_result = self._on_complete(self._value)
- except Exception as e:
- self.fail(e)
- return
- else:
- next_result = self._value
-
- if isinstance(next_result, _Promise):
- # hack: _Promise is not a continuation monad.
- next_result = next_result._first_promise # type: ignore
- assert next_result not in self, repr(self._first_promise) + repr(next_result)
- assert self not in next_result
- next_result._append_promise(self._next_promise)
- self._set_next_promise(next_result)
- assert self._next_promise
- if self._next_promise._value is self.NO_RESULT:
- self._next_promise._value = self._value
- self.propagate_to_next()
- elif next_result is not self.ASYNC_RESULT:
- # simple map. simply forward
- if self._next_promise:
- self._next_promise._value = next_result
- else:
- # Hack: next_result is of type U, _value is of type T
- self._value = next_result # type: ignore
- self.propagate_to_next()
- else:
- # asynchronous promise
- pass
-
- def propagate_to_next(self) -> None:
- self._state = self.FINISHED
- logger.debug('finalized {}'.format(repr(self)))
- if self._next_promise:
- self._next_promise._finalize()
-
- def fail(self, e: Exception) -> None:
- """
- Sets the whole completion to be faild with this exception and end the
- evaluation.
- """
- if self._state == self.FINISHED:
- raise ValueError(
- 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
- assert self._state in (self.INITIALIZED, self.RUNNING)
- logger.exception('_Promise failed')
- self._exception = e
- self._value = f'_exception: {e}'
- if self._next_promise:
- self._next_promise.fail(e)
- self._state = self.FINISHED
-
- def __contains__(self, item: '_Promise') -> bool:
- return any(item is p for p in iter(self._first_promise))
-
- def __iter__(self) -> Iterator['_Promise']:
- yield self
- elem = self._next_promise
- while elem is not None:
- yield elem
- elem = elem._next_promise
-
- def _append_promise(self, other: Optional['_Promise']) -> None:
- if other is not None:
- assert self not in other
- assert other not in self
- self._last_promise()._set_next_promise(other)
-
- def _last_promise(self) -> '_Promise':
- return list(iter(self))[-1]
-
-
-class ProgressReference(object):
- def __init__(self,
- message: str,
- mgr: Any,
- completion: Optional[Callable[[], 'Completion']] = None
- ):
- """
- ProgressReference can be used within Completions::
-
- +---------------+ +---------------------------------+
- | | then | |
- | My Completion | +--> | on_complete=ProgressReference() |
- | | | |
- +---------------+ +---------------------------------+
-
- See :func:`Completion.with_progress` for an easy way to create
- a progress reference
-
- """
- super(ProgressReference, self).__init__()
- self.progress_id = str(uuid.uuid4())
- self.message = message
- self.mgr = mgr
-
- #: The completion can already have a result, before the write
- #: operation is effective. progress == 1 means, the services are
- #: created / removed.
- self.completion: Optional[Callable[[], Completion]] = completion
-
- #: if a orchestrator module can provide a more detailed
- #: progress information, it needs to also call ``progress.update()``.
- self.progress = 0.0
-
- self._completion_has_result = False
- self.mgr.all_progress_references.append(self)
-
- def __str__(self) -> str:
- """
- ``__str__()`` is used for determining the message for progress events.
- """
- return self.message or super(ProgressReference, self).__str__()
-
- def __call__(self, arg: T) -> T:
- self._completion_has_result = True
- self.progress = 1.0
- return arg
-
- @property
- def progress(self) -> float:
- return self._progress
-
- @progress.setter
- def progress(self, progress: float) -> None:
- assert progress <= 1.0
- self._progress = progress
- try:
- if self.effective:
- self.mgr.remote("progress", "complete", self.progress_id)
- self.mgr.all_progress_references = [
- p for p in self.mgr.all_progress_references if p is not self]
- else:
- self.mgr.remote("progress", "update", self.progress_id, self.message,
- progress,
- [("origin", "orchestrator")])
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
-
- @property
- def effective(self) -> bool:
- return self.progress == 1 and self._completion_has_result
-
- def update(self) -> None:
- def progress_run(progress: float) -> None:
- self.progress = progress
- if self.completion:
- c = self.completion().then(progress_run)
- self.mgr.process([c._first_promise])
- else:
- self.progress = 1
-
- def fail(self) -> None:
- self._completion_has_result = True
- self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
- """
- Combines multiple promises into one overall operation.
-
- Completions are composable by being able to
- call one completion from another completion. I.e. making them re-usable
- using Promises E.g.::
-
- >>> #doctest: +SKIP
- ... return Orchestrator().get_hosts().then(self._create_osd)
-
- where ``get_hosts`` returns a Completion of list of hosts and
- ``_create_osd`` takes a list of hosts.
-
- The concept behind this is to store the computation steps
- explicit and then explicitly evaluate the chain:
-
- >>> #doctest: +SKIP
- ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
- ... p.finalize(2)
- ... assert p.result = "4"
-
- or graphically::
-
- +---------------+ +-----------------+
- | | then | |
- | lambda x: x*x | +--> | lambda x: str(x)|
- | | | |
- +---------------+ +-----------------+
-
- """
-
- def __init__(self,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- name: Optional[str] = None,
- ) -> None:
- super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
- @property
- def _progress_reference(self) -> Optional[ProgressReference]:
- if hasattr(self._on_complete, 'progress_id'):
- return self._on_complete # type: ignore
- return None
-
- @property
- def progress_reference(self) -> Optional[ProgressReference]:
- """
- ProgressReference. Marks this completion
- as a write completeion.
- """
-
- references = [c._progress_reference for c in iter( # type: ignore
- self) if c._progress_reference is not None] # type: ignore
- if references:
- assert len(references) == 1
- return references[0]
- return None
-
- @classmethod
- def with_progress(cls: Any,
- message: str,
- mgr: Any,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- calc_percent: Optional[Callable[[], Any]] = None
- ) -> Any:
-
- c = cls(
- _first_promise=_first_promise,
- value=value,
- on_complete=on_complete
- ).add_progress(message, mgr, calc_percent)
-
- return c._first_promise
-
- def add_progress(self,
- message: str,
- mgr: Any,
- calc_percent: Optional[Callable[[], Any]] = None
- ) -> Any:
- return self.then(
- on_complete=ProgressReference(
- message=message,
- mgr=mgr,
- completion=calc_percent
- )
- )
-
- def fail(self, e: Exception) -> None:
- super(Completion, self).fail(e)
- if self._progress_reference:
- self._progress_reference.fail()
-
- def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT) -> None:
- if self._first_promise._state == self.INITIALIZED:
- self._first_promise._finalize(result)
-
- @property
- def result(self) -> T:
- """
- The result of the operation that we were waited
- for. Only valid after calling Orchestrator.process() on this
- completion.
- """
- last = self._last_promise()
- assert last._state == _Promise.FINISHED
- return cast(T, last._value)
+ self.serialized_exception = pickle.dumps(e)
def result_str(self) -> str:
"""Force a string."""
return '\n'.join(str(x) for x in self.result)
return str(self.result)
- @property
- def exception(self) -> Optional[Exception]:
- return self._last_promise()._exception
-
- @property
- def serialized_exception(self) -> Optional[bytes]:
- return self._last_promise()._serialized_exception
-
- @property
- def has_result(self) -> bool:
- """
- Has the operation already a result?
-
- For Write operations, it can already have a
- result, if the orchestrator's configuration is
- persistently written. Typically this would
- indicate that an update had been written to
- a manifest, but that the update had not
- necessarily been pushed out to the cluster.
-
- :return:
- """
- return self._last_promise()._state == _Promise.FINISHED
-
- @property
- def is_errored(self) -> bool:
- """
- Has the completion failed. Default implementation looks for
- self.exception. Can be overwritten.
- """
- return self.exception is not None
-
- @property
- def needs_result(self) -> bool:
- """
- Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a read operation only if it is not complete.
- """
- return not self.is_errored and not self.has_result
- @property
- def is_finished(self) -> bool:
- """
- Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a read operation only if it is not complete.
- """
- return self.is_errored or (self.has_result)
-
- def pretty_print(self) -> str:
-
- reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
- return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions: Sequence[Completion]) -> str:
- return ', '.join(c.pretty_print() for c in completions)
-
-
-def raise_if_exception(c: Completion) -> None:
+def raise_if_exception(c: OrchResult[T]) -> T:
"""
- :raises OrchestratorError: Some user error or a config error.
- :raises Exception: Some internal error
+ Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
"""
if c.serialized_exception is not None:
try:
e = pickle.loads(c.serialized_exception)
except (KeyError, AttributeError):
- raise Exception('{}: {}'.format(type(c.exception), c.exception))
+ raise Exception(c.exception_str)
raise e
-
-
-class TrivialReadCompletion(Completion[T]):
- """
- This is the trivial completion simply wrapping a result.
- """
-
- def __init__(self, result: T):
- super(TrivialReadCompletion, self).__init__()
- if result:
- self.finalize(result)
+ assert c.result is not None, 'OrchResult should either have an exception or a result'
+ return c.result
def _hide_in_features(f: FuncT) -> FuncT:
"""
raise NotImplementedError()
- @_hide_in_features
- def process(self, completions: List[Completion]) -> None:
- """
- Given a list of Completion instances, process any which are
- incomplete.
-
- Callers should inspect the detail of each completion to identify
- partial completion/progress information, and present that information
- to the user.
-
- This method should not block, as this would make it slow to query
- a status, while other long running operations are in progress.
- """
- raise NotImplementedError()
-
@_hide_in_features
def get_feature_set(self) -> Dict[str, dict]:
"""Describes which methods this orchestrator implements
def resume(self) -> None:
raise NotImplementedError()
- def add_host(self, host_spec: HostSpec) -> Completion[str]:
+ def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
"""
Add a host to the orchestrator inventory.
"""
raise NotImplementedError()
- def remove_host(self, host: str) -> Completion[str]:
+ def remove_host(self, host: str) -> OrchResult[str]:
"""
Remove a host from the orchestrator inventory.
"""
raise NotImplementedError()
- def update_host_addr(self, host: str, addr: str) -> Completion[str]:
+ def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
"""
Update a host's address
"""
raise NotImplementedError()
- def get_hosts(self) -> Completion[List[HostSpec]]:
+ def get_hosts(self) -> OrchResult[List[HostSpec]]:
"""
Report the hosts in the cluster.
"""
raise NotImplementedError()
- def add_host_label(self, host: str, label: str) -> Completion[str]:
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
"""
Add a host label
"""
raise NotImplementedError()
- def remove_host_label(self, host: str, label: str) -> Completion[str]:
+ def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
"""
Remove a host label
"""
raise NotImplementedError()
- def host_ok_to_stop(self, hostname: str) -> Completion:
+ def host_ok_to_stop(self, hostname: str) -> OrchResult:
"""
Check if the specified host can be safely stopped without reducing availability
"""
raise NotImplementedError()
- def enter_host_maintenance(self, hostname: str, force: bool = False) -> Completion:
+ def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
"""
Place a host in maintenance, stopping daemons and disabling it's systemd target
"""
raise NotImplementedError()
- def exit_host_maintenance(self, hostname: str) -> Completion:
+ def exit_host_maintenance(self, hostname: str) -> OrchResult:
"""
Return a host from maintenance, restarting the clusters systemd target
"""
raise NotImplementedError()
- def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
+ def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.
"""
raise NotImplementedError()
- def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
"""
raise NotImplementedError()
- def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
+ def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
"""
Describe a daemon (of any kind) that is already configured in
the orchestrator.
"""
raise NotImplementedError()
- def apply(self, specs: Sequence["GenericSpec"]) -> Completion[List[str]]:
+ @handle_orch_error
+ def apply(self, specs: Sequence["GenericSpec"]) -> List[str]:
"""
Applies any spec
"""
- fns: Dict[str, Callable] = {
+ fns: Dict[str, Callable[..., OrchResult[str]]] = {
'alertmanager': self.apply_alertmanager,
'crash': self.apply_crash,
'grafana': self.apply_grafana,
'mon': self.apply_mon,
'nfs': self.apply_nfs,
'node-exporter': self.apply_node_exporter,
- 'osd': lambda dg: self.apply_drivegroups([dg]),
+ 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
'prometheus': self.apply_prometheus,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'cephadm-exporter': self.apply_cephadm_exporter,
}
- def merge(ls: Union[List[T], T], r: Union[List[T], T]) -> List[T]:
- if isinstance(ls, list):
- return ls + [r] # type: ignore
- return [ls, r] # type: ignore
-
- spec, *specs = specs
-
- fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
- completion = fn(spec)
- for s in specs:
- def next(ls: list) -> Any:
- fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
- return fn(s).then(lambda r: merge(ls, r))
- completion = completion.then(next)
- return completion
+ def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
+ l_res = raise_if_exception(l)
+ r_res = raise_if_exception(r)
+ l_res.append(r_res)
+ return OrchResult(l_res)
+ return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
- def plan(self, spec: Sequence["GenericSpec"]) -> Completion[List]:
+ def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
"""
Plan (Dry-run, Preview) a List of Specs.
"""
raise NotImplementedError()
- def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
+ def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
"""
Remove specific daemon(s).
"""
raise NotImplementedError()
- def remove_service(self, service_name: str) -> Completion[str]:
+ def remove_service(self, service_name: str) -> OrchResult[str]:
"""
Remove a service (a collection of daemons).
"""
raise NotImplementedError()
- def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
+ def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
"""
Perform an action (start/stop/reload) on a service (i.e., all daemons
providing the logical service).
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
:param service_name: service_type + '.' + service_id
(e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
- :rtype: Completion
+ :rtype: OrchResult
"""
# assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
"""
Perform an action (start/stop/reload) on a daemon.
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
:param daemon_name: name of daemon
:param image: Container image when redeploying that daemon
- :rtype: Completion
+ :rtype: OrchResult
"""
# assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
+ def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
""" Update OSD cluster """
raise NotImplementedError()
def preview_osdspecs(self,
osdspec_name: Optional[str] = 'osd',
osdspecs: Optional[List[DriveGroupSpec]] = None
- ) -> Completion[str]:
+ ) -> OrchResult[str]:
""" Get a preview for OSD deployments """
raise NotImplementedError()
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> Completion[str]:
+ force: bool = False) -> OrchResult[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
"""
raise NotImplementedError()
- def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+ def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
"""
TODO
"""
raise NotImplementedError()
- def remove_osds_status(self) -> Completion:
+ def remove_osds_status(self) -> OrchResult:
"""
Returns a status of the ongoing OSD removal operations.
"""
raise NotImplementedError()
- def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
+ def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
"""
raise NotImplementedError()
- def zap_device(self, host: str, path: str) -> Completion[str]:
+ def zap_device(self, host: str, path: str) -> OrchResult[str]:
"""Zap/Erase a device (DESTROYS DATA)"""
raise NotImplementedError()
- def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_mon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create mon daemon(s)"""
raise NotImplementedError()
- def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update mon cluster"""
raise NotImplementedError()
- def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_mgr(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create mgr daemon(s)"""
raise NotImplementedError()
- def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update mgr cluster"""
raise NotImplementedError()
- def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_mds(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create MDS daemon(s)"""
raise NotImplementedError()
- def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update MDS cluster"""
raise NotImplementedError()
- def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
+ def add_rgw(self, spec: RGWSpec) -> OrchResult[List[str]]:
"""Create RGW daemon(s)"""
raise NotImplementedError()
- def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
+ def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
"""Update RGW cluster"""
raise NotImplementedError()
- def apply_ha_rgw(self, spec: HA_RGWSpec) -> Completion[str]:
+ def apply_ha_rgw(self, spec: HA_RGWSpec) -> OrchResult[str]:
"""Update ha-rgw daemons"""
raise NotImplementedError()
- def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create rbd-mirror daemon(s)"""
raise NotImplementedError()
- def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update rbd-mirror cluster"""
raise NotImplementedError()
- def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
+ def add_nfs(self, spec: NFSServiceSpec) -> OrchResult[List[str]]:
"""Create NFS daemon(s)"""
raise NotImplementedError()
- def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
+ def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
"""Update NFS cluster"""
raise NotImplementedError()
- def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
+ def add_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[List[str]]:
"""Create iscsi daemon(s)"""
raise NotImplementedError()
- def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
+ def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
"""Update iscsi cluster"""
raise NotImplementedError()
- def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_prometheus(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create new prometheus daemon"""
raise NotImplementedError()
- def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update prometheus cluster"""
raise NotImplementedError()
- def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_node_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create a new Node-Exporter service"""
raise NotImplementedError()
- def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_crash(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create a new crash service"""
raise NotImplementedError()
- def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
- def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_grafana(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create a new grafana service"""
raise NotImplementedError()
- def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a grafana service"""
raise NotImplementedError()
- def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_alertmanager(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create a new AlertManager service"""
raise NotImplementedError()
- def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def add_cephadm_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
+ def add_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[List[str]]:
"""Create a new cephadm exporter daemon"""
raise NotImplementedError()
- def apply_cephadm_exporter(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update an existing cephadm exporter daemon"""
raise NotImplementedError()
- def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+ def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+ def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_pause(self) -> Completion[str]:
+ def upgrade_pause(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_resume(self) -> Completion[str]:
+ def upgrade_resume(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_stop(self) -> Completion[str]:
+ def upgrade_stop(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
+ def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
raise NotImplementedError()
@_hide_in_features
- def upgrade_available(self) -> Completion:
+ def upgrade_available(self) -> OrchResult:
"""
Report on what versions are available to upgrade to
>>> class MyModule(OrchestratorClientMixin):
... def func(self):
... completion = self.add_host('somehost') # calls `_oremote()`
- ... self._orchestrator_wait([completion])
... self.log.debug(completion.result)
.. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
if meth not in f_set or not f_set[meth]['available']:
raise NotImplementedError(f'{o} does not implement {meth}') from e
raise
-
- def _orchestrator_wait(self, completions: List[Completion]) -> None:
- """
- Wait for completions to complete (reads) or
- become persistent (writes).
-
- Waits for writes to be *persistent* but not *effective*.
-
- :param completions: List of Completions
- :raises NoOrchestrator:
- :raises RuntimeError: something went wrong while calling the process method.
- :raises ImportError: no `orchestrator` module or backend not found.
- """
- while any(not c.has_result for c in completions):
- self.process(completions)
- self.__get_mgr().log.info("Operations pending: %s",
- sum(1 for c in completions if not c.has_result))
- if any(c.needs_result for c in completions):
- time.sleep(1)
- else:
- break