]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/ssh: cache services
authorSage Weil <sage@redhat.com>
Mon, 4 Nov 2019 18:31:34 +0000 (12:31 -0600)
committerSage Weil <sage@redhat.com>
Wed, 6 Nov 2019 14:25:49 +0000 (08:25 -0600)
Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/ssh/module.py

index 6783a9b4dbd5c90bf95c58c3d1c2d89bb90a7f70..505dfcdceccaa7edb1adc040b46e64e2d6b7f804 100644 (file)
@@ -106,7 +106,7 @@ def log_exceptions(f):
         return wrapper
 
 
-class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
+class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
 
     _STORE_HOST_PREFIX = "host"
 
@@ -124,6 +124,12 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
             'default': 10 * 60,
             'desc': 'seconds to cache device inventory',
         },
+        {
+            'name': 'service_cache_timeout',
+            'type': 'seconds',
+            'default': 60,
+            'desc': 'seconds to cache service (daemon) inventory',
+        },
     ]
 
     COMMANDS = [
@@ -166,8 +172,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.inventory_cache = orchestrator.OutdatablePersistentDict(
             self, self._STORE_HOST_PREFIX + '.devices')
 
-        self.daemon_cache = orchestrator.OutdatablePersistentDict(
-            self, self._STORE_HOST_PREFIX + '.daemons')
+        self.service_cache = orchestrator.OutdatablePersistentDict(
+            self, self._STORE_HOST_PREFIX + '.services')
 
     def config_notify(self):
         """
@@ -418,6 +424,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         @log_exceptions
         def run(host):
             self.inventory_cache[host] = orchestrator.OutdatableData()
+            self.service_cache[host] = orchestrator.OutdatableData()
             return "Added host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -432,6 +439,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         @log_exceptions
         def run(host):
             del self.inventory_cache[host]
+            del self.service_cache[host]
             return "Removed host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -450,20 +458,42 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
         return orchestrator.TrivialReadCompletion(nodes)
 
+    def _refresh_host_services(self, host):
+        out, code = self._run_ceph_daemon(
+            host, 'mon', 'ls', [], no_fsid=True)
+        data = json.loads(''.join(out))
+        self.log.debug('refreshed host %s services: %s' % (host, data))
+        self.service_cache[host] = orchestrator.OutdatableData(data)
+        return data
+
     def _get_services(self,
                       service_type=None,
                       service_name=None,
                       service_id=None,
-                      node_name=None):
-        daemons = {}
-        for host, _ in self._get_hosts():
-            self.log.info("refresh stale daemons for '{}'".format(host))
-            out, code = self._run_ceph_daemon(
-                host, 'mon', 'ls', [], no_fsid=True)
-            daemons[host] = json.loads(''.join(out))
+                      node_name=None,
+                      refresh=False):
+        hosts = []
+        wait_for = []
+        for host, host_info in self.service_cache.items_filtered():
+            hosts.append(host)
+            if host_info.outdated(self.service_cache_timeout) or refresh:
+                self.log.info("refresing stale services for '{}'".format(host))
+                wait_for.append(
+                    SSHReadCompletion(self._worker_pool.apply_async(
+                        self._refresh_host_services, (host,))))
+            else:
+                self.log.debug('have recent services for %s: %s' % (
+                    host, host_info.data))
+                wait_for.append(
+                    orchestrator.TrivialReadCompletion([host_info.data]))
+        self._orchestrator_wait(wait_for)
+
+        services = {}
+        for host, c in zip(hosts, wait_for):
+            services[host] = c.result[0]
 
         result = []
-        for host, ls in daemons.items():
+        for host, ls in services.items():
             for d in ls:
                 if not d['style'].startswith('ceph-daemon'):
                     self.log.debug('ignoring non-ceph-daemon on %s: %s' % (host, d))
@@ -504,7 +534,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
                 service_type + " unsupported")
         result = self._get_services(service_type,
                                     service_id=service_id,
-                                    node_name=node_name)
+                                    node_name=node_name,
+                                    refresh=refresh)
         return orchestrator.TrivialReadCompletion(result)
 
     def service_action(self, action, service_type,