[mypy-cephadm.*]
disallow_untyped_defs = True
+[mypy-orchestrator.*]
+disallow_untyped_defs = True
+
# Make cephadm and rook happy
[mypy-OpenSSL]
ignore_missing_imports = True
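For context, `disallow_untyped_defs = True` makes mypy reject any function definition in the matched packages that lacks parameter or return annotations; a minimal illustration with a hypothetical function (not from the tree):

    from typing import List

    def daemon_types(hostname: str) -> List[str]:   # accepted: fully annotated
        return []

    # def daemon_types(hostname):     # rejected under disallow_untyped_defs:
    #     return []                   # "Function is missing a type annotation"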
"""Provide a list of the types of daemons on the host"""
result = set()
for _d, dm in self.daemons[hostname].items():
+ assert dm.daemon_type is not None
result.add(dm.daemon_type)
return list(result)
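The added asserts follow the standard mypy narrowing pattern: once a DaemonDescription field is typed Optional, an `assert x is not None` before use narrows it to the non-None type for the rest of the block. A self-contained sketch with a hypothetical class:

    from typing import List, Optional, Set

    class Daemon:
        def __init__(self, daemon_type: Optional[str] = None) -> None:
            self.daemon_type = daemon_type

    def daemon_types(daemons: Set[Daemon]) -> List[str]:
        result: Set[str] = set()
        for d in daemons:
            assert d.daemon_type is not None   # narrows Optional[str] to str for mypy
            result.add(d.daemon_type)
        return list(result)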
def _host_ok_to_stop(self, hostname: str) -> Tuple[int, str]:
self.log.debug("running host-ok-to-stop checks")
daemons = self.cache.get_daemons()
- daemon_map = defaultdict(lambda: [])
+ daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
for dd in daemons:
+ assert dd.hostname is not None
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
if dd.hostname == hostname:
daemon_map[dd.daemon_type].append(dd.daemon_id)
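The explicit `Dict[str, List[str]]` annotation is needed because mypy cannot infer key/value types from `defaultdict(lambda: [])` on its own; a minimal sketch:

    from collections import defaultdict
    from typing import Dict, List

    # Without the annotation mypy typically reports: Need type annotation for "daemon_map"
    daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
    daemon_map['osd'].append('0')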
osd_count = 0
for h, dm in self.cache.get_daemons_with_volatile_status():
for name, dd in dm.items():
+ assert dd.hostname is not None
+ assert dd.daemon_type is not None
+
if service_type and service_type != dd.daemon_type:
continue
n: str = dd.service_name()
@trivial_completion
def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
d = self.cache.get_daemon(daemon_name)
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
and not self.mgr_service.mgr_map_has_standby():
def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
dd = self.cache.get_daemon(daemon_name)
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
+ assert dd.hostname is not None
if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
and not self.mgr_service.mgr_map_has_standby():
raise OrchestratorError(
return f"Unable to find OSDs: {osd_ids}"
for daemon in to_remove_daemons:
+ assert daemon.daemon_id is not None
try:
self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
replace=replace,
# remove any?
def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
- r = self.mgr.cephadm_services[service_type].ok_to_stop(daemon_ids)
+ assert None not in daemon_ids
+ r = self.mgr.cephadm_services[service_type].ok_to_stop(cast(List[str], daemon_ids))
return not r.retval
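`assert None not in daemon_ids` is only a runtime check; it does not narrow the element type of the list for mypy, which is why the `cast` to `List[str]` follows. A sketch of the pattern with hypothetical names:

    from typing import List, Optional, cast

    def non_optional_ids(maybe_ids: List[Optional[str]]) -> List[str]:
        # mypy still sees List[Optional[str]] after the assert, so the cast
        # is what satisfies the declared return type.
        assert None not in maybe_ids
        return cast(List[str], maybe_ids)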
while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
r = True
# NOTE: we are passing the 'force' flag here, which means
# we can delete a mon instance's data.
+ assert d.hostname is not None
self._remove_daemon(d.name(), d.hostname)
if r is None:
for dd in daemons:
# orphan?
spec = self.mgr.spec_store.specs.get(dd.service_name(), None)
+ assert dd.hostname is not None
+ assert dd.daemon_type is not None
+ assert dd.daemon_id is not None
if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
# (mon and mgr specs should always exist; osds aren't matched
# to a service spec)
ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
for daemon in ha_rgw_daemons:
if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts:
+ assert daemon.hostname is not None
self.mgr.cache.schedule_daemon_action(
daemon.hostname, daemon.name(), 'reconfig')
return spec
"""
Called before the daemon is removed.
"""
+ assert daemon.daemon_type is not None
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
"""
Called after the daemon is removed.
"""
+ assert daemon.daemon_type is not None
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
}
def remove_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
+ assert daemon.hostname is not None
daemon_id: str = daemon.daemon_id
host: str = daemon.hostname
def pre_remove(self, daemon: DaemonDescription) -> None:
super().pre_remove(daemon)
+ assert daemon.daemon_id is not None
daemon_id: str = daemon.daemon_id
self._check_safe_to_destroy(daemon_id)
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
for daemon in daemon_descrs:
+ assert daemon.daemon_type is not None
+ assert daemon.daemon_id is not None
if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
return daemon
# if no active mgr found, return an empty DaemonDescription
rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw')
rgw_servers = []
for daemon in rgw_daemons:
+ assert daemon.hostname is not None
rgw_servers.append(self.rgw_server(
daemon.name(), resolve_ip(daemon.hostname)))
'value': "true"
})
for dd in daemon_descrs:
+ assert dd.hostname is not None
spec = cast(IscsiServiceSpec,
self.mgr.spec_store.specs.get(dd.service_name(), None))
if not spec:
prom_services = [] # type: List[str]
for dd in self.mgr.cache.get_daemons_by_service('prometheus'):
+ assert dd.hostname is not None
prom_services.append(dd.hostname)
deps.append(dd.name())
grafana_data_sources = self.mgr.template.render(
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
# TODO: signed cert
dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
service_url = 'https://{}:{}'.format(
self._inventory_get_addr(dd.hostname), self.DEFAULT_SERVICE_PORT)
self._set_service_url_on_dashboard(
continue
if dd.daemon_id == self.mgr.get_mgr_id():
continue
+ assert dd.hostname is not None
addr = self.mgr.inventory.get_addr(dd.hostname)
dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
port))
peers = []
port = '9094'
for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
+ assert dd.hostname is not None
deps.append(dd.name())
addr = self.mgr.inventory.get_addr(dd.hostname)
peers.append(addr.split(':')[0] + ':' + port)
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
service_url = 'http://{}:{}'.format(self._inventory_get_addr(dd.hostname),
self.DEFAULT_SERVICE_PORT)
self._set_service_url_on_dashboard(
continue
if dd.daemon_id == self.mgr.get_mgr_id():
continue
+ assert dd.hostname is not None
addr = self.mgr.inventory.get_addr(dd.hostname)
mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
# scrape node exporters
nodes = []
for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
+ assert dd.hostname is not None
deps.append(dd.name())
addr = self.mgr.inventory.get_addr(dd.hostname)
nodes.append({
# scrape alert managers
alertmgr_targets = []
for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
+ assert dd.hostname is not None
deps.append(dd.name())
addr = self.mgr.inventory.get_addr(dd.hostname)
alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
dd = self.get_active_daemon(daemon_descrs)
+ assert dd.hostname is not None
service_url = 'http://{}:{}'.format(
self._inventory_get_addr(dd.hostname), self.DEFAULT_SERVICE_PORT)
self._set_service_url_on_dashboard(
return keyring
def remove_rgw_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
daemon_id: str = daemon.daemon_id
entity: AuthEntity = self.get_auth_entity(f'{daemon_id}-rgw')
def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
# only wait a little bit; the service might go away for something
+ assert s.daemon_type is not None
+ assert s.daemon_id is not None
tries = 4
while tries > 0:
if not self.upgrade_state or self.upgrade_state.paused:
daemon_type, d.daemon_id,
d.container_image_name, d.container_image_id, d.version))
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+
if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
self.mgr.get_mgr_id())
from contextlib import contextmanager
from functools import wraps
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+ Sequence, Dict, cast
+
+try:
+ from typing import Protocol # Protocol was added in Python 3.8
+except ImportError:
+ class Protocol: pass # type: ignore
+
+
import yaml
from ceph.deployment import inventory
from mgr_module import MgrModule, CLICommand, HandleCommandResult
-try:
- from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Type, Sequence, Dict, cast
-except ImportError:
- pass
logger = logging.getLogger(__name__)
T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
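`FuncT` bound to `Callable[..., Any]` is the usual way to type decorators so that the decorated function keeps its own signature; a minimal sketch of the pattern used by `handle_exception` below:

    from functools import wraps
    from typing import Any, Callable, TypeVar, cast

    FuncT = TypeVar('FuncT', bound=Callable[..., Any])

    def logged(func: FuncT) -> FuncT:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            print('calling', func.__name__)
            return func(*args, **kwargs)
        return cast(FuncT, wrapper)   # wrapper has a generic signature; cast back to FuncT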
class OrchestratorError(Exception):
def __init__(self,
msg: str,
errno: int = -errno.EINVAL,
- event_kind_subject: Optional[Tuple[str, str]] = None):
+ event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
super(Exception, self).__init__(msg)
self.errno = errno
# See OrchestratorEvent.subject
No orchestrator is configured.
"""
- def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
+ def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
@contextmanager
-def set_exception_subject(kind, subject, overwrite=False):
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
try:
yield
except OrchestratorError as e:
raise
-def handle_exception(prefix, perm, func):
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
@wraps(func)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
except (OrchestratorError, ServiceSpecValidationError) as e:
wrapper_copy._cli_command.store_func_metadata(func) # type: ignore
wrapper_copy._cli_command.func = wrapper_copy # type: ignore
- return wrapper_copy
+ return cast(FuncT, wrapper_copy)
+
+class InnerCliCommandCallable(Protocol):
+ def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]: ...
-def _cli_command(perm):
- def inner_cli_command(prefix):
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+ def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
return lambda func: handle_exception(prefix, perm, func)
return inner_cli_command
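`Protocol` (with the fallback shim above for Python < 3.8) provides structural typing; here it describes the callable returned by `_cli_command`, which is awkward to express with a plain `Callable[...]`. A generic structural-typing sketch, not tied to this module:

    from typing import Protocol   # typing_extensions.Protocol on Python < 3.8

    class SupportsToJson(Protocol):
        def to_json(self) -> dict: ...

    def dump(obj: SupportsToJson) -> dict:
        # Any object with a matching to_json() satisfies the protocol structurally,
        # no inheritance required.
        return obj.to_json()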
We make use of CLICommand, except for the use of the global variable.
"""
- def __init__(cls, name, bases, dct):
+ def __init__(cls, name: str, bases: Any, dct: Any) -> None:
super(CLICommandMeta, cls).__init__(name, bases, dct)
dispatch: Dict[str, CLICommand] = {}
for v in dct.values():
except AttributeError:
pass
- def handle_command(self, inbuf, cmd):
+ def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
if cmd['prefix'] not in dispatch:
return self.handle_command(inbuf, cmd)
cls.handle_command = handle_command
-def _no_result():
- return object()
+def _no_result() -> None:
+ return object() # type: ignore
class _Promise(object):
RUNNING = 2
FINISHED = 3 # we have a final result
- NO_RESULT: None = _no_result()
+ NO_RESULT: None = _no_result() # type: ignore
ASYNC_RESULT = object()
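The `-> None` annotation plus `# type: ignore` above lets `NO_RESULT` stay a distinct sentinel object while being usable where `None` is expected. An alternative, more explicit typed-sentinel sketch (not what this patch does):

    from typing import Optional, Union

    class _NoResult:
        """Unique sentinel distinguishing 'no result yet' from a real None result."""

    NO_RESULT = _NoResult()

    def describe(value: Union[_NoResult, Optional[str]]) -> str:
        if isinstance(value, _NoResult):
            return 'pending'
        return 'done: %r' % (value,)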
def __init__(self,
return getattr(self, '_exception_', None)
@_exception.setter
- def _exception(self, e):
+ def _exception(self, e: Exception) -> None:
self._exception_ = e
try:
self._serialized_exception_ = pickle.dumps(e) if e is not None else None
def _on_complete(self, val: Optional[Callable]) -> None:
self._on_complete_ = val
- def __repr__(self):
+ def __repr__(self) -> str:
name = self._name or getattr(self._on_complete, '__name__',
'??') if self._on_complete else 'None'
val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
next, '_progress_reference', 'NA'), repr(self._next_promise)
)
- def pretty_print_1(self):
+ def pretty_print_1(self) -> str:
if self._name:
name = self._name
elif self._on_complete is None:
for p in iter(self._next_promise):
p._first_promise = self._first_promise
- def _finalize(self, value=NO_RESULT):
+ def _finalize(self, value: Optional[T] = NO_RESULT) -> None:
"""
Sets this promise to complete.
# asynchronous promise
pass
- def propagate_to_next(self):
+ def propagate_to_next(self) -> None:
self._state = self.FINISHED
logger.debug('finalized {}'.format(repr(self)))
if self._next_promise:
self._next_promise.fail(e)
self._state = self.FINISHED
- def __contains__(self, item):
+ def __contains__(self, item: '_Promise') -> bool:
return any(item is p for p in iter(self._first_promise))
- def __iter__(self):
+ def __iter__(self) -> Iterator['_Promise']:
yield self
elem = self._next_promise
while elem is not None:
yield elem
elem = elem._next_promise
- def _append_promise(self, other):
+ def _append_promise(self, other: Optional['_Promise']) -> None:
if other is not None:
assert self not in other
assert other not in self
class ProgressReference(object):
def __init__(self,
message: str,
- mgr,
+ mgr: Any,
completion: Optional[Callable[[], 'Completion']] = None
):
"""
self._completion_has_result = False
self.mgr.all_progress_references.append(self)
- def __str__(self):
+ def __str__(self) -> str:
"""
``__str__()`` is used for determining the message for progress events.
"""
return self.message or super(ProgressReference, self).__str__()
- def __call__(self, arg):
+ def __call__(self, arg: T) -> T:
self._completion_has_result = True
self.progress = 1.0
return arg
@property
- def progress(self):
+ def progress(self) -> float:
return self._progress
@progress.setter
- def progress(self, progress):
+ def progress(self, progress: float) -> None:
assert progress <= 1.0
self._progress = progress
try:
pass
@property
- def effective(self):
+ def effective(self) -> bool:
return self.progress == 1 and self._completion_has_result
- def update(self):
- def progress_run(progress):
+ def update(self) -> None:
+ def progress_run(progress: float) -> None:
self.progress = progress
if self.completion:
c = self.completion().then(progress_run)
else:
self.progress = 1
- def fail(self):
+ def fail(self) -> None:
self._completion_has_result = True
self.progress = 1
value: Any = _Promise.NO_RESULT,
on_complete: Optional[Callable] = None,
name: Optional[str] = None,
- ):
+ ) -> None:
super(Completion, self).__init__(_first_promise, value, on_complete, name)
@property
as a write completion.
"""
- references = [c._progress_reference for c in iter(
- self) if c._progress_reference is not None]
+ references = [c._progress_reference for c in iter( # type: ignore
+ self) if c._progress_reference is not None] # type: ignore
if references:
assert len(references) == 1
return references[0]
@classmethod
def with_progress(cls: Any,
message: str,
- mgr,
+ mgr: Any,
_first_promise: Optional["Completion"] = None,
value: Any = _Promise.NO_RESULT,
on_complete: Optional[Callable] = None,
def add_progress(self,
message: str,
- mgr,
+ mgr: Any,
calc_percent: Optional[Callable[[], Any]] = None
- ):
+ ) -> Any:
return self.then(
on_complete=ProgressReference(
message=message,
)
)
- def fail(self, e: Exception):
+ def fail(self, e: Exception) -> None:
super(Completion, self).fail(e)
if self._progress_reference:
self._progress_reference.fail()
- def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
+ def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT) -> None:
if self._first_promise._state == self.INITIALIZED:
self._first_promise._finalize(result)
"""
return self.is_errored or (self.has_result)
- def pretty_print(self):
+ def pretty_print(self) -> str:
reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
self.finalize(result)
-def _hide_in_features(f):
- f._hide_in_features = True
+def _hide_in_features(f: FuncT) -> FuncT:
+ f._hide_in_features = True # type: ignore
return f
-
class Orchestrator(object):
"""
Calls in this class may do long running remote operations, with time
"""
@_hide_in_features
- def is_orchestrator_module(self):
+ def is_orchestrator_module(self) -> bool:
"""
Enable other modules to interrogate this module to discover
whether it's usable as an orchestrator module.
raise NotImplementedError()
@_hide_in_features
- def get_feature_set(self):
+ def get_feature_set(self) -> Dict[str, dict]:
"""Describes which methods this orchestrator implements
.. note::
'cephadm-exporter': self.apply_cephadm_exporter,
}
- def merge(ls, r):
+ def merge(ls: Union[List[T], T], r: Union[List[T], T]) -> List[T]:
if isinstance(ls, list):
- return ls + [r]
- return [ls, r]
+ return ls + [r] # type: ignore
+ return [ls, r] # type: ignore
spec, *specs = specs
fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
completion = fn(spec)
for s in specs:
- def next(ls):
+ def next(ls: list) -> Any:
fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
return fn(s).then(lambda r: merge(ls, r))
completion = completion.then(next)
def set_unmanaged_flag(self,
unmanaged_flag: bool,
service_type: str = 'osd',
- service_name=None
+ service_name: Optional[str] = None
) -> HandleCommandResult:
raise NotImplementedError()
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
- def __init__(self):
+ def __init__(self) -> None:
self.in_progress = False # Is an upgrade underway?
- self.target_image = None
- self.services_complete = [] # Which daemon types are fully updated?
+ self.target_image: Optional[str] = None
+ self.services_complete: List[str] = [] # Which daemon types are fully updated?
self.message = "" # Freeform description
-def handle_type_error(method):
+def handle_type_error(method: FuncT) -> FuncT:
@wraps(method)
- def inner(cls, *args, **kwargs):
+ def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
try:
return method(cls, *args, **kwargs)
except TypeError as e:
error_msg = '{}: {}'.format(cls.__name__, e)
raise OrchestratorValidationError(error_msg)
- return inner
+ return cast(FuncT, inner)
class DaemonDescription(object):
"""
def __init__(self,
- daemon_type=None,
- daemon_id=None,
- hostname=None,
- container_id=None,
- container_image_id=None,
- container_image_name=None,
- version=None,
- status=None,
- status_desc=None,
- last_refresh=None,
- created=None,
- started=None,
- last_configured=None,
- osdspec_affinity=None,
- last_deployed=None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ hostname: Optional[str] = None,
+ container_id: Optional[str] = None,
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ version: Optional[str] = None,
+ status: Optional[int] = None,
+ status_desc: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ started: Optional[datetime.datetime] = None,
+ last_configured: Optional[datetime.datetime] = None,
+ osdspec_affinity: Optional[str] = None,
+ last_deployed: Optional[datetime.datetime] = None,
events: Optional[List['OrchestratorEvent']] = None,
- is_active: bool = False):
+ is_active: bool = False) -> None:
# Host is at the same granularity as InventoryHost
- self.hostname: str = hostname
+ self.hostname: Optional[str] = hostname
# Not everyone runs in containers, but enough people do to
# justify having the container_id (runtime id) and container_image
# typically either based on hostnames or on pod names.
# This is the <foo> in mds.<foo>, the ID that will appear
# in the FSMap/ServiceMap.
- self.daemon_id: str = daemon_id
+ self.daemon_id: Optional[str] = daemon_id
# Service version that was deployed
self.version = version
self.is_active = is_active
- def name(self):
+ def name(self) -> str:
return '%s.%s' % (self.daemon_type, self.daemon_id)
def matches_service(self, service_name: Optional[str]) -> bool:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
if service_name:
return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
return False
- def service_id(self):
+ def service_id(self) -> str:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
if self.daemon_type == 'osd' and self.osdspec_affinity:
return self.osdspec_affinity
- def _match():
+ def _match() -> str:
+ assert self.daemon_id is not None
err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
return self.daemon_id
- def service_name(self):
+ def service_name(self) -> str:
+ assert self.daemon_type is not None
if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
return daemon_type_to_service(self.daemon_type)
- def __repr__(self):
+ def __repr__(self) -> str:
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
- def to_json(self):
- out = OrderedDict()
+ def to_json(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
out['daemon_type'] = self.daemon_type
out['daemon_id'] = self.daemon_id
out['hostname'] = self.hostname
@classmethod
@handle_type_error
- def from_json(cls, data):
+ def from_json(cls, data: dict) -> 'DaemonDescription':
c = data.copy()
event_strs = c.pop('events', [])
for k in ['last_refresh', 'created', 'started', 'last_deployed',
events = [OrchestratorEvent.from_json(e) for e in event_strs]
return cls(events=events, **c)
- def __copy__(self):
+ def __copy__(self) -> 'DaemonDescription':
# feel free to change this:
return DaemonDescription.from_json(self.to_json())
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
return dumper.represent_dict(data.to_json().items())
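`yaml_representer` is assumed to be registered with the dumper elsewhere (registration is not part of this hunk); a self-contained sketch of the pattern with a hypothetical class:

    from typing import Any
    import yaml

    class Sample:
        def __init__(self, name: str) -> None:
            self.name = name

        def to_json(self) -> dict:
            return {'name': self.name}

        @staticmethod
        def yaml_representer(dumper: yaml.SafeDumper, data: 'Sample') -> Any:
            return dumper.represent_dict(data.to_json().items())

    yaml.SafeDumper.add_representer(Sample, Sample.yaml_representer)
    print(yaml.safe_dump(Sample('osd.0')))   # -> name: osd.0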
def __init__(self,
spec: ServiceSpec,
- container_image_id=None,
- container_image_name=None,
- rados_config_location=None,
- service_url=None,
- last_refresh=None,
- created=None,
- size=0,
- running=0,
- events: Optional[List['OrchestratorEvent']] = None):
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ rados_config_location: Optional[str] = None,
+ service_url: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ size: int = 0,
+ running: int = 0,
+ events: Optional[List['OrchestratorEvent']] = None) -> None:
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# (image name)
self.events: List[OrchestratorEvent] = events or []
- def service_type(self):
+ def service_type(self) -> str:
return self.spec.service_type
- def __repr__(self):
+ def __repr__(self) -> str:
return f"<ServiceDescription of {self.spec.one_line_str()}>"
def to_json(self) -> OrderedDict:
@classmethod
@handle_type_error
- def from_json(cls, data: dict):
+ def from_json(cls, data: dict) -> 'ServiceDescription':
c = data.copy()
status = c.pop('status', {})
event_strs = c.pop('events', [])
return cls(spec=spec, events=events, **c_status)
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
return dumper.represent_dict(data.to_json().items())
self.devices = devices
self.labels = labels
- def to_json(self):
+ def to_json(self) -> dict:
return {
'name': self.name,
'addr': self.addr,
}
@classmethod
- def from_json(cls, data):
+ def from_json(cls, data: dict) -> 'InventoryHost':
try:
_data = copy.deepcopy(data)
name = _data.pop('name')
raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
@classmethod
- def from_nested_items(cls, hosts):
+ def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
devs = inventory.Devices.from_json
return [cls(item[0], devs(item[1].data)) for item in hosts]
- def __repr__(self):
+ def __repr__(self) -> str:
return "<InventoryHost>({name})".format(name=self.name)
@staticmethod
def get_host_names(hosts: List['InventoryHost']) -> List[str]:
return [host.name for host in hosts]
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return self.name == other.name and self.devices == other.devices
ERROR = 'ERROR'
regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
- def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
+ def __init__(self, created: Union[str, datetime.datetime], kind: str,
+ subject: str, level: str, message: str) -> None:
if isinstance(created, str):
created = str_to_datetime(created)
self.created: datetime.datetime = created
@classmethod
@handle_type_error
- def from_json(cls, data) -> "OrchestratorEvent":
+ def from_json(cls, data: str) -> "OrchestratorEvent":
"""
>>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
'2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
return cls(*match.groups())
raise ValueError(f'Unable to match: "{data}"')
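`regex_v1` splits a serialized event into the five constructor arguments (created, kind, subject, level, message), as the doctest above shows; a standalone sketch of the same parse:

    import re

    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    line = '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \'ubuntu\'"'
    m = regex_v1.match(line)
    assert m is not None
    created, kind, subject, level, message = m.groups()
    # created='2020-06-10T10:20:25.691255', kind='daemon', subject='crash.ubuntu',
    # level='INFO', message="Deployed crash.ubuntu on host 'ubuntu'"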
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if not isinstance(other, OrchestratorEvent):
return False
and self.subject == other.subject and self.message == other.message
-def _mk_orch_methods(cls):
+def _mk_orch_methods(cls: Any) -> Any:
# Needs to be defined outside of the for loop;
# otherwise meth is always bound to the last key.
- def shim(method_name):
- def inner(self, *args, **kwargs):
+ def shim(method_name: str) -> Callable:
+ def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
completion = self._oremote(method_name, args, kwargs)
return completion
return inner
self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
- def __get_mgr(self):
+ def __get_mgr(self) -> Any:
try:
return self.__mgr
except AttributeError:
return self
- def _oremote(self, meth, args, kwargs):
+ def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
"""
Helper for invoking `remote` on whichever orchestrator is enabled
import json
from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union
import re
-import ast
+import datetime
import yaml
from prettytable import PrettyTable
ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec
-def nice_delta(now, t, suffix=''):
+def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
if t:
return to_pretty_timedelta(now - t) + suffix
else:
reconfig = 'reconfig'
-def to_format(what, format: Format, many: bool, cls):
- def to_json_1(obj):
+def to_format(what: Any, format: Format, many: bool, cls: Any) -> Any:
+ def to_json_1(obj: Any) -> Any:
if hasattr(obj, 'to_json'):
return obj.to_json()
return obj
- def to_json_n(objs):
+ def to_json_n(objs: List) -> List:
return [to_json_1(o) for o in objs]
to_json = to_json_n if many else to_json_1
else:
copy = what
- def to_yaml_1(obj):
+ def to_yaml_1(obj: Any) -> Any:
if hasattr(obj, 'yaml_representer'):
return obj
return to_json_1(obj)
- def to_yaml_n(objs):
+ def to_yaml_n(objs: list) -> list:
return [to_yaml_1(o) for o in objs]
to_yaml = to_yaml_n if many else to_yaml_1
raise OrchestratorError(f'unsupported format type: {format}')
-def generate_preview_tables(data, osd_only=False):
+def generate_preview_tables(data: Any, osd_only: bool = False) -> str:
error = [x.get('error') for x in data if x.get('error')]
if error:
return json.dumps(error)
return tables
-def preview_table_osd(data):
+def preview_table_osd(data: List) -> str:
table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True)
table.field_names = "service name host data db wal".split()
table.align = 'l'
return table.get_string()
-def preview_table_services(data):
+def preview_table_services(data: List) -> str:
table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True)
table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split()
table.align = 'l'
]
NATIVE_OPTIONS = [] # type: List[dict]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(OrchestratorCli, self).__init__(*args, **kwargs)
self.ident = set() # type: Set[str]
self.fault = set() # type: Set[str]
self._load()
self._refresh_health()
- def _load(self):
+ def _load(self) -> None:
active = self.get_store('active_devices')
if active:
decoded = json.loads(active)
self.fault = set(decoded.get('fault', []))
self.log.debug('ident {}, fault {}'.format(self.ident, self.fault))
- def _save(self):
+ def _save(self) -> None:
encoded = json.dumps({
'ident': list(self.ident),
'fault': list(self.fault),
})
self.set_store('active_devices', encoded)
- def _refresh_health(self):
+ def _refresh_health(self) -> None:
h = {}
if self.ident:
h['DEVICE_IDENT_ON'] = {
return [DeviceLightLoc(**l) for l in sum(locs, [])]
@_cli_read_command(prefix='device ls-lights')
- def _device_ls(self):
+ def _device_ls(self) -> HandleCommandResult:
"""List currently active device indicator lights"""
return HandleCommandResult(
stdout=json.dumps({
else:
return self.light_off(light_type.value, devid, force)
- def _select_orchestrator(self):
- return self.get_module_option("orchestrator")
+ def _select_orchestrator(self) -> str:
+ return cast(str, self.get_module_option("orchestrator"))
@_cli_write_command('orch host add')
- def _add_host(self, hostname: str, addr: Optional[str] = None, labels: Optional[List[str]] = None):
+ def _add_host(self, hostname: str, addr: Optional[str] = None, labels: Optional[List[str]] = None) -> HandleCommandResult:
"""Add a host"""
s = HostSpec(hostname=hostname, addr=addr, labels=labels)
completion = self.add_host(s)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch host rm')
- def _remove_host(self, hostname: str):
+ def _remove_host(self, hostname: str) -> HandleCommandResult:
"""Remove a host"""
completion = self.remove_host(hostname)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch host set-addr')
- def _update_set_addr(self, hostname: str, addr: str):
+ def _update_set_addr(self, hostname: str, addr: str) -> HandleCommandResult:
"""Update a host address"""
completion = self.update_host_addr(hostname, addr)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_read_command('orch host ls')
- def _get_hosts(self, format: Format = Format.plain):
+ def _get_hosts(self, format: Format = Format.plain) -> HandleCommandResult:
"""List hosts"""
completion = self.get_hosts()
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=output)
@_cli_write_command('orch host label add')
- def _host_label_add(self, hostname: str, label: str):
+ def _host_label_add(self, hostname: str, label: str) -> HandleCommandResult:
"""Add a host label"""
completion = self.add_host_label(hostname, label)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch host label rm')
- def _host_label_rm(self, hostname: str, label: str):
+ def _host_label_rm(self, hostname: str, label: str) -> HandleCommandResult:
"""Remove a host label"""
completion = self.remove_host_label(hostname, label)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch host ok-to-stop')
- def _host_ok_to_stop(self, hostname: str):
+ def _host_ok_to_stop(self, hostname: str) -> HandleCommandResult:
"""Check if the specified host can be safely stopped without reducing availability"""""
completion = self.host_ok_to_stop(hostname)
self._orchestrator_wait([completion])
@_cli_write_command(
'orch host maintenance enter')
- def _host_maintenance_enter(self, hostname: str):
+ def _host_maintenance_enter(self, hostname: str) -> HandleCommandResult:
"""
Prepare a host for maintenance by shutting down and disabling all Ceph daemons (cephadm only)
"""
@_cli_write_command(
'orch host maintenance exit')
- def _host_maintenance_exit(self, hostname: str):
+ def _host_maintenance_exit(self, hostname: str) -> HandleCommandResult:
"""
Return a host from maintenance, restarting all Ceph daemons (cephadm only)
"""
return HandleCommandResult(stdout='\n'.join(out))
@_cli_write_command('orch device zap')
- def _zap_device(self, hostname: str, path: str, force: bool = False):
+ def _zap_device(self, hostname: str, path: str, force: bool = False) -> HandleCommandResult:
"""
Zap (erase!) a device so it can be re-used
"""
service_name: Optional[str] = None,
export: bool = False,
format: Format = Format.plain,
- refresh: bool = False):
+ refresh: bool = False) -> HandleCommandResult:
"""
List services known to orchestrator
"""
raise_if_exception(completion)
services: List[ServiceDescription] = completion.result
- def ukn(s):
+ def ukn(s: Optional[str]) -> str:
return '<unknown>' if s is None else s
# Sort the list for display
daemon_type: Optional[str] = None,
daemon_id: Optional[str] = None,
format: Format = Format.plain,
- refresh: bool = False):
+ refresh: bool = False) -> HandleCommandResult:
"""
List daemons known to orchestrator
"""
raise_if_exception(completion)
daemons: List[DaemonDescription] = completion.result
- def ukn(s):
+ def ukn(s: Optional[str]) -> str:
return '<unknown>' if s is None else s
# Sort the list for display
daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id)))
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch')
- def _service_action(self, action: ServiceAction, service_name: str):
+ def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult:
"""Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)"""
completion = self.service_action(action.value, service_name)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch daemon')
- def _daemon_action(self, action: DaemonAction, name: str):
+ def _daemon_action(self, action: DaemonAction, name: str) -> HandleCommandResult:
"""Start, stop, restart, (redeploy,) or reconfig a specific daemon"""
if '.' not in name:
raise OrchestratorError('%s is not a valid daemon name' % name)
@_cli_write_command('orch daemon rm')
def _daemon_rm(self,
names: List[str],
- force: Optional[bool] = False):
+ force: Optional[bool] = False) -> HandleCommandResult:
"""Remove specific daemon(s)"""
for name in names:
if '.' not in name:
@_cli_write_command('orch rm')
def _service_rm(self,
service_name: str,
- force: bool = False):
+ force: bool = False) -> HandleCommandResult:
"""Remove a service"""
if service_name in ['mon', 'mgr'] and not force:
raise OrchestratorError('The mon and mgr services cannot be removed')
return HandleCommandResult(stdout=out)
@_cli_write_command('orch set backend')
- def _set_backend(self, module_name: Optional[str] = None):
+ def _set_backend(self, module_name: Optional[str] = None) -> HandleCommandResult:
"""
Select orchestrator module backend
"""
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
@_cli_write_command('orch pause')
- def _pause(self):
+ def _pause(self) -> HandleCommandResult:
"""Pause orchestrator background work"""
self.pause()
return HandleCommandResult()
@_cli_write_command('orch resume')
- def _resume(self):
+ def _resume(self) -> HandleCommandResult:
"""Resume orchestrator background work (if paused)"""
self.resume()
return HandleCommandResult()
@_cli_write_command('orch cancel')
- def _cancel(self):
+ def _cancel(self) -> HandleCommandResult:
"""
cancels ongoing operations
return HandleCommandResult()
@_cli_read_command('orch status')
- def _status(self, format: Format = Format.plain):
+ def _status(self, format: Format = Format.plain) -> HandleCommandResult:
"""Report configured backend and its status"""
o = self._select_orchestrator()
if o is None:
raise NoOrchestrator()
avail, why = self.available()
- result = {
+ result: Dict[str, Any] = {
"backend": o
}
if avail is not None:
output += ' ({0})'.format(result['reason'])
return HandleCommandResult(stdout=output)
- def self_test(self):
+ def self_test(self) -> None:
old_orch = self._select_orchestrator()
self._set_backend('')
assert self._select_orchestrator() is None
assert c.has_result
@staticmethod
- def _upgrade_check_image_name(image, ceph_version):
+ def _upgrade_check_image_name(image: Optional[str], ceph_version: Optional[str]) -> None:
"""
>>> OrchestratorCli._upgrade_check_image_name('v15.2.0', None)
Traceback (most recent call last):
@_cli_write_command('orch upgrade check')
def _upgrade_check(self,
image: Optional[str] = None,
- ceph_version: Optional[str] = None):
+ ceph_version: Optional[str] = None) -> HandleCommandResult:
"""Check service versions vs available and target containers"""
self._upgrade_check_image_name(image, ceph_version)
completion = self.upgrade_check(image=image, version=ceph_version)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch upgrade status')
- def _upgrade_status(self):
+ def _upgrade_status(self) -> HandleCommandResult:
"""Check service versions vs available and target containers"""
completion = self.upgrade_status()
self._orchestrator_wait([completion])
@_cli_write_command('orch upgrade start')
def _upgrade_start(self,
image: Optional[str] = None,
- ceph_version: Optional[str] = None):
+ ceph_version: Optional[str] = None) -> HandleCommandResult:
"""Initiate upgrade"""
self._upgrade_check_image_name(image, ceph_version)
completion = self.upgrade_start(image, ceph_version)
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch upgrade pause')
- def _upgrade_pause(self):
+ def _upgrade_pause(self) -> HandleCommandResult:
"""Pause an in-progress upgrade"""
completion = self.upgrade_pause()
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch upgrade resume')
- def _upgrade_resume(self):
+ def _upgrade_resume(self) -> HandleCommandResult:
"""Resume paused upgrade"""
completion = self.upgrade_resume()
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=completion.result_str())
@_cli_write_command('orch upgrade stop')
- def _upgrade_stop(self):
+ def _upgrade_stop(self) -> HandleCommandResult:
"""Stop an in-progress upgrade"""
completion = self.upgrade_stop()
self._orchestrator_wait([completion])
daemon_size = len(list(daemons))
services.append(orchestrator.ServiceDescription(
spec=ServiceSpec(
- service_type=daemon_type,
+ service_type=daemon_type, # type: ignore
),
size=daemon_size, running=daemon_size))