From: Sage Weil Date: Mon, 4 Nov 2019 18:31:34 +0000 (-0600) Subject: mgr/ssh: cache services X-Git-Tag: v15.1.0~1020^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=89c1bf519431b0e71e68d709373aaa614987e14f;p=ceph.git mgr/ssh: cache services Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py index 6783a9b4dbd5..505dfcdcecca 100644 --- a/src/pybind/mgr/ssh/module.py +++ b/src/pybind/mgr/ssh/module.py @@ -106,7 +106,7 @@ def log_exceptions(f): return wrapper -class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): +class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): _STORE_HOST_PREFIX = "host" @@ -124,6 +124,12 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): 'default': 10 * 60, 'desc': 'seconds to cache device inventory', }, + { + 'name': 'service_cache_timeout', + 'type': 'seconds', + 'default': 60, + 'desc': 'seconds to cache service (daemon) inventory', + }, ] COMMANDS = [ @@ -166,8 +172,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): self.inventory_cache = orchestrator.OutdatablePersistentDict( self, self._STORE_HOST_PREFIX + '.devices') - self.daemon_cache = orchestrator.OutdatablePersistentDict( - self, self._STORE_HOST_PREFIX + '.daemons') + self.service_cache = orchestrator.OutdatablePersistentDict( + self, self._STORE_HOST_PREFIX + '.services') def config_notify(self): """ @@ -418,6 +424,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): @log_exceptions def run(host): self.inventory_cache[host] = orchestrator.OutdatableData() + self.service_cache[host] = orchestrator.OutdatableData() return "Added host '{}'".format(host) return SSHWriteCompletion( @@ -432,6 +439,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): @log_exceptions def run(host): del self.inventory_cache[host] + del self.service_cache[host] return "Removed host '{}'".format(host) return SSHWriteCompletion( @@ -450,20 +458,42 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache] return orchestrator.TrivialReadCompletion(nodes) + def _refresh_host_services(self, host): + out, code = self._run_ceph_daemon( + host, 'mon', 'ls', [], no_fsid=True) + data = json.loads(''.join(out)) + self.log.debug('refreshed host %s services: %s' % (host, data)) + self.service_cache[host] = orchestrator.OutdatableData(data) + return data + def _get_services(self, service_type=None, service_name=None, service_id=None, - node_name=None): - daemons = {} - for host, _ in self._get_hosts(): - self.log.info("refresh stale daemons for '{}'".format(host)) - out, code = self._run_ceph_daemon( - host, 'mon', 'ls', [], no_fsid=True) - daemons[host] = json.loads(''.join(out)) + node_name=None, + refresh=False): + hosts = [] + wait_for = [] + for host, host_info in self.service_cache.items_filtered(): + hosts.append(host) + if host_info.outdated(self.service_cache_timeout) or refresh: + self.log.info("refresing stale services for '{}'".format(host)) + wait_for.append( + SSHReadCompletion(self._worker_pool.apply_async( + self._refresh_host_services, (host,)))) + else: + self.log.debug('have recent services for %s: %s' % ( + host, host_info.data)) + wait_for.append( + orchestrator.TrivialReadCompletion([host_info.data])) + self._orchestrator_wait(wait_for) + + services = {} + for host, c in zip(hosts, wait_for): + services[host] = c.result[0] result = [] - for host, ls in daemons.items(): + for host, ls in services.items(): for d in ls: if not d['style'].startswith('ceph-daemon'): self.log.debug('ignoring non-ceph-daemon on %s: %s' % (host, d)) @@ -504,7 +534,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): service_type + " unsupported") result = self._get_services(service_type, service_id=service_id, - node_name=node_name) + node_name=node_name, + refresh=refresh) return orchestrator.TrivialReadCompletion(result) def service_action(self, action, service_type,