self.inventory_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.devices')
- self.daemon_cache = orchestrator.OutdatablePersistentDict(
- self, self._STORE_HOST_PREFIX + '.daemons')
+ self.daemon_cache = {} # type: ignore
# ensure the host lists are in sync
for h in self.inventory.keys():
self.inventory_cache[h] = orchestrator.OutdatableData()
if h not in self.daemon_cache:
self.log.debug('adding service item for %s' % h)
- self.daemon_cache[h] = orchestrator.OutdatableData()
+ self.daemon_cache[h] = {
+ 'last_update': None,
+ 'daemons': {},
+ }
for h in self.inventory_cache:
if h not in self.inventory:
del self.inventory_cache[h]
}
self.set_health_checks(self.health_checks)
- def _check_for_strays(self, daemons):
+ def _check_for_strays(self):
self.log.debug('_check_for_strays')
for k in ['CEPHADM_STRAY_HOST',
'CEPHADM_STRAY_DAEMON']:
if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
ls = self.list_servers()
managed = []
- for s in daemons:
- managed.append(s.name())
+ for host, di in self.daemon_cache.items():
+ for name, dd in di['daemons'].items():
+ managed.append(name)
host_detail = [] # type: List[str]
host_num_daemons = 0
daemon_detail = [] # type: List[str]
self._check_hosts()
# refresh daemons
- self.log.debug('refreshing daemons')
- completion = self._get_daemons(maybe_refresh=True)
- self._orchestrator_wait([completion])
- # FIXME: this is a band-aid to avoid crashing the mgr, but what
- # we really need to do here is raise health alerts for individual
- # hosts that fail and continue with the ones that do not fail.
- if completion.exception is not None:
- self.log.error('failed to refresh daemons: %s' % completion.exception)
- self.health_checks['CEPHADM_REFRESH_FAILED'] = {
- 'severity': 'warning',
- 'summary': 'failed to probe one or more hosts',
- 'count': 1,
- 'detail': [str(completion.exception)],
- }
- self.set_health_checks(self.health_checks)
- self._serve_sleep()
- continue
- if 'CEPHADM_REFRESH_FAILED' in self.health_checks:
- del self.health_checks['CEPHADM_REFRESH_FAILED']
- self.set_health_checks(self.health_checks)
- daemons = completion.result
- self.log.debug('daemons %s' % daemons)
+ cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.daemon_cache_timeout)
+ cutoffs = cutoff.strftime(DATEFMT)
+ self.log.debug('refreshing daemons, cutoff %s' % cutoffs)
+ for host, di in self.daemon_cache.items():
+ if not di['last_update'] or di['last_update'] < cutoffs:
+ self.log.debug('refreshing %s' % host)
+ self._refresh_host_daemons(host)
- self._check_for_strays(daemons)
+ self._check_for_strays()
if self.upgrade_state and not self.upgrade_state.get('paused'):
completion = self._do_upgrade(daemons)
}
self._save_inventory()
self.inventory_cache[spec.hostname] = orchestrator.OutdatableData()
- self.daemon_cache[spec.hostname] = orchestrator.OutdatableData()
+ self.daemon_cache[spec.hostname] = {
+ 'last_update': None,
+ 'daemons': {},
+ }
self.event.set() # refresh stray health check
return "Added host '{}'".format(spec.hostname)
self._save_inventory()
return 'Removed label %s from host %s' % (label, host)
- @async_map_completion
def _refresh_host_daemons(self, host):
out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
- data = json.loads(''.join(out))
- for d in data:
- d['last_refresh'] = datetime.datetime.utcnow().strftime(DATEFMT)
- self.log.debug('Refreshed host %s daemons: %s' % (host, data))
- self.daemon_cache[host] = orchestrator.OutdatableData(data)
- return host, data
-
- def _proc_ls(self, host, ls):
- # type: (str, List[Dict[str,str]]) -> List[orchestrator.DaemonDescription]
- result = []
+ ls = json.loads(''.join(out))
+ dm = {}
for d in ls:
if not d['style'].startswith('cephadm'):
self.log.debug('ignoring non-cephadm on %s: %s' % (host, d))
if d['fsid'] != self._cluster_fsid:
self.log.debug('ignoring foreign daemon on %s: %s' % (host, d))
continue
- self.log.debug('including %s %s' % (host, d))
- sd = orchestrator.DaemonDescription()
- if 'last_refresh' in d:
- sd.last_refresh = datetime.datetime.strptime(
- d['last_refresh'], DATEFMT)
if '.' not in d['name']:
self.log.debug('ignoring dot-less daemon on %s: %s' % (host, d))
continue
+ self.log.debug('including %s %s' % (host, d))
+ sd = orchestrator.DaemonDescription()
+ sd.last_refresh = datetime.datetime.utcnow()
sd.daemon_type = d['name'].split('.')[0]
sd.daemon_id = '.'.join(d['name'].split('.')[1:])
sd.nodename = host
else:
sd.status_desc = 'unknown'
sd.status = None
- result.append(sd)
- return result
+ dm[sd.name()] = sd
+ self.log.debug('Refreshed host %s daemons: %s' % (host, dm))
+ self.daemon_cache[host] = {
+ 'last_update': datetime.datetime.utcnow().strftime(DATEFMT),
+ 'daemons': dm,
+ }
+ return host, dm
def _get_daemons(self,
daemon_type=None,
daemon_id=None,
service_name=None,
- host=None,
- refresh=False,
- maybe_refresh=False):
- hosts = []
- wait_for_args = []
- daemons = {}
- keys = None
- if host is not None:
- keys = [host]
- for host, host_info in self.daemon_cache.items_filtered(keys):
- hosts.append(host)
- if refresh:
- self.log.info("refreshing daemons for '{}'".format(host))
- wait_for_args.append((host,))
- elif maybe_refresh and host_info.outdated(self.daemon_cache_timeout): # type: ignore
- self.log.info("refreshing stale daemons for '{}'".format(host))
- wait_for_args.append((host,))
- elif not host_info.last_refresh:
- daemons[host] = [
- {
- 'name': '*.*',
- 'style': 'cephadm:v1',
- 'fsid': self._cluster_fsid,
- },
- ]
- else:
- self.log.debug('have recent daemons for %s: %s' % (
- host, host_info.data))
- daemons[host] = host_info.data
-
- def _get_daemons_result(results):
- for host, data in results:
- daemons[host] = data
-
- result = []
- for host, ls in daemons.items():
- for d in self._proc_ls(host, ls):
- if daemon_type and daemon_type != d.daemon_type:
- continue
- if daemon_id and daemon_id != d.daemon_id:
- continue
- if service_name and not d.daemon_id.startswith(service_name + '.'):
- continue
- result.append(d)
- return result
-
- if wait_for_args:
- return self._refresh_host_daemons(wait_for_args).then(
- _get_daemons_result)
- else:
- return trivial_result(_get_daemons_result({}))
+ want_host=None,
+ refresh=False):
+ if refresh:
+ ######### FIXME #########
+ raise NotImplementedError()
+ result = []
+ self.log.debug('_get_daemons')
+ for host, info in self.daemon_cache.items():
+ self.log.debug('_get_daemons info %s' % info)
+ if want_host and host != want_host:
+ continue
+ for name, dd in info.get('daemons', {}).items():
+ if daemon_type and daemon_type != dd.daemon_type:
+ continue
+ if daemon_id and daemon_id != dd.daemon_id:
+ continue
+ if service_name and not dd.name().startswith(service_name + '.'):
+ continue
+ result.append(dd)
+ self.log.debug('_get_daemons returns %s' % result)
+ return trivial_result(result)
# def describe_service(self, service_type=None, service_id=None,
# node_name=None, refresh=False):
host=None, refresh=False):
result = self._get_daemons(daemon_type=daemon_type,
daemon_id=daemon_id,
- host=host,
+ want_host=host,
refresh=refresh)
return result
host, name, 'unit',
['--name', name, a],
error_ok=True)
- self.daemon_cache.invalidate(host)
+ self.daemon_cache[host]['last_update'] = None
self.log.debug('_daemon_action code %s out %s' % (code, out))
return "{} {} from host '{}'".format(action, name, host)
if not code and host in self.daemon_cache:
# prime cached service state with what we (should have)
# just created
- sd = {
- 'style': 'cephadm:v1',
- 'name': '%s.%s' % (daemon_type, daemon_id),
- 'fsid': self._cluster_fsid,
- 'enabled': True,
- 'state': 'running',
- }
- data = self.daemon_cache[host].data
- if data:
- data = [d for d in data if '%s.%s' % (daemon_type, daemon_id) != d['name']]
- data.append(sd)
- else:
- data = [sd]
- self.daemon_cache[host] = orchestrator.OutdatableData(data)
- self.daemon_cache.invalidate(host)
+ sd = orchestrator.DaemonDescription()
+ sd.daemon_type = daemon_type
+ sd.daemon_id = daemon_id
+ sd.nodename = host
+ sd.status = 1
+ sd.status_desc = 'starting'
+ self.daemon_cache[host]['daemons'][sd.name()] = sd
+ self.daemon_cache[host]['last_update'] = None
self.event.set()
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
self.log.debug('_remove_daemon code %s out %s' % (code, out))
if not code and host in self.daemon_cache:
# remove item from cache
- data = self.daemon_cache[host].data
- if data:
- data = [d for d in data if d['name'] != name]
- self.daemon_cache[host] = orchestrator.OutdatableData(data)
- self.daemon_cache.invalidate(host)
+ if name in self.daemon_cache[host]['daemons']:
+ del self.daemon_cache[host]['daemons'][name]
+ self.daemon_cache[host]['last_update'] = None
return "Removed {} from host '{}'".format(name, host)
def _update_service(self, daemon_type, add_func, spec):