From 814787b70129a51c006869e21333503122b9eac6 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 17 Apr 2020 17:12:55 +0200 Subject: [PATCH] mgr/cephadm: add Inventory class This is just a minor cleanup. Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/inventory.py | 88 ++++++++++++++++++++++++ src/pybind/mgr/cephadm/module.py | 101 ++++++---------------------- 2 files changed, 109 insertions(+), 80 deletions(-) create mode 100644 src/pybind/mgr/cephadm/inventory.py diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py new file mode 100644 index 00000000000..62ad03f31ef --- /dev/null +++ b/src/pybind/mgr/cephadm/inventory.py @@ -0,0 +1,88 @@ +import json +import logging +from typing import TYPE_CHECKING, Dict, List, Iterator + +from orchestrator import OrchestratorError, HostSpec + +if TYPE_CHECKING: + from .module import CephadmOrchestrator + + +logger = logging.getLogger(__name__) + +class Inventory: + def __init__(self, mgr: 'CephadmOrchestrator'): + self.mgr = mgr + # load inventory + i = self.mgr.get_store('inventory') + if i: + self._inventory: Dict[str, dict] = json.loads(i) + else: + self._inventory = dict() + logger.debug('Loaded inventory %s' % self._inventory) + + def keys(self) -> List[str]: + return list(self._inventory.keys()) + + def __contains__(self, host: str) -> bool: + return host in self._inventory + + def assert_host(self, host): + if host not in self._inventory: + raise OrchestratorError('host %s does not exist' % host) + + def add_host(self, spec: HostSpec): + self._inventory[spec.hostname] = spec.to_json() + self.save() + + def rm_host(self, host: str): + self.assert_host(host) + del self._inventory[host] + self.save() + + def set_addr(self, host, addr): + self.assert_host(host) + self._inventory[host]['addr'] = addr + self.save() + + def add_label(self, host, label): + self.assert_host(host) + + if 'labels' not in self._inventory[host]: + self._inventory[host]['labels'] = list() + if label not in self._inventory[host]['labels']: + self._inventory[host]['labels'].append(label) + self.save() + + def rm_label(self, host, label): + self.assert_host(host) + + if 'labels' not in self._inventory[host]: + self._inventory[host]['labels'] = list() + if label in self._inventory[host]['labels']: + self._inventory[host]['labels'].remove(label) + self.save() + + def get_addr(self, host) -> str: + self.assert_host(host) + return self._inventory[host].get('addr', host) + + def filter_by_label(self, label=None) -> Iterator[str]: + for h, hostspec in self._inventory.items(): + if not label or label in hostspec.get('labels', []): + yield h + + def spec_from_dict(self, info): + hostname = info['hostname'] + return HostSpec( + hostname, + addr=info.get('addr', hostname), + labels=info.get('labels', []), + status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''), + ) + + def all_specs(self) -> Iterator[HostSpec]: + return map(self.spec_from_dict, self._inventory.values()) + + def save(self): + self.mgr.set_store('inventory', json.dumps(self._inventory)) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 2a1ff2db642..29d87a9b4e3 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -43,7 +43,7 @@ from . import remotes from . import utils from .nfs import NFSGanesha from .osd import RemoveUtil, OSDRemoval - +from .inventory import Inventory try: import remoto @@ -704,13 +704,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.all_progress_references = list() # type: List[orchestrator.ProgressReference] - # load inventory - i = self.get_store('inventory') - if i: - self.inventory: Dict[str, dict] = json.loads(i) - else: - self.inventory = dict() - self.log.debug('Loaded inventory %s' % self.inventory) + self.inventory = Inventory(self) self.cache = HostCache(self) self.cache.load() @@ -1254,8 +1248,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): p = re.compile(r'(.*)\.%s.*' % (host)) return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id)) - def _save_inventory(self): - self.set_store('inventory', json.dumps(self.inventory)) def _save_upgrade_state(self): self.set_store('upgrade_state', json.dumps(self.upgrade_state)) @@ -1358,20 +1350,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): for p in completions: p.finalize() - def _require_hosts(self, hosts): - """ - Raise an error if any of the given hosts are unregistered. - """ - if isinstance(hosts, six.string_types): - hosts = [hosts] - keys = self.inventory.keys() - unregistered_hosts = set(hosts) - keys - if unregistered_hosts: - logger.warning('keys = {}'.format(keys)) - raise RuntimeError("Host(s) {} not registered".format( - ", ".join(map(lambda h: "'{}'".format(h), - unregistered_hosts)))) - @orchestrator._cli_write_command( prefix='cephadm set-ssh-config', desc='Set the ssh_config file (use -i )') @@ -1558,7 +1536,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): Run cephadm on the remote host with the given command + args """ if not addr and host in self.inventory: - addr = self.inventory[host].get('addr', host) + addr = self.inventory.get_addr(host) self.offline_hosts_remove(host) @@ -1659,11 +1637,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def _get_hosts(self, label=None): # type: (Optional[str]) -> List[str] - r = [] - for h, hostspec in self.inventory.items(): - if not label or label in hostspec.get('labels', []): - r.append(h) - return r + return list(self.inventory.filter_by_label(label)) @async_completion def add_host(self, spec): @@ -1682,8 +1656,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): raise OrchestratorError('New host %s (%s) failed check: %s' % ( spec.hostname, spec.addr, err)) - self.inventory[spec.hostname] = spec.to_json() - self._save_inventory() + self.inventory.add_host(spec) self.cache.prime_empty_host(spec.hostname) self.offline_hosts_remove(spec.hostname) self.event.set() # refresh stray health check @@ -1698,8 +1671,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): :param host: host name """ - del self.inventory[host] - self._save_inventory() + self.inventory.rm_host(host) self.cache.rm_host(host) self._reset_con(host) self.event.set() # refresh stray health check @@ -1708,10 +1680,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): @async_completion def update_host_addr(self, host, addr): - if host not in self.inventory: - raise OrchestratorError('host %s not registered' % host) - self.inventory[host]['addr'] = addr - self._save_inventory() + self.inventory.set_addr(host, addr) self._reset_con(host) self.event.set() # refresh stray health check self.log.info('Set host %s addr to %s' % (host, addr)) @@ -1726,39 +1695,17 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): Notes: - skip async: manager reads from cache. """ - r = [] - for hostname, info in self.inventory.items(): - r.append(orchestrator.HostSpec( - hostname, - addr=info.get('addr', hostname), - labels=info.get('labels', []), - status='Offline' if hostname in self.offline_hosts else info.get('status', ''), - )) - return r + return list(self.inventory.all_specs()) @async_completion def add_host_label(self, host, label): - if host not in self.inventory: - raise OrchestratorError('host %s does not exist' % host) - - if 'labels' not in self.inventory[host]: - self.inventory[host]['labels'] = list() - if label not in self.inventory[host]['labels']: - self.inventory[host]['labels'].append(label) - self._save_inventory() + self.inventory.add_label(host, label) self.log.info('Added label %s to host %s' % (label, host)) return 'Added label %s to host %s' % (label, host) @async_completion def remove_host_label(self, host, label): - if host not in self.inventory: - raise OrchestratorError('host %s does not exist' % host) - - if 'labels' not in self.inventory[host]: - self.inventory[host]['labels'] = list() - if label in self.inventory[host]['labels']: - self.inventory[host]['labels'].remove(label) - self._save_inventory() + self.inventory.rm_label(host, label) self.log.info('Removed label %s to host %s' % (label, host)) return 'Removed label %s from host %s' % (label, host) @@ -1861,7 +1808,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): refresh=False): if refresh: # ugly sync path, FIXME someday perhaps? - for host, hi in self.inventory.items(): + for host in self.inventory.keys(): self._refresh_host_daemons(host) # sm = {} # type: Dict[str, orchestrator.ServiceDescription] @@ -1933,7 +1880,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): if host: self._refresh_host_daemons(host) else: - for hostname, hi in self.inventory.items(): + for hostname in self.inventory.keys(): self._refresh_host_daemons(hostname) result = [] for h, dm in self.cache.get_daemons_with_volatile_status(): @@ -2036,7 +1983,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): for host in host_filter.hosts: self._refresh_host_devices(host) else: - for host, hi in self.inventory.items(): + for host in self.inventory.keys(): self._refresh_host_devices(host) result = [] @@ -2205,7 +2152,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return ret_all def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]: - self._require_hosts(host) + self.inventory.assert_host(host) # get bootstrap key ret, keyring, err = self.mon_command({ @@ -2625,8 +2572,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): 'GRAFANA_API_URL') if grafanas: host = grafanas[0].hostname - url = 'https://%s:3000' % (self.inventory[host].get('addr', - host)) + url = f'https://{self.inventory.get_addr(host)}:3000' if current_url != url: self.log.info('Setting dashboard grafana config to %s' % url) self.set_module_option_ex('dashboard', 'GRAFANA_API_URL', @@ -3027,16 +2973,14 @@ api_secure = {api_secure} continue if dd.daemon_id == self.get_mgr_id(): continue - hi = self.inventory.get(dd.hostname, {}) - addr = hi.get('addr', dd.hostname) + addr = self.inventory.get_addr(dd.hostname) mgr_scrape_list.append(addr.split(':')[0] + ':' + port) # scrape node exporters node_configs = '' for dd in self.cache.get_daemons_by_service('node-exporter'): deps.append(dd.name()) - hi = self.inventory.get(dd.hostname, {}) - addr = hi.get('addr', dd.hostname) + addr = self.inventory.get_addr(dd.hostname) if not node_configs: node_configs = """ - job_name: 'node' @@ -3053,8 +2997,7 @@ api_secure = {api_secure} alertmgr_targets = [] for dd in self.cache.get_daemons_by_service('alertmanager'): deps.append(dd.name()) - hi = self.inventory.get(dd.hostname, {}) - addr = hi.get('addr', dd.hostname) + addr = self.inventory.get_addr(dd.hostname) alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0])) if alertmgr_targets: alertmgr_configs = """alerting: @@ -3218,8 +3161,7 @@ datasources: continue if dd.daemon_id == self.get_mgr_id(): continue - hi = self.inventory.get(dd.hostname, {}) - addr = hi.get('addr', dd.hostname) + addr = self.inventory.get_addr(dd.hostname) dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0], port)) @@ -3248,8 +3190,7 @@ receivers: port = '9094' for dd in self.cache.get_daemons_by_service('alertmanager'): deps.append(dd.name()) - hi = self.inventory.get(dd.hostname, {}) - addr = hi.get('addr', dd.hostname) + addr = self.inventory.get_addr(dd.hostname) peers.append(addr.split(':')[0] + ':' + port) return { "files": { @@ -3325,7 +3266,7 @@ receivers: def _get_container_image_id(self, image_name): # pick a random host... host = None - for host_name, hi in self.inventory.items(): + for host_name in self.inventory.keys(): host = host_name break if not host: -- 2.39.5