def __contains__(self, host: str) -> bool:
return host in self._inventory
- def assert_host(self, host):
+ def assert_host(self, host: str) -> None:
if host not in self._inventory:
raise OrchestratorError('host %s does not exist' % host)
- def add_host(self, spec: HostSpec):
+ def add_host(self, spec: HostSpec) -> None:
self._inventory[spec.hostname] = spec.to_json()
self.save()
- def rm_host(self, host: str):
+ def rm_host(self, host: str) -> None:
self.assert_host(host)
del self._inventory[host]
self.save()
- def set_addr(self, host, addr):
+ def set_addr(self, host: str, addr: str) -> None:
self.assert_host(host)
self._inventory[host]['addr'] = addr
self.save()
- def add_label(self, host, label):
+ def add_label(self, host: str, label: str) -> None:
self.assert_host(host)
if 'labels' not in self._inventory[host]:
self._inventory[host]['labels'].append(label)
self.save()
- def rm_label(self, host, label):
+ def rm_label(self, host: str, label: str) -> None:
self.assert_host(host)
if 'labels' not in self._inventory[host]:
self._inventory[host]['labels'].remove(label)
self.save()
- def get_addr(self, host) -> str:
+ def get_addr(self, host: str) -> str:
self.assert_host(host)
return self._inventory[host].get('addr', host)
else:
yield h
- def spec_from_dict(self, info) -> HostSpec:
+ def spec_from_dict(self, info: dict) -> HostSpec:
hostname = info['hostname']
return HostSpec(
hostname,
def all_specs(self) -> List[HostSpec]:
return list(map(self.spec_from_dict, self._inventory.values()))
- def save(self):
+ def save(self) -> None:
self.mgr.set_store('inventory', json.dumps(self._inventory))
self.networks[host] = nets
self.last_device_update[host] = datetime.datetime.utcnow()
- def update_daemon_config_deps(self, host, name, deps, stamp):
+ def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
self.daemon_config_deps[host][name] = {
'deps': deps,
'last_config': stamp,
del self.last_device_update[host]
self.mgr.event.set()
- def distribute_new_registry_login_info(self):
+ def distribute_new_registry_login_info(self) -> None:
self.registry_login_queue = set(self.mgr.inventory.keys())
def save_host(self, host: str) -> None:
raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
- def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+ def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
dd = copy(dd_orig)
if host in self.mgr.offline_hosts:
dd.status = -1
r.append(name)
return r
- def get_daemon_last_config_deps(self, host, name) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+ def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
if host in self.daemon_config_deps:
if name in self.daemon_config_deps[host]:
return self.daemon_config_deps[host][name].get('deps', []), \
return True
return False
- def host_needs_osdspec_preview_refresh(self, host):
+ def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
return False
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
- def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+ def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
if not self.mgr.manage_etc_ceph_ceph_conf:
return False
if self.mgr.paused:
# already up to date:
return False
- def update_last_etc_ceph_ceph_conf(self, host: str):
+ def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
if not self.mgr.last_monmap:
return
self.last_etc_ceph_ceph_conf[host] = datetime.datetime.utcnow()
assert host in self.daemons
self.daemons[host][dd.name()] = dd
- def rm_daemon(self, host, name):
+ def rm_daemon(self, host: str, name: str) -> None:
if host in self.daemons:
if name in self.daemons[host]:
del self.daemons[host][name]
- def daemon_cache_filled(self):
+ def daemon_cache_filled(self) -> bool:
"""
i.e. we have checked the daemons for each hosts at least once.
excluding offline hosts.
return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
for h in self.get_hosts())
- def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
+ def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
priorities = {
'start': 1,
'restart': 2,
self.scheduled_daemon_actions[host] = {}
self.scheduled_daemon_actions[host][daemon_name] = action
- def rm_scheduled_daemon_action(self, host: str, daemon_name: str):
+ def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
if host in self.scheduled_daemon_actions:
if daemon_name in self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host][daemon_name]
if not self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host]
- def get_scheduled_daemon_action(self, host, daemon) -> Optional[str]:
+ def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
return self.scheduled_daemon_actions.get(host, {}).get(daemon)
# limit to five events for now.
self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
- def for_service(self, spec: ServiceSpec, level, message) -> None:
+ def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
e = OrchestratorEvent(datetime.datetime.utcnow(), 'service',
spec.service_name(), level, message)
self.add(e)
- def from_orch_error(self, e: OrchestratorError):
+ def from_orch_error(self, e: OrchestratorError) -> None:
if e.event_subject is not None:
self.add(OrchestratorEvent(
datetime.datetime.utcnow(),
str(e)
))
- def for_daemon(self, daemon_name, level, message):
+ def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
self.add(e)
- def for_daemon_from_exception(self, daemon_name, e: Exception):
+ def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
self.for_daemon(
daemon_name,
"ERROR",
for k_s in unknowns:
del self.events[k_s]
- def get_for_service(self, name) -> List[OrchestratorEvent]:
+ def get_for_service(self, name: str) -> List[OrchestratorEvent]:
return self.events.get('service:' + name, [])
- def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+ def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
return self.events.get('daemon:' + name, [])