]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: scrape and cache devices, too
authorSage Weil <sage@redhat.com>
Tue, 18 Feb 2020 15:46:48 +0000 (09:46 -0600)
committerSage Weil <sage@redhat.com>
Wed, 19 Feb 2020 20:53:47 +0000 (14:53 -0600)
Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py

index 02394ddc219df1c1970fa08ee9b8d7079f459309..a8eddecbef31917a3b4490387513c745d73b35ef 100644 (file)
@@ -112,6 +112,8 @@ class HostCache():
         self.mgr = mgr
         self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
+        self.devices = {}              # type: Dict[str, List[inventory.Device]]
+        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
 
     def load(self):
         # type: () -> None
@@ -126,11 +128,14 @@ class HostCache():
                 # we do ignore the persisted last_*_update to trigger a new
                 # scrape on mgr restart
                 self.daemons[host] = {}
+                self.devices[host] = []
                 for name, d in j.get('daemons', {}).items():
                     self.daemons[host][name] = \
                         orchestrator.DaemonDescription.from_json(d)
-                self.mgr.log.debug('HostCache.load: host %s has %d daemons' % (
-                    host, len(self.daemons[host])))
+                for d in j.get('devices', []):
+                    self.devices[host].append(inventory.Device.from_json(d))
+                self.mgr.log.debug('HostCache.load: host %s has %d daemons, %d devices' % (
+                    host, len(self.daemons[host]), len(self.devices[host])))
             except Exception as e:
                 self.mgr.log.warning('unable to load cached state for %s: %s' % (
                     host, e))
@@ -141,35 +146,53 @@ class HostCache():
         self.daemons[host] = dm
         self.last_daemon_update[host] = datetime.datetime.utcnow()
 
+    def update_host_devices(self, host, dls):
+        # type: (str, List[inventory.Device]) -> None
+        self.devices[host] = dls
+        self.last_device_update[host] = datetime.datetime.utcnow()
+
     def prime_empty_host(self, host):
         # type: (str) -> None
         """
         Install an empty entry for a host
         """
         self.daemons[host] = {}
+        self.devices[host] = []
 
     def invalidate_host_daemons(self, host):
         # type: (str) -> None
         if host in self.last_daemon_update:
             del self.last_daemon_update[host]
 
+    def invalidate_host_devices(self, host):
+        # type: (str) -> None
+        if host in self.last_device_update:
+            del self.last_device_update[host]
+
     def save_host(self, host):
         # type: (str) -> None
         j = {   # type: ignore
             'daemons': {},
+            'devices': [],
         }
         if host in self.last_daemon_update:
             j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
         for name, dd in self.daemons[host].items():
-            j['daemons'][name] = dd.to_json()
+            j['daemons'][name] = dd.to_json()  # type: ignore
+        for d in self.devices[host]:
+            j['devices'].append(d.to_json())  # type: ignore
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
     def rm_host(self, host):
         # type: (str) -> None
         if host in self.daemons:
             del self.daemons[host]
+        if host in self.devices:
+            del self.devices[host]
         if host in self.last_daemon_update:
             del self.last_daemon_update[host]
+        if host in self.last_device_update:
+            del self.last_device_update[host]
         self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
 
     def get_hosts(self):
@@ -212,6 +235,14 @@ class HostCache():
             return True
         return False
 
+    def host_needs_device_refresh(self, host):
+        # type: (str) -> bool
+        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+            seconds=self.mgr.device_cache_timeout)
+        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
+            return True
+        return False
+
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
@@ -401,7 +432,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'desc': 'customized SSH config file to connect to managed hosts',
         },
         {
-            'name': 'inventory_cache_timeout',
+            'name': 'device_cache_timeout',
             'type': 'secs',
             'default': 10 * 60,
             'desc': 'seconds to cache device inventory',
@@ -458,7 +489,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         # for mypy which does not run the code
         if TYPE_CHECKING:
             self.ssh_config_file = None  # type: Optional[str]
-            self.inventory_cache_timeout = 0
+            self.device_cache_timeout = 0
             self.daemon_cache_timeout = 0
             self.mode = ''
             self.container_image_base = ''
@@ -869,19 +900,24 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             self._check_hosts()
 
             # refresh daemons
-            self.log.debug('refreshing daemons')
+            self.log.debug('refreshing hosts')
             failures = []
             for host in self.cache.get_hosts():
                 if self.cache.host_needs_daemon_refresh(host):
-                    self.log.debug('refreshing %s' % host)
+                    self.log.debug('refreshing %s daemons' % host)
                     r = self._refresh_host_daemons(host)
                     if r:
                         failures.append(r)
+                if self.cache.host_needs_device_refresh(host):
+                    self.log.debug('refreshing %s devices' % host)
+                    r = self._refresh_host_devices(host)
+                    if r:
+                        failures.append(r)
             if failures:
                 self.health_checks['CEPHADM_REFRESH_FAILED'] = {
                     'severity': 'warning',
-                    'summary': 'failed to probe %s hosts' % len(failures),
-                    'count': 1,
+                    'summary': 'failed to probe daemons or devices',
+                    'count': len(failures),
                     'detail': failures,
                 }
                 self.set_health_checks(self.health_checks)
@@ -1425,8 +1461,20 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             dm[sd.name()] = sd
         self.log.debug('Refreshed host %s daemons: %s' % (host, dm))
         self.cache.update_host_daemons(host, dm)
+        self.cache.save_host(host)
         return None
 
+    def _refresh_host_devices(self, host):
+        out, err, code = self._run_cephadm(
+            host, 'osd',
+            'ceph-volume',
+            ['--', 'inventory', '--format=json'])
+        data = json.loads(''.join(out))
+        self.log.debug('Refreshed host %s devices: %s' % (host, data))
+        devices = inventory.Devices.from_json(data)
+        self.cache.update_host_devices(host, devices.devices)
+        self.cache.save_host(host)
+        return None
 
     def describe_service(self, service_type=None, service_name=None,
                          refresh=False):
@@ -1581,7 +1629,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         def _get_inventory(host, host_info):
             # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
 
-            if host_info.outdated(self.inventory_cache_timeout) or refresh:
+            if host_info.outdated(self.device_cache_timeout) or refresh:
                 self.log.info("refresh stale inventory for '{}'".format(host))
                 out, err, code = self._run_cephadm(
                     host, 'osd',