]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: agent: cache ls output
authorAdam King <adking@redhat.com>
Tue, 12 Oct 2021 14:08:23 +0000 (10:08 -0400)
committerAdam King <adking@redhat.com>
Fri, 22 Oct 2021 18:18:32 +0000 (14:18 -0400)
ls is really slow, but we can cache the info and only run
the full thing if we find a new or removed container or if
one of the daemons' container id changes

Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadm

index efcf79322cc7c72197797806953eda17f5b8c34a..3276047c9ae841e6127fe4409f9758e5c4234cf4 100755 (executable)
@@ -3525,6 +3525,7 @@ class CephadmAgent():
         self.device_enhanced_scan = False
         self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
         self.recent_iteration_index: int = 0
+        self.cached_ls_values: Dict[str, Dict[str, str]] = {}
 
     def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
         if not config:
@@ -3652,7 +3653,7 @@ WantedBy=ceph-{fsid}.target
                     networks_list[key] = {k: list(v)}
 
             data = json.dumps({'host': self.host,
-                              'ls': list_daemons(self.ctx),
+                              'ls': self._get_ls(),
                                'networks': networks_list,
                                'facts': HostFacts(self.ctx).dump(),
                                'volume': volume,
@@ -3694,6 +3695,121 @@ WantedBy=ceph-{fsid}.target
         else:
             raise Exception('ceph-volume returned empty value')
 
+    def _daemon_ls_subset(self) -> Dict[str, Dict[str, Any]]:
+        # gets a subset of ls info quickly. The results of this will tell us if our
+        # cached info is still good or if we need to run the full ls again.
+        # for legacy containers, we just grab the full info. For cephadmv1 containers,
+        # we only grab enabled, state, mem_usage and container id. If container id has
+        # not changed for any daemon, we assume our cached info is good.
+        daemons: Dict[str, Dict[str, Any]] = {}
+        data_dir = self.ctx.data_dir
+        seen_memusage = {}  # type: Dict[str, int]
+        out, err, code = call(
+            self.ctx,
+            [self.ctx.container_engine.path, 'stats', '--format', '{{.ID}},{{.MemUsage}}', '--no-stream'],
+            verbosity=CallVerbosity.DEBUG
+        )
+        seen_memusage_cid_len, seen_memusage = _parse_mem_usage(code, out)
+        # we need a mapping from container names to ids. Later we will convert daemon
+        # names to container names to get daemons container id to see if it has changed
+        out, err, code = call(
+            self.ctx,
+            [self.ctx.container_engine.path, 'ps', '--format', '{{.ID}},{{.Names}}', '--no-trunc'],
+            verbosity=CallVerbosity.DEBUG
+        )
+        name_id_mapping: Dict[str, str] = self._parse_container_id_name(code, out)
+        for i in os.listdir(data_dir):
+            if i in ['mon', 'osd', 'mds', 'mgr']:
+                daemon_type = i
+                for j in os.listdir(os.path.join(data_dir, i)):
+                    if '-' not in j:
+                        continue
+                    (cluster, daemon_id) = j.split('-', 1)
+                    legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
+                    (enabled, state, _) = check_unit(self.ctx, legacy_unit_name)
+                    daemons[f'{daemon_type}.{daemon_id}'] = {
+                        'style': 'legacy',
+                        'name': '%s.%s' % (daemon_type, daemon_id),
+                        'fsid': self.ctx.fsid if self.ctx.fsid is not None else 'unknown',
+                        'systemd_unit': legacy_unit_name,
+                        'enabled': 'true' if enabled else 'false',
+                        'state': state,
+                    }
+            elif is_fsid(i):
+                fsid = str(i)  # convince mypy that fsid is a str here
+                for j in os.listdir(os.path.join(data_dir, i)):
+                    if '.' in j and os.path.isdir(os.path.join(data_dir, fsid, j)):
+                        (daemon_type, daemon_id) = j.split('.', 1)
+                        unit_name = get_unit_name(fsid, daemon_type, daemon_id)
+                        (enabled, state, _) = check_unit(self.ctx, unit_name)
+                        daemons[j] = {
+                            'style': 'cephadm:v1',
+                            'systemd_unit': unit_name,
+                            'enabled': 'true' if enabled else 'false',
+                            'state': state,
+                        }
+                        c = CephContainer.for_daemon(self.ctx, self.ctx.fsid, daemon_type, daemon_id, 'bash')
+                        container_id: Optional[str] = None
+                        for name in (c.cname, c.old_cname):
+                            if name in name_id_mapping:
+                                container_id = name_id_mapping[name]
+                                break
+                        daemons[j]['container_id'] = container_id
+                        if container_id:
+                            daemons[j]['memory_usage'] = seen_memusage.get(container_id[0:seen_memusage_cid_len])
+        return daemons
+
+    def _parse_container_id_name(self, code: int, out: str) -> Dict[str, str]:
+        # map container names to ids from ps output
+        name_id_mapping = {}  # type: Dict[str, str]
+        if not code:
+            for line in out.splitlines():
+                id, name = line.split(',')
+                name_id_mapping[name] = id
+        return name_id_mapping
+
+    def _get_ls(self) -> List[Dict[str, str]]:
+        if not self.cached_ls_values:
+            logger.info('No cached ls output. Running full daemon ls')
+            ls = list_daemons(self.ctx)
+            for d in ls:
+                self.cached_ls_values[d['name']] = d
+        else:
+            ls_subset = self._daemon_ls_subset()
+            need_full_ls = False
+            if set(self.cached_ls_values.keys()) != set(ls_subset.keys()):
+                # case for a new daemon in ls or an old daemon no longer appearing.
+                # If that happens we need a full ls
+                logger.info('Change detected in state of daemons. Running full daemon ls')
+                ls = list_daemons(self.ctx)
+                for d in ls:
+                    self.cached_ls_values[d['name']] = d
+                return ls
+            for daemon, info in self.cached_ls_values.items():
+                if info['style'] == 'legacy':
+                    # for legacy containers, ls_subset just grabs all the info
+                    self.cached_ls_values[daemon] = ls_subset[daemon]
+                else:
+                    if info['container_id'] != ls_subset[daemon]['container_id']:
+                        # case for container id having changed. We need full ls as
+                        # info we didn't grab like version and start time could have changed
+                        need_full_ls = True
+                        break
+                    # if we reach here, container id matched. Update the few values we do track
+                    # from ls subset: state, enabled, memory_usage.
+                    self.cached_ls_values[daemon]['enabled'] = ls_subset[daemon]['enabled']
+                    self.cached_ls_values[daemon]['state'] = ls_subset[daemon]['state']
+                    if 'memory_usage' in ls_subset[daemon]:
+                        self.cached_ls_values[daemon]['memory_usage'] = ls_subset[daemon]['memory_usage']
+            if need_full_ls:
+                logger.info('Change detected in state of daemons. Running full daemon ls')
+                ls = list_daemons(self.ctx)
+                for d in ls:
+                    self.cached_ls_values[d['name']] = d
+            else:
+                ls = [info for daemon, info in self.cached_ls_values.items()]
+        return ls
+
 
 def command_agent(ctx: CephadmContext) -> None:
     agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)