def get_hosts(self):
# type: () -> List[str]
- r = []
- for host, di in self.daemons.items():
- r.append(host)
- return r
+ return list(self.daemons)
def get_schedulable_hosts(self) -> List[HostSpec]:
"""
return self.facts.get(host, {})
def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
- for dm in self.daemons.values():
+ for dm in self.daemons.copy().values():
yield from dm.values()
def get_daemons(self):
def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
- r.append(dd)
+ for dd in self._get_daemons():
+ if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
+ r.append(dd)
return r
def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
dd.events = self.mgr.events.get_for_daemon(dd.name())
return dd
- for host, dm in self.daemons.items():
+ for host, dm in self.daemons.copy().items():
yield host, {name: alter(host, d) for name, d in dm.items()}
def get_daemons_by_service(self, service_name):
def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
assert service_type not in ['keepalived', 'haproxy']
- result = [] # type: List[orchestrator.DaemonDescription]
- for h, dm in self.daemons.items():
- if host and h != host:
- continue
- for name, d in dm.items():
- if d.daemon_type in service_to_daemon_types(service_type):
- result.append(d)
- return result
+ daemons = self.daemons[host].values() if host else self._get_daemons()
+
+ return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
def get_daemon_types(self, hostname: str) -> List[str]:
"""Provide a list of the types of daemons on the host"""
def get_daemon_names(self):
# type: () -> List[str]
- r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- r.append(name)
- return r
+ return [d.name() for d in self._get_daemons()]
def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
if host in self.daemon_config_deps: