]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: replace daemon_cache with an explicit set of dicts
authorSage Weil <sage@redhat.com>
Fri, 14 Feb 2020 16:49:37 +0000 (10:49 -0600)
committerSage Weil <sage@redhat.com>
Tue, 18 Feb 2020 21:43:02 +0000 (15:43 -0600)
- Cache DaemonDescription explicitly
- explicit timestamp for the host
- serve() scrapes inventory based on that timestamp

For the moment, persistence is broken, and --refresh is broken.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py

index 410187bf893f7b93e88025d5f96b646bbe95b405..5e547a87c247307c5eafcab3249f6a763a0d1275 100644 (file)
@@ -409,8 +409,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self.inventory_cache = orchestrator.OutdatablePersistentDict(
             self, self._STORE_HOST_PREFIX + '.devices')
 
-        self.daemon_cache = orchestrator.OutdatablePersistentDict(
-            self, self._STORE_HOST_PREFIX + '.daemons')
+        self.daemon_cache = {}  # type: ignore
 
         # ensure the host lists are in sync
         for h in self.inventory.keys():
@@ -419,7 +418,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 self.inventory_cache[h] = orchestrator.OutdatableData()
             if h not in self.daemon_cache:
                 self.log.debug('adding service item for %s' % h)
-                self.daemon_cache[h] = orchestrator.OutdatableData()
+                self.daemon_cache[h] = {
+                    'last_update': None,
+                    'daemons': {},
+                }
         for h in self.inventory_cache:
             if h not in self.inventory:
                 del self.inventory_cache[h]
@@ -704,7 +706,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             }
         self.set_health_checks(self.health_checks)
 
-    def _check_for_strays(self, daemons):
+    def _check_for_strays(self):
         self.log.debug('_check_for_strays')
         for k in ['CEPHADM_STRAY_HOST',
                   'CEPHADM_STRAY_DAEMON']:
@@ -713,8 +715,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
             ls = self.list_servers()
             managed = []
-            for s in daemons:
-                managed.append(s.name())
+            for host, di in self.daemon_cache.items():
+                for name, dd in di['daemons'].items():
+                    managed.append(name)
             host_detail = []     # type: List[str]
             host_num_daemons = 0
             daemon_detail = []  # type: List[str]
@@ -766,30 +769,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             self._check_hosts()
 
             # refresh daemons
-            self.log.debug('refreshing daemons')
-            completion = self._get_daemons(maybe_refresh=True)
-            self._orchestrator_wait([completion])
-            # FIXME: this is a band-aid to avoid crashing the mgr, but what
-            # we really need to do here is raise health alerts for individual
-            # hosts that fail and continue with the ones that do not fail.
-            if completion.exception is not None:
-                self.log.error('failed to refresh daemons: %s' % completion.exception)
-                self.health_checks['CEPHADM_REFRESH_FAILED'] = {
-                    'severity': 'warning',
-                    'summary': 'failed to probe one or more hosts',
-                    'count': 1,
-                    'detail': [str(completion.exception)],
-                }
-                self.set_health_checks(self.health_checks)
-                self._serve_sleep()
-                continue
-            if 'CEPHADM_REFRESH_FAILED' in self.health_checks:
-                del self.health_checks['CEPHADM_REFRESH_FAILED']
-                self.set_health_checks(self.health_checks)
-            daemons = completion.result
-            self.log.debug('daemons %s' % daemons)
+            cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.daemon_cache_timeout)
+            cutoffs = cutoff.strftime(DATEFMT)
+            self.log.debug('refreshing daemons, cutoff %s' % cutoffs)
+            for host, di in self.daemon_cache.items():
+                if not di['last_update'] or di['last_update'] < cutoffs:
+                    self.log.debug('refreshing %s' % host)
+                    self._refresh_host_daemons(host)
 
-            self._check_for_strays(daemons)
+            self._check_for_strays()
 
             if self.upgrade_state and not self.upgrade_state.get('paused'):
                 completion = self._do_upgrade(daemons)
@@ -1202,7 +1190,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         }
         self._save_inventory()
         self.inventory_cache[spec.hostname] = orchestrator.OutdatableData()
-        self.daemon_cache[spec.hostname] = orchestrator.OutdatableData()
+        self.daemon_cache[spec.hostname] = {
+            'last_update': None,
+            'daemons': {},
+        }
         self.event.set()  # refresh stray health check
         return "Added host '{}'".format(spec.hostname)
 
@@ -1276,20 +1267,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self._save_inventory()
         return 'Removed label %s from host %s' % (label, host)
 
-    @async_map_completion
     def _refresh_host_daemons(self, host):
         out, err, code = self._run_cephadm(
             host, 'mon', 'ls', [], no_fsid=True)
-        data = json.loads(''.join(out))
-        for d in data:
-            d['last_refresh'] = datetime.datetime.utcnow().strftime(DATEFMT)
-        self.log.debug('Refreshed host %s daemons: %s' % (host, data))
-        self.daemon_cache[host] = orchestrator.OutdatableData(data)
-        return host, data
-
-    def _proc_ls(self, host, ls):
-        # type: (str, List[Dict[str,str]]) -> List[orchestrator.DaemonDescription]
-        result = []
+        ls = json.loads(''.join(out))
+        dm = {}
         for d in ls:
             if not d['style'].startswith('cephadm'):
                 self.log.debug('ignoring non-cephadm on %s: %s' % (host, d))
@@ -1297,14 +1279,12 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if d['fsid'] != self._cluster_fsid:
                 self.log.debug('ignoring foreign daemon on %s: %s' % (host, d))
                 continue
-            self.log.debug('including %s %s' % (host, d))
-            sd = orchestrator.DaemonDescription()
-            if 'last_refresh' in d:
-                sd.last_refresh = datetime.datetime.strptime(
-                    d['last_refresh'], DATEFMT)
             if '.' not in d['name']:
                 self.log.debug('ignoring dot-less daemon on %s: %s' % (host, d))
                 continue
+            self.log.debug('including %s %s' % (host, d))
+            sd = orchestrator.DaemonDescription()
+            sd.last_refresh = datetime.datetime.utcnow()
             sd.daemon_type = d['name'].split('.')[0]
             sd.daemon_id = '.'.join(d['name'].split('.')[1:])
             sd.nodename = host
@@ -1323,64 +1303,39 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             else:
                 sd.status_desc = 'unknown'
                 sd.status = None
-            result.append(sd)
-        return result
+            dm[sd.name()] = sd
+        self.log.debug('Refreshed host %s daemons: %s' % (host, dm))
+        self.daemon_cache[host] = {
+            'last_update': datetime.datetime.utcnow().strftime(DATEFMT),
+            'daemons': dm,
+        }
+        return host, dm
 
     def _get_daemons(self,
                      daemon_type=None,
                      daemon_id=None,
                      service_name=None,
-                     host=None,
-                     refresh=False,
-                     maybe_refresh=False):
-        hosts = []
-        wait_for_args = []
-        daemons = {}
-        keys = None
-        if host is not None:
-            keys = [host]
-        for host, host_info in self.daemon_cache.items_filtered(keys):
-            hosts.append(host)
-            if refresh:
-                self.log.info("refreshing daemons for '{}'".format(host))
-                wait_for_args.append((host,))
-            elif maybe_refresh and host_info.outdated(self.daemon_cache_timeout):  # type: ignore
-                self.log.info("refreshing stale daemons for '{}'".format(host))
-                wait_for_args.append((host,))
-            elif not host_info.last_refresh:
-                daemons[host] = [
-                    {
-                        'name': '*.*',
-                        'style': 'cephadm:v1',
-                        'fsid': self._cluster_fsid,
-                    },
-                ]
-            else:
-                self.log.debug('have recent daemons for %s: %s' % (
-                    host, host_info.data))
-                daemons[host] = host_info.data
-
-        def _get_daemons_result(results):
-            for host, data in results:
-                daemons[host] = data
-
-            result = []
-            for host, ls in daemons.items():
-                for d in self._proc_ls(host, ls):
-                    if daemon_type and daemon_type != d.daemon_type:
-                        continue
-                    if daemon_id and daemon_id != d.daemon_id:
-                        continue
-                    if service_name and not d.daemon_id.startswith(service_name + '.'):
-                        continue
-                    result.append(d)
-            return result
-
-        if wait_for_args:
-            return self._refresh_host_daemons(wait_for_args).then(
-                _get_daemons_result)
-        else:
-            return trivial_result(_get_daemons_result({}))
+                     want_host=None,
+                     refresh=False):
+        if refresh:
+            ######### FIXME #########
+            raise NotImplementedError()
+        result = []
+        self.log.debug('_get_daemons')
+        for host, info in self.daemon_cache.items():
+            self.log.debug('_get_daemons info %s' % info)
+            if want_host and host != want_host:
+                continue
+            for name, dd in info.get('daemons', {}).items():
+                if daemon_type and daemon_type != dd.daemon_type:
+                    continue
+                if daemon_id and daemon_id != dd.daemon_id:
+                    continue
+                if service_name and not dd.name().startswith(service_name + '.'):
+                    continue
+                result.append(dd)
+        self.log.debug('_get_daemons returns %s' % result)
+        return trivial_result(result)
 
 #    def describe_service(self, service_type=None, service_id=None,
 #                         node_name=None, refresh=False):
@@ -1397,7 +1352,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                      host=None, refresh=False):
         result = self._get_daemons(daemon_type=daemon_type,
                                    daemon_id=daemon_id,
-                                   host=host,
+                                   want_host=host,
                                    refresh=refresh)
         return result
 
@@ -1440,7 +1395,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 host, name, 'unit',
                 ['--name', name, a],
                 error_ok=True)
-            self.daemon_cache.invalidate(host)
+            self.daemon_cache[host]['last_update'] = None
             self.log.debug('_daemon_action code %s out %s' % (code, out))
         return "{} {} from host '{}'".format(action, name, host)
 
@@ -1751,21 +1706,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         if not code and host in self.daemon_cache:
             # prime cached service state with what we (should have)
             # just created
-            sd = {
-                'style': 'cephadm:v1',
-                'name': '%s.%s' % (daemon_type, daemon_id),
-                'fsid': self._cluster_fsid,
-                'enabled': True,
-                'state': 'running',
-            }
-            data = self.daemon_cache[host].data
-            if data:
-                data = [d for d in data if '%s.%s' % (daemon_type, daemon_id) != d['name']]
-                data.append(sd)
-            else:
-                data = [sd]
-            self.daemon_cache[host] = orchestrator.OutdatableData(data)
-        self.daemon_cache.invalidate(host)
+            sd = orchestrator.DaemonDescription()
+            sd.daemon_type = daemon_type
+            sd.daemon_id = daemon_id
+            sd.nodename = host
+            sd.status = 1
+            sd.status_desc = 'starting'
+            self.daemon_cache[host]['daemons'][sd.name()] = sd
+        self.daemon_cache[host]['last_update'] = None
         self.event.set()
         return "{} {} on host '{}'".format(
             'Reconfigured' if reconfig else 'Deployed', name, host)
@@ -1784,11 +1732,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self.log.debug('_remove_daemon code %s out %s' % (code, out))
         if not code and host in self.daemon_cache:
             # remove item from cache
-            data = self.daemon_cache[host].data
-            if data:
-                data = [d for d in data if d['name'] != name]
-                self.daemon_cache[host] = orchestrator.OutdatableData(data)
-        self.daemon_cache.invalidate(host)
+            if name in self.daemon_cache[host]['daemons']:
+                del self.daemon_cache[host]['daemons'][name]
+        self.daemon_cache[host]['last_update'] = None
         return "Removed {} from host '{}'".format(name, host)
 
     def _update_service(self, daemon_type, add_func, spec):