From 328a1fd05b038b0124d0106706821acb7ddf7990 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 14 Feb 2020 10:49:37 -0600 Subject: [PATCH] mgr/cephadm: replace daemon_cache with an explicit set of dicts - Cache DaemonDescription explicitly - explicit timestamp for the host - serve() scrapes inventory based on that timestamp For the moment, persistence is broken, and --refresh is broken. Signed-off-by: Sage Weil --- src/pybind/mgr/cephadm/module.py | 188 +++++++++++-------------------- 1 file changed, 67 insertions(+), 121 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 410187bf893..5e547a87c24 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -409,8 +409,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.inventory_cache = orchestrator.OutdatablePersistentDict( self, self._STORE_HOST_PREFIX + '.devices') - self.daemon_cache = orchestrator.OutdatablePersistentDict( - self, self._STORE_HOST_PREFIX + '.daemons') + self.daemon_cache = {} # type: ignore # ensure the host lists are in sync for h in self.inventory.keys(): @@ -419,7 +418,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.inventory_cache[h] = orchestrator.OutdatableData() if h not in self.daemon_cache: self.log.debug('adding service item for %s' % h) - self.daemon_cache[h] = orchestrator.OutdatableData() + self.daemon_cache[h] = { + 'last_update': None, + 'daemons': {}, + } for h in self.inventory_cache: if h not in self.inventory: del self.inventory_cache[h] @@ -704,7 +706,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): } self.set_health_checks(self.health_checks) - def _check_for_strays(self, daemons): + def _check_for_strays(self): self.log.debug('_check_for_strays') for k in ['CEPHADM_STRAY_HOST', 'CEPHADM_STRAY_DAEMON']: @@ -713,8 +715,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if self.warn_on_stray_hosts or self.warn_on_stray_daemons: ls = self.list_servers() managed = [] - for s in daemons: - managed.append(s.name()) + for host, di in self.daemon_cache.items(): + for name, dd in di['daemons'].items(): + managed.append(name) host_detail = [] # type: List[str] host_num_daemons = 0 daemon_detail = [] # type: List[str] @@ -766,30 +769,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self._check_hosts() # refresh daemons - self.log.debug('refreshing daemons') - completion = self._get_daemons(maybe_refresh=True) - self._orchestrator_wait([completion]) - # FIXME: this is a band-aid to avoid crashing the mgr, but what - # we really need to do here is raise health alerts for individual - # hosts that fail and continue with the ones that do not fail. - if completion.exception is not None: - self.log.error('failed to refresh daemons: %s' % completion.exception) - self.health_checks['CEPHADM_REFRESH_FAILED'] = { - 'severity': 'warning', - 'summary': 'failed to probe one or more hosts', - 'count': 1, - 'detail': [str(completion.exception)], - } - self.set_health_checks(self.health_checks) - self._serve_sleep() - continue - if 'CEPHADM_REFRESH_FAILED' in self.health_checks: - del self.health_checks['CEPHADM_REFRESH_FAILED'] - self.set_health_checks(self.health_checks) - daemons = completion.result - self.log.debug('daemons %s' % daemons) + cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.daemon_cache_timeout) + cutoffs = cutoff.strftime(DATEFMT) + self.log.debug('refreshing daemons, cutoff %s' % cutoffs) + for host, di in self.daemon_cache.items(): + if not di['last_update'] or di['last_update'] < cutoffs: + self.log.debug('refreshing %s' % host) + self._refresh_host_daemons(host) - self._check_for_strays(daemons) + self._check_for_strays() if self.upgrade_state and not self.upgrade_state.get('paused'): completion = self._do_upgrade(daemons) @@ -1202,7 +1190,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): } self._save_inventory() self.inventory_cache[spec.hostname] = orchestrator.OutdatableData() - self.daemon_cache[spec.hostname] = orchestrator.OutdatableData() + self.daemon_cache[spec.hostname] = { + 'last_update': None, + 'daemons': {}, + } self.event.set() # refresh stray health check return "Added host '{}'".format(spec.hostname) @@ -1276,20 +1267,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self._save_inventory() return 'Removed label %s from host %s' % (label, host) - @async_map_completion def _refresh_host_daemons(self, host): out, err, code = self._run_cephadm( host, 'mon', 'ls', [], no_fsid=True) - data = json.loads(''.join(out)) - for d in data: - d['last_refresh'] = datetime.datetime.utcnow().strftime(DATEFMT) - self.log.debug('Refreshed host %s daemons: %s' % (host, data)) - self.daemon_cache[host] = orchestrator.OutdatableData(data) - return host, data - - def _proc_ls(self, host, ls): - # type: (str, List[Dict[str,str]]) -> List[orchestrator.DaemonDescription] - result = [] + ls = json.loads(''.join(out)) + dm = {} for d in ls: if not d['style'].startswith('cephadm'): self.log.debug('ignoring non-cephadm on %s: %s' % (host, d)) @@ -1297,14 +1279,12 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if d['fsid'] != self._cluster_fsid: self.log.debug('ignoring foreign daemon on %s: %s' % (host, d)) continue - self.log.debug('including %s %s' % (host, d)) - sd = orchestrator.DaemonDescription() - if 'last_refresh' in d: - sd.last_refresh = datetime.datetime.strptime( - d['last_refresh'], DATEFMT) if '.' not in d['name']: self.log.debug('ignoring dot-less daemon on %s: %s' % (host, d)) continue + self.log.debug('including %s %s' % (host, d)) + sd = orchestrator.DaemonDescription() + sd.last_refresh = datetime.datetime.utcnow() sd.daemon_type = d['name'].split('.')[0] sd.daemon_id = '.'.join(d['name'].split('.')[1:]) sd.nodename = host @@ -1323,64 +1303,39 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): else: sd.status_desc = 'unknown' sd.status = None - result.append(sd) - return result + dm[sd.name()] = sd + self.log.debug('Refreshed host %s daemons: %s' % (host, dm)) + self.daemon_cache[host] = { + 'last_update': datetime.datetime.utcnow().strftime(DATEFMT), + 'daemons': dm, + } + return host, dm def _get_daemons(self, daemon_type=None, daemon_id=None, service_name=None, - host=None, - refresh=False, - maybe_refresh=False): - hosts = [] - wait_for_args = [] - daemons = {} - keys = None - if host is not None: - keys = [host] - for host, host_info in self.daemon_cache.items_filtered(keys): - hosts.append(host) - if refresh: - self.log.info("refreshing daemons for '{}'".format(host)) - wait_for_args.append((host,)) - elif maybe_refresh and host_info.outdated(self.daemon_cache_timeout): # type: ignore - self.log.info("refreshing stale daemons for '{}'".format(host)) - wait_for_args.append((host,)) - elif not host_info.last_refresh: - daemons[host] = [ - { - 'name': '*.*', - 'style': 'cephadm:v1', - 'fsid': self._cluster_fsid, - }, - ] - else: - self.log.debug('have recent daemons for %s: %s' % ( - host, host_info.data)) - daemons[host] = host_info.data - - def _get_daemons_result(results): - for host, data in results: - daemons[host] = data - - result = [] - for host, ls in daemons.items(): - for d in self._proc_ls(host, ls): - if daemon_type and daemon_type != d.daemon_type: - continue - if daemon_id and daemon_id != d.daemon_id: - continue - if service_name and not d.daemon_id.startswith(service_name + '.'): - continue - result.append(d) - return result - - if wait_for_args: - return self._refresh_host_daemons(wait_for_args).then( - _get_daemons_result) - else: - return trivial_result(_get_daemons_result({})) + want_host=None, + refresh=False): + if refresh: + ######### FIXME ######### + raise NotImplementedError() + result = [] + self.log.debug('_get_daemons') + for host, info in self.daemon_cache.items(): + self.log.debug('_get_daemons info %s' % info) + if want_host and host != want_host: + continue + for name, dd in info.get('daemons', {}).items(): + if daemon_type and daemon_type != dd.daemon_type: + continue + if daemon_id and daemon_id != dd.daemon_id: + continue + if service_name and not dd.name().startswith(service_name + '.'): + continue + result.append(dd) + self.log.debug('_get_daemons returns %s' % result) + return trivial_result(result) # def describe_service(self, service_type=None, service_id=None, # node_name=None, refresh=False): @@ -1397,7 +1352,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): host=None, refresh=False): result = self._get_daemons(daemon_type=daemon_type, daemon_id=daemon_id, - host=host, + want_host=host, refresh=refresh) return result @@ -1440,7 +1395,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): host, name, 'unit', ['--name', name, a], error_ok=True) - self.daemon_cache.invalidate(host) + self.daemon_cache[host]['last_update'] = None self.log.debug('_daemon_action code %s out %s' % (code, out)) return "{} {} from host '{}'".format(action, name, host) @@ -1751,21 +1706,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if not code and host in self.daemon_cache: # prime cached service state with what we (should have) # just created - sd = { - 'style': 'cephadm:v1', - 'name': '%s.%s' % (daemon_type, daemon_id), - 'fsid': self._cluster_fsid, - 'enabled': True, - 'state': 'running', - } - data = self.daemon_cache[host].data - if data: - data = [d for d in data if '%s.%s' % (daemon_type, daemon_id) != d['name']] - data.append(sd) - else: - data = [sd] - self.daemon_cache[host] = orchestrator.OutdatableData(data) - self.daemon_cache.invalidate(host) + sd = orchestrator.DaemonDescription() + sd.daemon_type = daemon_type + sd.daemon_id = daemon_id + sd.nodename = host + sd.status = 1 + sd.status_desc = 'starting' + self.daemon_cache[host]['daemons'][sd.name()] = sd + self.daemon_cache[host]['last_update'] = None self.event.set() return "{} {} on host '{}'".format( 'Reconfigured' if reconfig else 'Deployed', name, host) @@ -1784,11 +1732,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.log.debug('_remove_daemon code %s out %s' % (code, out)) if not code and host in self.daemon_cache: # remove item from cache - data = self.daemon_cache[host].data - if data: - data = [d for d in data if d['name'] != name] - self.daemon_cache[host] = orchestrator.OutdatableData(data) - self.daemon_cache.invalidate(host) + if name in self.daemon_cache[host]['daemons']: + del self.daemon_cache[host]['daemons'][name] + self.daemon_cache[host]['last_update'] = None return "Removed {} from host '{}'".format(name, host) def _update_service(self, daemon_type, add_func, spec): -- 2.39.5