From: Sage Weil Date: Tue, 18 Feb 2020 15:46:48 +0000 (-0600) Subject: mgr/cephadm: scrape and cache devices, too X-Git-Tag: v15.1.1~355^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=34c853d5a73281024fe90b2d354cccb96fab21aa;p=ceph-ci.git mgr/cephadm: scrape and cache devices, too Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 02394ddc219..a8eddecbef3 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -112,6 +112,8 @@ class HostCache(): self.mgr = mgr self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] self.last_daemon_update = {} # type: Dict[str, datetime.datetime] + self.devices = {} # type: Dict[str, List[inventory.Device]] + self.last_device_update = {} # type: Dict[str, datetime.datetime] def load(self): # type: () -> None @@ -126,11 +128,14 @@ class HostCache(): # we do ignore the persisted last_*_update to trigger a new # scrape on mgr restart self.daemons[host] = {} + self.devices[host] = [] for name, d in j.get('daemons', {}).items(): self.daemons[host][name] = \ orchestrator.DaemonDescription.from_json(d) - self.mgr.log.debug('HostCache.load: host %s has %d daemons' % ( - host, len(self.daemons[host]))) + for d in j.get('devices', []): + self.devices[host].append(inventory.Device.from_json(d)) + self.mgr.log.debug('HostCache.load: host %s has %d daemons, %d devices' % ( + host, len(self.daemons[host]), len(self.devices[host]))) except Exception as e: self.mgr.log.warning('unable to load cached state for %s: %s' % ( host, e)) @@ -141,35 +146,53 @@ class HostCache(): self.daemons[host] = dm self.last_daemon_update[host] = datetime.datetime.utcnow() + def update_host_devices(self, host, dls): + # type: (str, List[inventory.Device]) -> None + self.devices[host] = dls + self.last_device_update[host] = datetime.datetime.utcnow() + def prime_empty_host(self, host): # type: (str) -> None """ Install an empty entry for a host """ self.daemons[host] = {} + self.devices[host] = [] def invalidate_host_daemons(self, host): # type: (str) -> None if host in self.last_daemon_update: del self.last_daemon_update[host] + def invalidate_host_devices(self, host): + # type: (str) -> None + if host in self.last_device_update: + del self.last_device_update[host] + def save_host(self, host): # type: (str) -> None j = { # type: ignore 'daemons': {}, + 'devices': [], } if host in self.last_daemon_update: j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore for name, dd in self.daemons[host].items(): - j['daemons'][name] = dd.to_json() + j['daemons'][name] = dd.to_json() # type: ignore + for d in self.devices[host]: + j['devices'].append(d.to_json()) # type: ignore self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) def rm_host(self, host): # type: (str) -> None if host in self.daemons: del self.daemons[host] + if host in self.devices: + del self.devices[host] if host in self.last_daemon_update: del self.last_daemon_update[host] + if host in self.last_device_update: + del self.last_device_update[host] self.mgr.set_store(HOST_CACHE_PREFIX + host, None) def get_hosts(self): @@ -212,6 +235,14 @@ class HostCache(): return True return False + def host_needs_device_refresh(self, host): + # type: (str) -> bool + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.mgr.device_cache_timeout) + if host not in self.last_device_update or self.last_device_update[host] < cutoff: + return True + return False + def add_daemon(self, host, dd): # type: (str, orchestrator.DaemonDescription) -> None assert host in self.daemons @@ -401,7 +432,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'desc': 'customized SSH config file to connect to managed hosts', }, { - 'name': 'inventory_cache_timeout', + 'name': 'device_cache_timeout', 'type': 'secs', 'default': 10 * 60, 'desc': 'seconds to cache device inventory', @@ -458,7 +489,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): # for mypy which does not run the code if TYPE_CHECKING: self.ssh_config_file = None # type: Optional[str] - self.inventory_cache_timeout = 0 + self.device_cache_timeout = 0 self.daemon_cache_timeout = 0 self.mode = '' self.container_image_base = '' @@ -869,19 +900,24 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self._check_hosts() # refresh daemons - self.log.debug('refreshing daemons') + self.log.debug('refreshing hosts') failures = [] for host in self.cache.get_hosts(): if self.cache.host_needs_daemon_refresh(host): - self.log.debug('refreshing %s' % host) + self.log.debug('refreshing %s daemons' % host) r = self._refresh_host_daemons(host) if r: failures.append(r) + if self.cache.host_needs_device_refresh(host): + self.log.debug('refreshing %s devices' % host) + r = self._refresh_host_devices(host) + if r: + failures.append(r) if failures: self.health_checks['CEPHADM_REFRESH_FAILED'] = { 'severity': 'warning', - 'summary': 'failed to probe %s hosts' % len(failures), - 'count': 1, + 'summary': 'failed to probe daemons or devices', + 'count': len(failures), 'detail': failures, } self.set_health_checks(self.health_checks) @@ -1425,8 +1461,20 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): dm[sd.name()] = sd self.log.debug('Refreshed host %s daemons: %s' % (host, dm)) self.cache.update_host_daemons(host, dm) + self.cache.save_host(host) return None + def _refresh_host_devices(self, host): + out, err, code = self._run_cephadm( + host, 'osd', + 'ceph-volume', + ['--', 'inventory', '--format=json']) + data = json.loads(''.join(out)) + self.log.debug('Refreshed host %s devices: %s' % (host, data)) + devices = inventory.Devices.from_json(data) + self.cache.update_host_devices(host, devices.devices) + self.cache.save_host(host) + return None def describe_service(self, service_type=None, service_name=None, refresh=False): @@ -1581,7 +1629,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _get_inventory(host, host_info): # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode - if host_info.outdated(self.inventory_cache_timeout) or refresh: + if host_info.outdated(self.device_cache_timeout) or refresh: self.log.info("refresh stale inventory for '{}'".format(host)) out, err, code = self._run_cephadm( host, 'osd',