--- /dev/null
+import json
+import logging
+from typing import TYPE_CHECKING, Dict, List, Iterator
+
+from orchestrator import OrchestratorError, HostSpec
+
+if TYPE_CHECKING:
+ from .module import CephadmOrchestrator
+
+
+logger = logging.getLogger(__name__)
+
+class Inventory:
+ def __init__(self, mgr: 'CephadmOrchestrator'):
+ self.mgr = mgr
+ # load inventory
+ i = self.mgr.get_store('inventory')
+ if i:
+ self._inventory: Dict[str, dict] = json.loads(i)
+ else:
+ self._inventory = dict()
+ logger.debug('Loaded inventory %s' % self._inventory)
+
+ def keys(self) -> List[str]:
+ return list(self._inventory.keys())
+
+ def __contains__(self, host: str) -> bool:
+ return host in self._inventory
+
+ def assert_host(self, host):
+ if host not in self._inventory:
+ raise OrchestratorError('host %s does not exist' % host)
+
+ def add_host(self, spec: HostSpec):
+ self._inventory[spec.hostname] = spec.to_json()
+ self.save()
+
+ def rm_host(self, host: str):
+ self.assert_host(host)
+ del self._inventory[host]
+ self.save()
+
+ def set_addr(self, host, addr):
+ self.assert_host(host)
+ self._inventory[host]['addr'] = addr
+ self.save()
+
+ def add_label(self, host, label):
+ self.assert_host(host)
+
+ if 'labels' not in self._inventory[host]:
+ self._inventory[host]['labels'] = list()
+ if label not in self._inventory[host]['labels']:
+ self._inventory[host]['labels'].append(label)
+ self.save()
+
+ def rm_label(self, host, label):
+ self.assert_host(host)
+
+ if 'labels' not in self._inventory[host]:
+ self._inventory[host]['labels'] = list()
+ if label in self._inventory[host]['labels']:
+ self._inventory[host]['labels'].remove(label)
+ self.save()
+
+ def get_addr(self, host) -> str:
+ self.assert_host(host)
+ return self._inventory[host].get('addr', host)
+
+ def filter_by_label(self, label=None) -> Iterator[str]:
+ for h, hostspec in self._inventory.items():
+ if not label or label in hostspec.get('labels', []):
+ yield h
+
+ def spec_from_dict(self, info):
+ hostname = info['hostname']
+ return HostSpec(
+ hostname,
+ addr=info.get('addr', hostname),
+ labels=info.get('labels', []),
+ status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
+ )
+
+ def all_specs(self) -> Iterator[HostSpec]:
+ return map(self.spec_from_dict, self._inventory.values())
+
+ def save(self):
+ self.mgr.set_store('inventory', json.dumps(self._inventory))
from . import utils
from .nfs import NFSGanesha
from .osd import RemoveUtil, OSDRemoval
-
+from .inventory import Inventory
try:
import remoto
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
- # load inventory
- i = self.get_store('inventory')
- if i:
- self.inventory: Dict[str, dict] = json.loads(i)
- else:
- self.inventory = dict()
- self.log.debug('Loaded inventory %s' % self.inventory)
+ self.inventory = Inventory(self)
self.cache = HostCache(self)
self.cache.load()
p = re.compile(r'(.*)\.%s.*' % (host))
return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
- def _save_inventory(self):
- self.set_store('inventory', json.dumps(self.inventory))
def _save_upgrade_state(self):
self.set_store('upgrade_state', json.dumps(self.upgrade_state))
for p in completions:
p.finalize()
- def _require_hosts(self, hosts):
- """
- Raise an error if any of the given hosts are unregistered.
- """
- if isinstance(hosts, six.string_types):
- hosts = [hosts]
- keys = self.inventory.keys()
- unregistered_hosts = set(hosts) - keys
- if unregistered_hosts:
- logger.warning('keys = {}'.format(keys))
- raise RuntimeError("Host(s) {} not registered".format(
- ", ".join(map(lambda h: "'{}'".format(h),
- unregistered_hosts))))
-
@orchestrator._cli_write_command(
prefix='cephadm set-ssh-config',
desc='Set the ssh_config file (use -i <ssh_config>)')
Run cephadm on the remote host with the given command + args
"""
if not addr and host in self.inventory:
- addr = self.inventory[host].get('addr', host)
+ addr = self.inventory.get_addr(host)
self.offline_hosts_remove(host)
def _get_hosts(self, label=None):
# type: (Optional[str]) -> List[str]
- r = []
- for h, hostspec in self.inventory.items():
- if not label or label in hostspec.get('labels', []):
- r.append(h)
- return r
+ return list(self.inventory.filter_by_label(label))
@async_completion
def add_host(self, spec):
raise OrchestratorError('New host %s (%s) failed check: %s' % (
spec.hostname, spec.addr, err))
- self.inventory[spec.hostname] = spec.to_json()
- self._save_inventory()
+ self.inventory.add_host(spec)
self.cache.prime_empty_host(spec.hostname)
self.offline_hosts_remove(spec.hostname)
self.event.set() # refresh stray health check
:param host: host name
"""
- del self.inventory[host]
- self._save_inventory()
+ self.inventory.rm_host(host)
self.cache.rm_host(host)
self._reset_con(host)
self.event.set() # refresh stray health check
@async_completion
def update_host_addr(self, host, addr):
- if host not in self.inventory:
- raise OrchestratorError('host %s not registered' % host)
- self.inventory[host]['addr'] = addr
- self._save_inventory()
+ self.inventory.set_addr(host, addr)
self._reset_con(host)
self.event.set() # refresh stray health check
self.log.info('Set host %s addr to %s' % (host, addr))
Notes:
- skip async: manager reads from cache.
"""
- r = []
- for hostname, info in self.inventory.items():
- r.append(orchestrator.HostSpec(
- hostname,
- addr=info.get('addr', hostname),
- labels=info.get('labels', []),
- status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
- ))
- return r
+ return list(self.inventory.all_specs())
@async_completion
def add_host_label(self, host, label):
- if host not in self.inventory:
- raise OrchestratorError('host %s does not exist' % host)
-
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label not in self.inventory[host]['labels']:
- self.inventory[host]['labels'].append(label)
- self._save_inventory()
+ self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
@async_completion
def remove_host_label(self, host, label):
- if host not in self.inventory:
- raise OrchestratorError('host %s does not exist' % host)
-
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label in self.inventory[host]['labels']:
- self.inventory[host]['labels'].remove(label)
- self._save_inventory()
+ self.inventory.rm_label(host, label)
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
refresh=False):
if refresh:
# ugly sync path, FIXME someday perhaps?
- for host, hi in self.inventory.items():
+ for host in self.inventory.keys():
self._refresh_host_daemons(host)
# <service_map>
sm = {} # type: Dict[str, orchestrator.ServiceDescription]
if host:
self._refresh_host_daemons(host)
else:
- for hostname, hi in self.inventory.items():
+ for hostname in self.inventory.keys():
self._refresh_host_daemons(hostname)
result = []
for h, dm in self.cache.get_daemons_with_volatile_status():
for host in host_filter.hosts:
self._refresh_host_devices(host)
else:
- for host, hi in self.inventory.items():
+ for host in self.inventory.keys():
self._refresh_host_devices(host)
result = []
return ret_all
def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
- self._require_hosts(host)
+ self.inventory.assert_host(host)
# get bootstrap key
ret, keyring, err = self.mon_command({
'GRAFANA_API_URL')
if grafanas:
host = grafanas[0].hostname
- url = 'https://%s:3000' % (self.inventory[host].get('addr',
- host))
+ url = f'https://{self.inventory.get_addr(host)}:3000'
if current_url != url:
self.log.info('Setting dashboard grafana config to %s' % url)
self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
continue
if dd.daemon_id == self.get_mgr_id():
continue
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
+ addr = self.inventory.get_addr(dd.hostname)
mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
# scrape node exporters
node_configs = ''
for dd in self.cache.get_daemons_by_service('node-exporter'):
deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
+ addr = self.inventory.get_addr(dd.hostname)
if not node_configs:
node_configs = """
- job_name: 'node'
alertmgr_targets = []
for dd in self.cache.get_daemons_by_service('alertmanager'):
deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
+ addr = self.inventory.get_addr(dd.hostname)
alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
if alertmgr_targets:
alertmgr_configs = """alerting:
continue
if dd.daemon_id == self.get_mgr_id():
continue
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
+ addr = self.inventory.get_addr(dd.hostname)
dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
port))
port = '9094'
for dd in self.cache.get_daemons_by_service('alertmanager'):
deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
+ addr = self.inventory.get_addr(dd.hostname)
peers.append(addr.split(':')[0] + ':' + port)
return {
"files": {
def _get_container_image_id(self, image_name):
# pick a random host...
host = None
- for host_name, hi in self.inventory.items():
+ for host_name in self.inventory.keys():
host = host_name
break
if not host: