]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: replace remaining _get_daemons() with daemon cache
authorSage Weil <sage@redhat.com>
Fri, 14 Feb 2020 17:08:18 +0000 (11:08 -0600)
committerSage Weil <sage@redhat.com>
Tue, 18 Feb 2020 21:49:56 +0000 (15:49 -0600)
New function _get_daemons_by_type() returns immediately with a result
from the cache.

Remove the with_daemons decorator.

Push daemon list fetch into _add_new_daemon.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py

index df600a2c6e8a6746f79eb855ef6807652ae04a11..c11a9c4ffc0df729e09f8870c6b80483abdb06b1 100644 (file)
@@ -262,29 +262,6 @@ def trivial_result(val):
     return AsyncCompletion(value=val, name='trivial_result')
 
 
-def with_daemons(daemon_type=None,
-                 daemon_id=None,
-                 service_name=None,
-                 host=None,
-                 refresh=False):
-    def decorator(func):
-        @wraps(func)
-        def wrapper(self, *args, **kwargs):
-            def on_complete(daemons):
-                if kwargs:
-                    kwargs['daemons'] = daemons
-                    return func(self, *args, **kwargs)
-                else:
-                    args_ = args + (daemons,)
-                    return func(self, *args_, **kwargs)
-            return self._get_daemons(daemon_type=daemon_type,
-                                     daemon_id=daemon_id,
-                                     service_name=service_name,
-                                     host=host,
-                                     refresh=refresh).then(on_complete)
-        return wrapper
-    return decorator
-
 @six.add_metaclass(CLICommandMeta)
 class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
 
@@ -481,8 +458,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self.health_checks[alert_id] = alert
         self.set_health_checks(self.health_checks)
 
-    def _do_upgrade(self, daemons):
-        # type: (List[orchestrator.DaemonDescription]) -> Optional[AsyncCompletion]
+    def _do_upgrade(self):
+        # type: () -> Optional[AsyncCompletion]
         if not self.upgrade_state:
             self.log.debug('_do_upgrade no state, exiting')
             return None
@@ -520,6 +497,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if opt['name'] == 'container_image':
                 image_settings[opt['section']] = opt['value']
 
+        daemons = []
+        for host, di in self.daemon_cache.items():
+            for name, dd in di['daemons'].items():
+                daemons.append(dd)
+
         for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
             self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
             need_upgrade_self = False
@@ -780,7 +762,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             self._check_for_strays()
 
             if self.upgrade_state and not self.upgrade_state.get('paused'):
-                completion = self._do_upgrade(daemons)
+                completion = self._do_upgrade()
                 if completion:
                     while not completion.has_result:
                         self.process([completion])
@@ -1267,6 +1249,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self._save_inventory()
         return 'Removed label %s from host %s' % (label, host)
 
+    def _get_daemons_by_type(self, daemon_type):
+        # type: (str) -> List[orchestrator.DaemonDescription]
+        result = []   # type: List[orchestrator.DaemonDescription]
+        for host, di in self.daemon_cache.items():
+            for name, d in di['daemons'].items():
+                if name.startswith(daemon_type + '.'):
+                    result.append(d)
+        return result
+
     def _refresh_host_daemons(self, host):
         out, err, code = self._run_cephadm(
             host, 'mon', 'ls', [], no_fsid=True)
@@ -1728,26 +1719,25 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         return "Removed {} from host '{}'".format(name, host)
 
     def _update_service(self, daemon_type, add_func, spec):
-        def ___update_service(daemons):
-            if len(daemons) > spec.count:
-                # remove some
-                to_remove = len(daemons) - spec.count
-                args = []
-                for d in daemons[0:to_remove]:
-                    args.append(
-                        ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
-                    )
-                return self._remove_daemon(args)
-            elif len(daemons) < spec.count:
-                return add_func(spec)
-            return []
-        return self._get_daemons(daemon_type, service_name=spec.name).then(___update_service)
+        daemons = self._get_daemons_by_type(daemon_type)
+        if len(daemons) > spec.count:
+            # remove some
+            to_remove = len(daemons) - spec.count
+            args = []
+            for d in daemons[0:to_remove]:
+                args.append(
+                    ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
+                )
+            return self._remove_daemon(args)
+        elif len(daemons) < spec.count:
+            return add_func(spec)
+        return trivial_result([])
 
     def _add_new_daemon(self,
                         daemon_type: str,
-                        daemons: List[orchestrator.DaemonDescription],
                         spec: orchestrator.ServiceSpec,
                         create_func: Callable):
+        daemons = self._get_daemons_by_type(daemon_type)
         args = []
         num_added = 0
         assert spec.count is not None
@@ -1818,22 +1808,20 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        def add_mons(daemons):
-            for _, _, name in spec.placement.hosts:
-                if name and len([d for d in daemons if d.daemon_id == name]):
-                    raise RuntimeError('name %s already exists', name)
-
-            # explicit placement: enough hosts provided?
-            if len(spec.placement.hosts) < spec.count:
-                raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(spec.placement.hosts), spec.count))
-            self.log.info("creating {} monitors on hosts: '{}'".format(
-                spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
-            # TODO: we may want to chain the creation of the monitors so they join
-            # the quorum one at a time.
-            return self._create_mon(spec.placement.hosts)
-
-        return self._get_daemons('mon').then(add_mons)
+        daemons = self._get_daemons_by_type('mon')
+        for _, _, name in spec.placement.hosts:
+            if name and len([d for d in daemons if d.daemon_id == name]):
+                raise RuntimeError('name %s already exists', name)
+
+        # explicit placement: enough hosts provided?
+        if len(spec.placement.hosts) < spec.count:
+            raise RuntimeError("Error: {} hosts provided, expected {}".format(
+                len(spec.placement.hosts), spec.count))
+        self.log.info("creating {} monitors on hosts: '{}'".format(
+            spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+        # TODO: we may want to chain the creation of the monitors so they join
+        # the quorum one at a time.
+        return self._create_mon(spec.placement.hosts)
 
     def apply_mon(self, spec):
         # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
@@ -1869,22 +1857,21 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        def update_mons_with_daemons(daemons):
-            for _, _, name in spec.placement.hosts:
-                if name and len([d for d in daemons if d.daemon_id == name]):
-                    raise RuntimeError('name %s alrady exists', name)
-
-            # explicit placement: enough hosts provided?
-            num_new_mons = spec.count - num_mons
-            if len(spec.placement.hosts) < num_new_mons:
-                raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(spec.placement.hosts), num_new_mons))
-            self.log.info("creating {} monitors on hosts: '{}'".format(
-                num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
-            # TODO: we may want to chain the creation of the monitors so they join
-            # the quorum one at a time.
-            return self._create_mon(spec.placement.hosts)
-        return self._get_daemons('mon').then(update_mons_with_daemons)
+        daemons = self._get_daemons_by_type('mon')
+        for _, _, name in spec.placement.hosts:
+            if name and len([d for d in daemons if d.daemon_id == name]):
+                raise RuntimeError('name %s alrady exists', name)
+
+        # explicit placement: enough hosts provided?
+        num_new_mons = spec.count - num_mons
+        if len(spec.placement.hosts) < num_new_mons:
+            raise RuntimeError("Error: {} hosts provided, expected {}".format(
+                len(spec.placement.hosts), num_new_mons))
+        self.log.info("creating {} monitors on hosts: '{}'".format(
+            num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+        # TODO: we may want to chain the creation of the monitors so they join
+        # the quorum one at a time.
+        return self._create_mon(spec.placement.hosts)
 
     @async_map_completion
     def _create_mgr(self, mgr_id, host):
@@ -1904,20 +1891,19 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
 
         return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
 
-    @with_daemons('mgr')
-    def add_mgr(self, spec, daemons):
-        # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+    def add_mgr(self, spec):
+        # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
-        return self._add_new_daemon('mgr', daemons, spec, self._create_mgr)
+        return self._add_new_daemon('mgr', spec, self._create_mgr)
 
-    @with_daemons('mgr')
-    def apply_mgr(self, spec, daemons):
-        # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+    def apply_mgr(self, spec):
+        # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
         """
         Adjust the number of cluster managers.
         """
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
 
+        daemons = self._get_daemons_by_type('mgr')
         num_mgrs = len(daemons)
         if spec.count == num_mgrs:
             return orchestrator.Completion(value="The requested number of managers exist.")
@@ -2001,11 +1987,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'value': spec.name,
         })
 
-        def _add_mds(daemons):
-            # type: (List[orchestrator.DaemonDescription]) -> AsyncCompletion
-            return self._add_new_daemon('mds', daemons, spec, self._create_mds)
-
-        return self._get_daemons('mds').then(_add_mds)
+        return self._add_new_daemon('mds', spec, self._create_mds)
 
     def apply_mds(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
@@ -2042,10 +2024,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'value': spec.rgw_realm,
         })
 
-        def _add_rgw(daemons):
-            return self._add_new_daemon('rgw', daemons, spec, self._create_rgw)
-
-        return self._get_daemons('rgw').then(_add_rgw)
+        return self._add_new_daemon('rgw', spec, self._create_rgw)
 
     @async_map_completion
     def _create_rgw(self, rgw_id, host):
@@ -2067,10 +2046,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             raise RuntimeError("must specify at least %d hosts" % spec.count)
         self.log.debug('nodes %s' % spec.placement.hosts)
 
-        def _add_rbd_mirror(daemons):
-            return self._add_new_daemon('rbd-mirror', daemons, spec, self._create_rbd_mirror)
-
-        return self._get_daemons('rbd-mirror').then(_add_rbd_mirror)
+        return self._add_new_daemon('rbd-mirror', spec, self._create_rbd_mirror)
 
     @async_map_completion
     def _create_rbd_mirror(self, daemon_id, host):
@@ -2114,29 +2090,7 @@ scrape_configs:
 
     def add_prometheus(self, spec):
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load()
-        self.log.debug('nodes %s' % spec.placement.hosts)
-
-        def _add(daemons):
-            args = []
-            num_added = 0
-            for host, _, name in spec.placement.hosts:
-                if num_added >= spec.count:
-                    break
-                daemon_id = self.get_unique_name(host, daemons, None, name)
-                self.log.debug('placing prometheus.%s on host %s' % (daemon_id,
-                                                                     host))
-                args.append((daemon_id, host))
-
-                # add to daemon list so next name(s) will also be unique
-                sd = orchestrator.ServiceDescription()
-                sd.service_instance = daemon_id
-                sd.service_type = 'prometheus'
-                sd.nodename = host
-                daemons.append(sd)
-                num_added += 1
-            return self._create_prometheus(args)
-
-        return self._get_daemons('prometheus').then(_add)
+        return self._add_new_daemon('prometheus', spec, self._create_prometheus)
 
     @async_map_completion
     def _create_prometheus(self, daemon_id, host):
@@ -2177,11 +2131,7 @@ scrape_configs:
             target_name = image
         else:
             raise OrchestratorError('must specify either image or version')
-        return self._get_daemons().then(
-            lambda daemons: self._upgrade_check(target_name, daemons))
 
-    def _upgrade_check(self, target_name, daemons):
-        # get service state
         target_id, target_version = self._get_container_image_id(target_name)
         self.log.debug('Target image %s id %s version %s' % (
             target_name, target_id, target_version))
@@ -2192,18 +2142,18 @@ scrape_configs:
             'needs_update': dict(),
             'up_to_date': list(),
         }
-        for s in daemons:
-            if target_id == s.container_image_id:
-                r['up_to_date'].append(s.name())
-            else:
-                r['needs_update'][s.name()] = {
-                    'current_name': s.container_image_name,
-                    'current_id': s.container_image_id,
-                    'current_version': s.version,
-                }
+        for host, di in self.daemon_cache.items():
+            for name, dd in di['daemons'].items():
+                if target_id == dd.container_image_id:
+                    r['up_to_date'].append(dd.name())
+                else:
+                    r['needs_update'][dd.name()] = {
+                        'current_name': dd.container_image_name,
+                        'current_id': dd.container_image_id,
+                        'current_version': dd.version,
+                    }
         return json.dumps(r, indent=4, sort_keys=True)
 
-
     def upgrade_status(self):
         r = orchestrator.UpgradeStatusSpec()
         if self.upgrade_state: