self.mgr = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
+ self.devices = {} # type: Dict[str, List[inventory.Device]]
+ self.last_device_update = {} # type: Dict[str, datetime.datetime]
def load(self):
# type: () -> None
# we do ignore the persisted last_*_update to trigger a new
# scrape on mgr restart
self.daemons[host] = {}
+ self.devices[host] = []
for name, d in j.get('daemons', {}).items():
self.daemons[host][name] = \
orchestrator.DaemonDescription.from_json(d)
- self.mgr.log.debug('HostCache.load: host %s has %d daemons' % (
- host, len(self.daemons[host])))
+ for d in j.get('devices', []):
+ self.devices[host].append(inventory.Device.from_json(d))
+ self.mgr.log.debug('HostCache.load: host %s has %d daemons, %d devices' % (
+ host, len(self.daemons[host]), len(self.devices[host])))
except Exception as e:
self.mgr.log.warning('unable to load cached state for %s: %s' % (
host, e))
self.daemons[host] = dm
self.last_daemon_update[host] = datetime.datetime.utcnow()
def update_host_devices(self, host, dls):
    # type: (str, List[inventory.Device]) -> None
    """
    Replace the cached device list of a host and record the refresh time

    :param host: host the devices belong to
    :param dls: device list to cache; stored as-is (not copied)
    """
    refreshed_at = datetime.datetime.utcnow()
    self.devices[host] = dls
    self.last_device_update[host] = refreshed_at
+
def prime_empty_host(self, host):
    # type: (str) -> None
    """
    Register a host with no known daemons and no known devices

    Anything already cached for the host in self.daemons / self.devices is
    replaced by a fresh empty dict / list.
    """
    self.daemons[host], self.devices[host] = {}, []
def invalidate_host_daemons(self, host):
    # type: (str) -> None
    """
    Forget when the daemons of a host were last refreshed

    Only the timestamp is dropped; the cached daemon entries are kept.
    Unknown hosts are silently ignored.
    """
    self.last_daemon_update.pop(host, None)
def invalidate_host_devices(self, host):
    # type: (str) -> None
    """
    Forget when the devices of a host were last refreshed

    Only the timestamp is dropped; the cached device list is kept. With no
    timestamp recorded, host_needs_device_refresh() reports the host as
    needing a refresh. Unknown hosts are silently ignored.
    """
    self.last_device_update.pop(host, None)
+
def save_host(self, host):
    # type: (str) -> None
    """
    Persist the cached state of a host through mgr.set_store()

    The record is a JSON object written under HOST_CACHE_PREFIX + host. It
    holds the host's daemons (name -> to_json()), its devices (list of
    to_json()) and, when known, the time of the last daemon and of the last
    device refresh, both formatted with DATEFMT.

    :param host: host to persist; must already have an entry in both
        self.daemons and self.devices (KeyError otherwise)
    """
    j = {   # type: ignore
        'daemons': {
            name: dd.to_json() for name, dd in self.daemons[host].items()
        },
        'devices': [d.to_json() for d in self.devices[host]],
    }
    if host in self.last_daemon_update:
        j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT)  # type: ignore
    # Store the device timestamp next to the daemon one so the persisted
    # record describes both halves of the cache in the same way.
    if host in self.last_device_update:
        j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT)  # type: ignore
    self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
    # type: (str) -> None
    """
    Drop everything cached for a host

    Removes the host from the in-memory daemon and device caches and from
    both refresh-timestamp maps (hosts missing from any of them are
    ignored), then writes None under the host's key via mgr.set_store().
    """
    per_host_maps = (
        self.daemons,
        self.devices,
        self.last_daemon_update,
        self.last_device_update,
    )
    for per_host in per_host_maps:
        per_host.pop(host, None)
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
def get_hosts(self):
return True
return False
def host_needs_device_refresh(self, host):
    # type: (str) -> bool
    """
    Tell whether the cached device list of a host is missing or too old

    :return: True when no device refresh time is recorded for the host, or
        when the recorded time is older than mgr.device_cache_timeout
        seconds (compared against the current UTC time)
    """
    max_age = datetime.timedelta(seconds=self.mgr.device_cache_timeout)
    oldest_allowed = datetime.datetime.utcnow() - max_age
    if host in self.last_device_update:
        return self.last_device_update[host] < oldest_allowed
    return True
+
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
'desc': 'customized SSH config file to connect to managed hosts',
},
{
- 'name': 'inventory_cache_timeout',
+ 'name': 'device_cache_timeout',
'type': 'secs',
'default': 10 * 60,
'desc': 'seconds to cache device inventory',
# for mypy which does not run the code
if TYPE_CHECKING:
self.ssh_config_file = None # type: Optional[str]
- self.inventory_cache_timeout = 0
+ self.device_cache_timeout = 0
self.daemon_cache_timeout = 0
self.mode = ''
self.container_image_base = ''
self._check_hosts()
# refresh daemons and devices
- self.log.debug('refreshing daemons')
+ self.log.debug('refreshing hosts')
failures = []
for host in self.cache.get_hosts():
if self.cache.host_needs_daemon_refresh(host):
- self.log.debug('refreshing %s' % host)
+ self.log.debug('refreshing %s daemons' % host)
r = self._refresh_host_daemons(host)
if r:
failures.append(r)
+ if self.cache.host_needs_device_refresh(host):
+ self.log.debug('refreshing %s devices' % host)
+ r = self._refresh_host_devices(host)
+ if r:
+ failures.append(r)
if failures:
self.health_checks['CEPHADM_REFRESH_FAILED'] = {
'severity': 'warning',
- 'summary': 'failed to probe %s hosts' % len(failures),
- 'count': 1,
+ 'summary': 'failed to probe daemons or devices',
+ 'count': len(failures),
'detail': failures,
}
self.set_health_checks(self.health_checks)
dm[sd.name()] = sd
self.log.debug('Refreshed host %s daemons: %s' % (host, dm))
self.cache.update_host_daemons(host, dm)
+ self.cache.save_host(host)
return None
+ def _refresh_host_devices(self, host):
+ out, err, code = self._run_cephadm(
+ host, 'osd',
+ 'ceph-volume',
+ ['--', 'inventory', '--format=json'])
+ data = json.loads(''.join(out))
+ self.log.debug('Refreshed host %s devices: %s' % (host, data))
+ devices = inventory.Devices.from_json(data)
+ self.cache.update_host_devices(host, devices.devices)
+ self.cache.save_host(host)
+ return None
def describe_service(self, service_type=None, service_name=None,
refresh=False):
def _get_inventory(host, host_info):
# type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
- if host_info.outdated(self.inventory_cache_timeout) or refresh:
+ if host_info.outdated(self.device_cache_timeout) or refresh:
self.log.info("refresh stale inventory for '{}'".format(host))
out, err, code = self._run_cephadm(
host, 'osd',