From a9917454183670715e32220394e69b9f1a02d516 Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Fri, 14 Feb 2020 11:08:18 -0600
Subject: [PATCH] mgr/cephadm: replace remaining _get_daemons() with daemon cache

New function _get_daemons_by_type() returns immediately with a result
from the cache.

Remove the with_daemons decorator.

Push daemon list fetch into _add_new_daemon.

Signed-off-by: Sage Weil
---
 src/pybind/mgr/cephadm/module.py | 210 ++++++++++++-------------------
 1 file changed, 80 insertions(+), 130 deletions(-)

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index df600a2c6e8a6..c11a9c4ffc0df 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -262,29 +262,6 @@ def trivial_result(val):
     return AsyncCompletion(value=val, name='trivial_result')
 
 
-def with_daemons(daemon_type=None,
-                 daemon_id=None,
-                 service_name=None,
-                 host=None,
-                 refresh=False):
-    def decorator(func):
-        @wraps(func)
-        def wrapper(self, *args, **kwargs):
-            def on_complete(daemons):
-                if kwargs:
-                    kwargs['daemons'] = daemons
-                    return func(self, *args, **kwargs)
-                else:
-                    args_ = args + (daemons,)
-                    return func(self, *args_, **kwargs)
-            return self._get_daemons(daemon_type=daemon_type,
-                                     daemon_id=daemon_id,
-                                     service_name=service_name,
-                                     host=host,
-                                     refresh=refresh).then(on_complete)
-        return wrapper
-    return decorator
-
-
 @six.add_metaclass(CLICommandMeta)
 class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
@@ -481,8 +458,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             self.health_checks[alert_id] = alert
         self.set_health_checks(self.health_checks)
 
-    def _do_upgrade(self, daemons):
-        # type: (List[orchestrator.DaemonDescription]) -> Optional[AsyncCompletion]
+    def _do_upgrade(self):
+        # type: () -> Optional[AsyncCompletion]
         if not self.upgrade_state:
             self.log.debug('_do_upgrade no state, exiting')
             return None
@@ -520,6 +497,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if opt['name'] == 'container_image':
                 image_settings[opt['section']] = opt['value']
 
+        daemons = []
+        for host, di in self.daemon_cache.items():
+            for name, dd in di['daemons'].items():
+                daemons.append(dd)
+
         for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
             self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
             need_upgrade_self = False
@@ -780,7 +762,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self._check_for_strays()
 
         if self.upgrade_state and not self.upgrade_state.get('paused'):
-            completion = self._do_upgrade(daemons)
+            completion = self._do_upgrade()
             if completion:
                 while not completion.has_result:
                     self.process([completion])
@@ -1267,6 +1249,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self._save_inventory()
         return 'Removed label %s from host %s' % (label, host)
 
+    def _get_daemons_by_type(self, daemon_type):
+        # type: (str) -> List[orchestrator.DaemonDescription]
+        result = []  # type: List[orchestrator.DaemonDescription]
+        for host, di in self.daemon_cache.items():
+            for name, d in di['daemons'].items():
+                if name.startswith(daemon_type + '.'):
+                    result.append(d)
+        return result
+
     def _refresh_host_daemons(self, host):
         out, err, code = self._run_cephadm(
             host, 'mon', 'ls', [], no_fsid=True)
@@ -1728,26 +1719,25 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         return "Removed {} from host '{}'".format(name, host)
 
     def _update_service(self, daemon_type, add_func, spec):
-        def ___update_service(daemons):
-            if len(daemons) > spec.count:
-                # remove some
-                to_remove = len(daemons) - spec.count
-                args = []
-                for d in daemons[0:to_remove]:
-                    args.append(
-                        ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
-                    )
-                return self._remove_daemon(args)
-            elif len(daemons) < spec.count:
-                return add_func(spec)
-            return []
-        return self._get_daemons(daemon_type, service_name=spec.name).then(___update_service)
+        daemons = self._get_daemons_by_type(daemon_type)
+        if len(daemons) > spec.count:
+            # remove some
+            to_remove = len(daemons) - spec.count
+            args = []
+            for d in daemons[0:to_remove]:
+                args.append(
+                    ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
+                )
+            return self._remove_daemon(args)
+        elif len(daemons) < spec.count:
+            return add_func(spec)
+        return trivial_result([])
 
     def _add_new_daemon(self,
                         daemon_type: str,
-                        daemons: List[orchestrator.DaemonDescription],
                         spec: orchestrator.ServiceSpec,
                         create_func: Callable):
+        daemons = self._get_daemons_by_type(daemon_type)
         args = []
         num_added = 0
         assert spec.count is not None
@@ -1818,22 +1808,20 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        def add_mons(daemons):
-            for _, _, name in spec.placement.hosts:
-                if name and len([d for d in daemons if d.daemon_id == name]):
-                    raise RuntimeError('name %s already exists', name)
-
-            # explicit placement: enough hosts provided?
-            if len(spec.placement.hosts) < spec.count:
-                raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(spec.placement.hosts), spec.count))
-            self.log.info("creating {} monitors on hosts: '{}'".format(
-                spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
-            # TODO: we may want to chain the creation of the monitors so they join
-            # the quorum one at a time.
-            return self._create_mon(spec.placement.hosts)
-
-        return self._get_daemons('mon').then(add_mons)
+        daemons = self._get_daemons_by_type('mon')
+        for _, _, name in spec.placement.hosts:
+            if name and len([d for d in daemons if d.daemon_id == name]):
+                raise RuntimeError('name %s already exists', name)
+
+        # explicit placement: enough hosts provided?
+        if len(spec.placement.hosts) < spec.count:
+            raise RuntimeError("Error: {} hosts provided, expected {}".format(
+                len(spec.placement.hosts), spec.count))
+        self.log.info("creating {} monitors on hosts: '{}'".format(
+            spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+        # TODO: we may want to chain the creation of the monitors so they join
+        # the quorum one at a time.
+        return self._create_mon(spec.placement.hosts)
 
     def apply_mon(self, spec):
         # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
@@ -1869,22 +1857,21 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
-        def update_mons_with_daemons(daemons):
-            for _, _, name in spec.placement.hosts:
-                if name and len([d for d in daemons if d.daemon_id == name]):
-                    raise RuntimeError('name %s alrady exists', name)
-
-            # explicit placement: enough hosts provided?
-            num_new_mons = spec.count - num_mons
-            if len(spec.placement.hosts) < num_new_mons:
-                raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(spec.placement.hosts), num_new_mons))
-            self.log.info("creating {} monitors on hosts: '{}'".format(
-                num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
-            # TODO: we may want to chain the creation of the monitors so they join
-            # the quorum one at a time.
-            return self._create_mon(spec.placement.hosts)
-        return self._get_daemons('mon').then(update_mons_with_daemons)
+        daemons = self._get_daemons_by_type('mon')
+        for _, _, name in spec.placement.hosts:
+            if name and len([d for d in daemons if d.daemon_id == name]):
+                raise RuntimeError('name %s alrady exists', name)
+
+        # explicit placement: enough hosts provided?
+        num_new_mons = spec.count - num_mons
+        if len(spec.placement.hosts) < num_new_mons:
+            raise RuntimeError("Error: {} hosts provided, expected {}".format(
+                len(spec.placement.hosts), num_new_mons))
+        self.log.info("creating {} monitors on hosts: '{}'".format(
+            num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+        # TODO: we may want to chain the creation of the monitors so they join
+        # the quorum one at a time.
+        return self._create_mon(spec.placement.hosts)
 
     @async_map_completion
     def _create_mgr(self, mgr_id, host):
@@ -1904,20 +1891,19 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
 
-    @with_daemons('mgr')
-    def add_mgr(self, spec, daemons):
-        # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+    def add_mgr(self, spec):
+        # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts,
                               service_type='mgr').load()
-        return self._add_new_daemon('mgr', daemons, spec, self._create_mgr)
+        return self._add_new_daemon('mgr', spec, self._create_mgr)
 
-    @with_daemons('mgr')
-    def apply_mgr(self, spec, daemons):
-        # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+    def apply_mgr(self, spec):
+        # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
         """
         Adjust the number of cluster managers.
         """
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts,
                               service_type='mgr').load()
+        daemons = self._get_daemons_by_type('mgr')
         num_mgrs = len(daemons)
         if spec.count == num_mgrs:
             return orchestrator.Completion(value="The requested number of managers exist.")
@@ -2001,11 +1987,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 'value': spec.name,
             })
 
-        def _add_mds(daemons):
-            # type: (List[orchestrator.DaemonDescription]) -> AsyncCompletion
-            return self._add_new_daemon('mds', daemons, spec, self._create_mds)
-
-        return self._get_daemons('mds').then(_add_mds)
+        return self._add_new_daemon('mds', spec, self._create_mds)
 
     def apply_mds(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
@@ -2042,10 +2024,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 'value': spec.rgw_realm,
             })
 
-        def _add_rgw(daemons):
-            return self._add_new_daemon('rgw', daemons, spec, self._create_rgw)
-
-        return self._get_daemons('rgw').then(_add_rgw)
+        return self._add_new_daemon('rgw', spec, self._create_rgw)
 
     @async_map_completion
     def _create_rgw(self, rgw_id, host):
@@ -2067,10 +2046,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             raise RuntimeError("must specify at least %d hosts" % spec.count)
         self.log.debug('nodes %s' % spec.placement.hosts)
 
-        def _add_rbd_mirror(daemons):
-            return self._add_new_daemon('rbd-mirror', daemons, spec, self._create_rbd_mirror)
-
-        return self._get_daemons('rbd-mirror').then(_add_rbd_mirror)
+        return self._add_new_daemon('rbd-mirror', spec, self._create_rbd_mirror)
 
     @async_map_completion
     def _create_rbd_mirror(self, daemon_id, host):
@@ -2114,29 +2090,7 @@ scrape_configs:
     def add_prometheus(self, spec):
         spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts,
                               service_type='prometheus').load()
-        self.log.debug('nodes %s' % spec.placement.hosts)
-
-        def _add(daemons):
-            args = []
-            num_added = 0
-            for host, _, name in spec.placement.hosts:
-                if num_added >= spec.count:
-                    break
-                daemon_id = self.get_unique_name(host, daemons, None, name)
-                self.log.debug('placing prometheus.%s on host %s' % (daemon_id,
-                                                                     host))
-                args.append((daemon_id, host))
-
-                # add to daemon list so next name(s) will also be unique
-                sd = orchestrator.ServiceDescription()
-                sd.service_instance = daemon_id
-                sd.service_type = 'prometheus'
-                sd.nodename = host
-                daemons.append(sd)
-                num_added += 1
-            return self._create_prometheus(args)
-
-        return self._get_daemons('prometheus').then(_add)
+        return self._add_new_daemon('prometheus', spec, self._create_prometheus)
 
     @async_map_completion
     def _create_prometheus(self, daemon_id, host):
@@ -2177,11 +2131,7 @@ scrape_configs:
             target_name = image
         else:
             raise OrchestratorError('must specify either image or version')
-        return self._get_daemons().then(
-            lambda daemons: self._upgrade_check(target_name, daemons))
 
-    def _upgrade_check(self, target_name, daemons):
-        # get service state
         target_id, target_version = self._get_container_image_id(target_name)
         self.log.debug('Target image %s id %s version %s' % (
             target_name, target_id, target_version))
@@ -2192,18 +2142,18 @@ scrape_configs:
             'needs_update': dict(),
             'up_to_date': list(),
         }
-        for s in daemons:
-            if target_id == s.container_image_id:
-                r['up_to_date'].append(s.name())
-            else:
-                r['needs_update'][s.name()] = {
-                    'current_name': s.container_image_name,
-                    'current_id': s.container_image_id,
-                    'current_version': s.version,
-                }
+        for host, di in self.daemon_cache.items():
+            for name, dd in di['daemons'].items():
+                if target_id == dd.container_image_id:
+                    r['up_to_date'].append(dd.name())
+                else:
+                    r['needs_update'][dd.name()] = {
+                        'current_name': dd.container_image_name,
+                        'current_id': dd.container_image_id,
+                        'current_version': dd.version,
+                    }
         return json.dumps(r, indent=4, sort_keys=True)
 
-
     def upgrade_status(self):
         r = orchestrator.UpgradeStatusSpec()
         if self.upgrade_state:
-- 
2.39.5
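
Note for reviewers: the sketch below is a minimal, self-contained illustration of the cache lookup pattern this patch standardizes on. It is not code from module.py; the shape of daemon_cache (host -> {'daemons': {name: DaemonDescription}}) is inferred from the hunks above, and DaemonDescription is stubbed with a NamedTuple here instead of the real orchestrator class.

# Hypothetical sketch; names and cache shape are assumptions based on the patch above.
from typing import Dict, List, NamedTuple


class DaemonDescription(NamedTuple):  # stand-in for orchestrator.DaemonDescription
    daemon_type: str
    daemon_id: str
    nodename: str

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)


# daemon_cache maps host -> {'daemons': {daemon_name: DaemonDescription}},
# mirroring what the patch reads in _get_daemons_by_type() and _do_upgrade().
daemon_cache: Dict[str, dict] = {
    'host1': {'daemons': {
        'mgr.x': DaemonDescription('mgr', 'x', 'host1'),
        'mon.host1': DaemonDescription('mon', 'host1', 'host1'),
    }},
    'host2': {'daemons': {
        'mgr.y': DaemonDescription('mgr', 'y', 'host2'),
    }},
}


def get_daemons_by_type(daemon_type: str) -> List[DaemonDescription]:
    """Synchronous cache lookup in the spirit of _get_daemons_by_type()."""
    result = []
    for host, di in daemon_cache.items():
        for name, d in di['daemons'].items():
            # Daemon names are '<type>.<id>', so match on the 'type.' prefix.
            if name.startswith(daemon_type + '.'):
                result.append(d)
    return result


if __name__ == '__main__':
    # Unlike the old self._get_daemons(...).then(callback) chain, the caller
    # gets a plain list back immediately -- no completion object to chain on.
    print([d.name() for d in get_daemons_by_type('mgr')])  # ['mgr.x', 'mgr.y']

The design point of the refactor shows up in the last lines: because the cache read is synchronous, callers such as add_mgr() and _update_service() no longer need to wrap their logic in a callback passed to .then().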