try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Type, Sequence, Dict, cast
+ Type, Sequence, Dict, cast
except ImportError:
pass
T = TypeVar('T')
+
class OrchestratorError(Exception):
"""
General orchestrator specific error.
It's not intended for programming errors or orchestrator internal errors.
"""
+
def __init__(self,
msg: str,
errno: int = -errno.EINVAL,
"""
No orchestrator in configured.
"""
+
def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
def _serialized_exception(self) -> Optional[bytes]:
return getattr(self, '_serialized_exception_', None)
-
-
@property
def _on_complete(self) -> Optional[Callable]:
# https://github.com/python/mypy/issues/4125
def _on_complete(self, val: Optional[Callable]) -> None:
self._on_complete_ = val
-
def __repr__(self):
- name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
+ name = self._name or getattr(self._on_complete, '__name__',
+ '??') if self._on_complete else 'None'
val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
- self.__class__, self._state, val, self._on_complete, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
+ self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
+ next, '_progress_reference', 'NA'), repr(self._next_promise)
)
def pretty_print_1(self):
# asynchronous promise
pass
-
def propagate_to_next(self):
self._state = self.FINISHED
logger.debug('finalized {}'.format(repr(self)))
message: str,
mgr,
completion: Optional[Callable[[], 'Completion']] = None
- ):
+ ):
"""
ProgressReference can be used within Completions::
try:
if self.effective:
self.mgr.remote("progress", "complete", self.progress_id)
- self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
+ self.mgr.all_progress_references = [
+ p for p in self.mgr.all_progress_references if p is not self]
else:
self.mgr.remote("progress", "update", self.progress_id, self.message,
progress,
+---------------+ +-----------------+
"""
+
def __init__(self,
_first_promise: Optional["Completion"] = None,
value: Any = _Promise.NO_RESULT,
as a write completeion.
"""
- references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
+ references = [c._progress_reference for c in iter(
+ self) if c._progress_reference is not None]
if references:
assert len(references) == 1
return references[0]
if self._progress_reference:
self._progress_reference.fail()
- def finalize(self, result: Union[None, object, T]=_Promise.NO_RESULT):
+ def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
if self._first_promise._state == self.INITIALIZED:
self._first_promise._finalize(result)
"""
This is the trivial completion simply wrapping a result.
"""
+
def __init__(self, result: T):
super(TrivialReadCompletion, self).__init__()
if result:
"""
raise NotImplementedError()
- def host_ok_to_stop(self, hostname:str) -> Completion:
+ def host_ok_to_stop(self, hostname: str) -> Completion:
"""
Check if the specified host can be safely stopped without reducing availability
(e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
:rtype: Completion
"""
- #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ # assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> Completion[str]:
:param image: Container image when redeploying that daemon
:rtype: Completion
"""
- #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ # assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
else:
return ServiceSpec.from_json(spec)
+
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
def __init__(self):
last_configured=None,
osdspec_affinity=None,
last_deployed=None,
- events: Optional[List['OrchestratorEvent']]=None):
+ events: Optional[List['OrchestratorEvent']] = None):
# Host is at the same granularity as InventoryHost
self.hostname: str = hostname
return self.osdspec_affinity
def _match():
- err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \
- f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
+ err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
+ f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
if not self.hostname:
# TODO: can a DaemonDescription exist without a hostname?
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
+
class ServiceDescription(object):
"""
For responding to queries about the status of a particular service,
created=None,
size=0,
running=0,
- events: Optional[List['OrchestratorEvent']]=None):
+ events: Optional[List['OrchestratorEvent']] = None):
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# (image name)
in e.g. OSD servers.
"""
+
def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
#: Optional: get info about hosts matching labels
When fetching inventory, all Devices are groups inside of an
InventoryHost.
"""
+
def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
if devices is None:
devices = inventory.Devices([])
except TypeError as e:
raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
-
@classmethod
def from_nested_items(cls, hosts):
devs = inventory.Devices.from_json