"""
def __init__(cls, name, bases, dct):
super(CLICommandMeta, cls).__init__(name, bases, dct)
- dispatch = {} # type: Dict[str, CLICommand]
+ dispatch: Dict[str, CLICommand] = {}
for v in dct.values():
try:
dispatch[v._prefix] = v._cli_command
RUNNING = 2
FINISHED = 3 # we have a final result
- NO_RESULT = _no_result() # type: None
+ NO_RESULT: None = _no_result()
ASYNC_RESULT = object()
def __init__(self,
- _first_promise=None, # type: Optional["_Promise"]
- value=NO_RESULT, # type: Optional[Any]
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
+ _first_promise: Optional["_Promise"] = None,
+ value: Optional[Any] = NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ name: Optional[str] = None,
):
self._on_complete_ = on_complete
self._name = name
- self._next_promise = None # type: Optional[_Promise]
+ self._next_promise: Optional[_Promise] = None
self._state = self.INITIALIZED
- self._exception = None # type: Optional[Exception]
+ self._exception: Optional[Exception] = None
# Value of this _Promise. may be an intermediate result.
self._value = value
# _Promise is not a continuation monad, as `_result` is of type
# T instead of (T -> r) -> r. Therefore we need to store the first promise here.
- self._first_promise = _first_promise or self # type: '_Promise'
+ self._first_promise: '_Promise' = _first_promise or self
@property
- def _exception(self):
- # type: () -> Optional[Exception]
+ def _exception(self) -> Optional[Exception]:
return getattr(self, '_exception_', None)
@_exception.setter
self._serialized_exception_ = pickle.dumps(e)
@property
- def _serialized_exception(self):
- # type: () -> Optional[bytes]
+ def _serialized_exception(self) -> Optional[bytes]:
return getattr(self, '_serialized_exception_', None)
@property
- def _on_complete(self):
- # type: () -> Optional[Callable]
+ def _on_complete(self) -> Optional[Callable]:
# https://github.com/python/mypy/issues/4125
return self._on_complete_
@_on_complete.setter
- def _on_complete(self, val):
- # type: (Optional[Callable]) -> None
+ def _on_complete(self, val: Optional[Callable]) -> None:
self._on_complete_ = val
}[self._state]
return '{} {}({}),'.format(prefix, name, val)
- def then(self, on_complete):
- # type: (Any, Callable) -> Any
+ def then(self: Any, on_complete: Callable) -> Any:
"""
Call ``on_complete`` as soon as this promise is finalized.
"""
self._set_next_promise(self.__class__(_first_promise=self._first_promise))
return self._next_promise
- def _set_next_promise(self, next):
- # type: (_Promise) -> None
+ def _set_next_promise(self, next: '_Promise') -> None:
assert self is not next
assert self._state in (self.INITIALIZED, self.RUNNING)
if self._next_promise:
self._next_promise._finalize()
- def fail(self, e):
- # type: (Exception) -> None
+ def fail(self, e: Exception) -> None:
"""
Sets the whole completion to be faild with this exception and end the
evaluation.
assert other not in self
self._last_promise()._set_next_promise(other)
- def _last_promise(self):
- # type: () -> _Promise
+ def _last_promise(self) -> '_Promise':
return list(iter(self))[-1]
class ProgressReference(object):
def __init__(self,
- message, # type: str
+ message: str,
mgr,
- completion=None # type: Optional[Callable[[], Completion]]
+ completion: Optional[Callable[[], 'Completion']] = None
):
"""
ProgressReference can be used within Completions::
#: The completion can already have a result, before the write
#: operation is effective. progress == 1 means, the services are
#: created / removed.
- self.completion = completion # type: Optional[Callable[[], Completion]]
+ self.completion: Optional[Callable[[], Completion]] = completion
#: if a orchestrator module can provide a more detailed
#: progress information, it needs to also call ``progress.update()``.
"""
def __init__(self,
- _first_promise=None, # type: Optional["Completion"]
- value=_Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
+ _first_promise: Optional["Completion"] = None,
+ value: Any = _Promise.NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ name: Optional[str] = None,
):
super(Completion, self).__init__(_first_promise, value, on_complete, name)
@property
- def _progress_reference(self):
- # type: () -> Optional[ProgressReference]
+ def _progress_reference(self) -> Optional[ProgressReference]:
if hasattr(self._on_complete, 'progress_id'):
return self._on_complete # type: ignore
return None
@property
- def progress_reference(self):
- # type: () -> Optional[ProgressReference]
+ def progress_reference(self) -> Optional[ProgressReference]:
"""
ProgressReference. Marks this completion
as a write completeion.
return None
@classmethod
- def with_progress(cls, # type: Any
- message, # type: str
+ def with_progress(cls: Any,
+ message: str,
mgr,
- _first_promise=None, # type: Optional["Completion"]
- value=_Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- calc_percent=None # type: Optional[Callable[[], Any]]
- ):
- # type: (...) -> Any
+ _first_promise: Optional["Completion"] = None,
+ value: Any = _Promise.NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ calc_percent: Optional[Callable[[], Any]] = None
+ ) -> Any:
c = cls(
_first_promise=_first_promise,
return c._first_promise
def add_progress(self,
- message, # type: str
+ message: str,
mgr,
- calc_percent=None # type: Optional[Callable[[], Any]]
+ calc_percent: Optional[Callable[[], Any]] = None
):
return self.then(
on_complete=ProgressReference(
return str(self.result)
@property
- def exception(self):
- # type: () -> Optional[Exception]
+ def exception(self) -> Optional[Exception]:
return self._last_promise()._exception
@property
- def serialized_exception(self):
- # type: () -> Optional[bytes]
+ def serialized_exception(self) -> Optional[bytes]:
return self._last_promise()._serialized_exception
@property
- def has_result(self):
- # type: () -> bool
+ def has_result(self) -> bool:
"""
Has the operation already a result?
return self._last_promise()._state == _Promise.FINISHED
@property
- def is_errored(self):
- # type: () -> bool
+ def is_errored(self) -> bool:
"""
Has the completion failed. Default implementation looks for
self.exception. Can be overwritten.
return self.exception is not None
@property
- def needs_result(self):
- # type: () -> bool
+ def needs_result(self) -> bool:
"""
Could the external operation be deemed as complete,
or should we wait?
return not self.is_errored and not self.has_result
@property
- def is_finished(self):
- # type: () -> bool
+ def is_finished(self) -> bool:
"""
Could the external operation be deemed as complete,
or should we wait?
return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-def pretty_print(completions):
- # type: (Sequence[Completion]) -> str
+def pretty_print(completions: Sequence[Completion]) -> str:
return ', '.join(c.pretty_print() for c in completions)
-def raise_if_exception(c):
- # type: (Completion) -> None
+def raise_if_exception(c: Completion) -> None:
"""
:raises OrchestratorError: Some user error or a config error.
:raises Exception: Some internal error
return True
@_hide_in_features
- def available(self):
- # type: () -> Tuple[bool, str]
+ def available(self) -> Tuple[bool, str]:
"""
Report whether we can talk to the orchestrator. This is the
place to give the user a meaningful message if the orchestrator
raise NotImplementedError()
@_hide_in_features
- def process(self, completions):
- # type: (List[Completion]) -> None
+ def process(self, completions: List[Completion]) -> None:
"""
Given a list of Completion instances, process any which are
incomplete.
}
return features
- def cancel_completions(self):
- # type: () -> None
+ def cancel_completions(self) -> None:
"""
Cancels ongoing completions. Unstuck the mgr.
"""
raise NotImplementedError()
- def pause(self):
- # type: () -> None
+ def pause(self) -> None:
raise NotImplementedError()
- def resume(self):
- # type: () -> None
+ def resume(self) -> None:
raise NotImplementedError()
- def add_host(self, host_spec):
- # type: (HostSpec) -> Completion[str]
+ def add_host(self, host_spec: HostSpec) -> Completion[str]:
"""
Add a host to the orchestrator inventory.
"""
raise NotImplementedError()
- def remove_host(self, host):
- # type: (str) -> Completion[str]
+ def remove_host(self, host: str) -> Completion[str]:
"""
Remove a host from the orchestrator inventory.
"""
raise NotImplementedError()
- def update_host_addr(self, host, addr):
- # type: (str, str) -> Completion[str]
+ def update_host_addr(self, host: str, addr: str) -> Completion[str]:
"""
Update a host's address
"""
raise NotImplementedError()
- def get_hosts(self):
- # type: () -> Completion[List[HostSpec]]
+ def get_hosts(self) -> Completion[List[HostSpec]]:
"""
Report the hosts in the cluster.
"""
raise NotImplementedError()
- def add_host_label(self, host, label):
- # type: (str, str) -> Completion[str]
+ def add_host_label(self, host: str, label: str) -> Completion[str]:
"""
Add a host label
"""
raise NotImplementedError()
- def remove_host_label(self, host, label):
- # type: (str, str) -> Completion[str]
+ def remove_host_label(self, host: str, label: str) -> Completion[str]:
"""
Remove a host label
"""
"""
raise NotImplementedError()
- def get_inventory(self, host_filter=None, refresh=False):
- # type: (Optional[InventoryFilter], bool) -> Completion[List[InventoryHost]]
+ def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.
"""
raise NotImplementedError()
- def describe_service(self, service_type=None, service_name=None, refresh=False):
- # type: (Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]]
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
"""
raise NotImplementedError()
- def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
- # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion[List[DaemonDescription]]
+ def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
"""
Describe a daemon (of any kind) that is already configured in
the orchestrator.
"""
raise NotImplementedError()
- def remove_daemons(self, names):
- # type: (List[str]) -> Completion[List[str]]
+ def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
"""
Remove specific daemon(s).
"""
raise NotImplementedError()
- def remove_service(self, service_name):
- # type: (str) -> Completion[str]
+ def remove_service(self, service_name: str) -> Completion[str]:
"""
Remove a service (a collection of daemons).
"""
raise NotImplementedError()
- def service_action(self, action, service_name):
- # type: (str, str) -> Completion[List[str]]
+ def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
"""
Perform an action (start/stop/reload) on a service (i.e., all daemons
providing the logical service).
#assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> Completion[str]
+ def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def remove_osds_status(self):
- # type: () -> Completion
+ def remove_osds_status(self) -> Completion:
"""
Returns a status of the ongoing OSD removal operations.
"""
raise NotImplementedError()
- def blink_device_light(self, ident_fault, on, locations):
- # type: (str, bool, List[DeviceLightLoc]) -> Completion[List[str]]
+ def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
"""
raise NotImplementedError()
- def zap_device(self, host, path):
- # type: (str, str) -> Completion[str]
+ def zap_device(self, host: str, path: str) -> Completion[str]:
"""Zap/Erase a device (DESTROYS DATA)"""
raise NotImplementedError()
- def add_mon(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create mon daemon(s)"""
raise NotImplementedError()
- def apply_mon(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
"""Update mon cluster"""
raise NotImplementedError()
- def add_mgr(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create mgr daemon(s)"""
raise NotImplementedError()
- def apply_mgr(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
"""Update mgr cluster"""
raise NotImplementedError()
- def add_mds(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create MDS daemon(s)"""
raise NotImplementedError()
- def apply_mds(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
"""Update MDS cluster"""
raise NotImplementedError()
- def add_rgw(self, spec):
- # type: (RGWSpec) -> Completion[List[str]]
+ def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
"""Create RGW daemon(s)"""
raise NotImplementedError()
- def apply_rgw(self, spec):
- # type: (RGWSpec) -> Completion[str]
+ def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
"""Update RGW cluster"""
raise NotImplementedError()
- def add_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create rbd-mirror daemon(s)"""
raise NotImplementedError()
- def apply_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
"""Update rbd-mirror cluster"""
raise NotImplementedError()
- def add_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion[List[str]]
+ def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
"""Create NFS daemon(s)"""
raise NotImplementedError()
- def apply_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion[str]
+ def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
"""Update NFS cluster"""
raise NotImplementedError()
- def add_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion[List[str]]
+ def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
"""Create iscsi daemon(s)"""
raise NotImplementedError()
- def apply_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion[str]
+ def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
"""Update iscsi cluster"""
raise NotImplementedError()
- def add_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create new prometheus daemon"""
raise NotImplementedError()
- def apply_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
"""Update prometheus cluster"""
raise NotImplementedError()
- def add_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new Node-Exporter service"""
raise NotImplementedError()
- def apply_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def add_crash(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new crash service"""
raise NotImplementedError()
- def apply_crash(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
- def add_grafana(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new Node-Exporter service"""
raise NotImplementedError()
- def apply_grafana(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def add_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion[List[str]]
+ def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new AlertManager service"""
raise NotImplementedError()
- def apply_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion[str]
+ def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def upgrade_check(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion[str]
+ def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
raise NotImplementedError()
- def upgrade_start(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion[str]
+ def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
raise NotImplementedError()
- def upgrade_pause(self):
- # type: () -> Completion[str]
+ def upgrade_pause(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_resume(self):
- # type: () -> Completion[str]
+ def upgrade_resume(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_stop(self):
- # type: () -> Completion[str]
+ def upgrade_stop(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_status(self):
- # type: () -> Completion[UpgradeStatusSpec]
+ def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
raise NotImplementedError()
@_hide_in_features
- def upgrade_available(self):
- # type: () -> Completion
+ def upgrade_available(self) -> Completion:
"""
Report on what versions are available to upgrade to
GenericSpec = Union[ServiceSpec, HostSpec]
-def json_to_generic_spec(spec):
- # type: (dict) -> GenericSpec
+
+def json_to_generic_spec(spec: dict) -> GenericSpec:
if 'service_type' in spec and spec['service_type'] == 'host':
return HostSpec.from_json(spec)
else:
self.status_desc = status_desc
# datetime when this info was last refreshed
- self.last_refresh = last_refresh # type: Optional[datetime.datetime]
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
- self.created = created # type: Optional[datetime.datetime]
- self.started = started # type: Optional[datetime.datetime]
- self.last_configured = last_configured # type: Optional[datetime.datetime]
- self.last_deployed = last_deployed # type: Optional[datetime.datetime]
+ self.created: Optional[datetime.datetime] = created
+ self.started: Optional[datetime.datetime] = started
+ self.last_configured: Optional[datetime.datetime] = last_configured
+ self.last_deployed: Optional[datetime.datetime] = last_deployed
# Affinity to a certain OSDSpec
- self.osdspec_affinity = osdspec_affinity # type: Optional[str]
+ self.osdspec_affinity: Optional[str] = osdspec_affinity
self.events: List[OrchestratorEvent] = events or []
def name(self):
return '%s.%s' % (self.daemon_type, self.daemon_id)
- def matches_service(self, service_name):
- # type: (Optional[str]) -> bool
+ def matches_service(self, service_name: Optional[str]) -> bool:
if service_name:
return self.name().startswith(service_name + '.')
return False
self.running = running
# datetime when this info was last refreshed
- self.last_refresh = last_refresh # type: Optional[datetime.datetime]
- self.created = created # type: Optional[datetime.datetime]
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
+ self.created: Optional[datetime.datetime] = created
self.spec: ServiceSpec = spec
in e.g. OSD servers.
"""
- def __init__(self, labels=None, hosts=None):
- # type: (Optional[List[str]], Optional[List[str]]) -> None
+ def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
#: Optional: get info about hosts matching labels
self.labels = labels
When fetching inventory, all Devices are groups inside of an
InventoryHost.
"""
- def __init__(self, name, devices=None, labels=None, addr=None):
- # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
+ def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
if devices is None:
devices = inventory.Devices([])
if labels is None:
return "<InventoryHost>({name})".format(name=self.name)
@staticmethod
- def get_host_names(hosts):
- # type: (List[InventoryHost]) -> List[str]
+ def get_host_names(hosts: List['InventoryHost']) -> List[str]:
return [host.name for host in hosts]
def __eq__(self, other):
... self.orch_client.set_mgr(self.mgr))
"""
- def set_mgr(self, mgr):
- # type: (MgrModule) -> None
+ def set_mgr(self, mgr: MgrModule) -> None:
"""
Useable in the Dashbord that uses a global ``mgr``
"""
raise NotImplementedError(f'{o} does not implement {meth}') from e
raise
- def _orchestrator_wait(self, completions):
- # type: (List[Completion]) -> None
+ def _orchestrator_wait(self, completions: List[Completion]) -> None:
"""
Wait for completions to complete (reads) or
become persistent (writes).