From 0abfa90a617a6cde35b25963e43fef30961c3d0f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 21 Feb 2020 16:43:26 -0600 Subject: [PATCH] mgr/cephadm: refactor _update_service and all apply methods - Use a common _apply_service() helper - Consolidate _add_new_daemon logic into _apply_service - Do the NodeAssignment all in one place Signed-off-by: Sage Weil --- src/pybind/mgr/cephadm/module.py | 93 +++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 24 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index fa8da4b333a..d205c7b6603 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -222,6 +222,15 @@ class HostCache(): result.append(d) return result + def get_daemons_by_service(self, service_name): + # type: (str) -> List[orchestrator.DaemonDescription] + result = [] # type: List[orchestrator.DaemonDescription] + for host, dm in self.daemons.items(): + for name, d in dm.items(): + if name.startswith(service_name + '.'): + result.append(d) + return result + def get_daemon_names(self): # type: () -> List[str] r = [] @@ -1926,19 +1935,50 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.cache.invalidate_host_daemons(host) return "Removed {} from host '{}'".format(name, host) - def _update_service(self, daemon_type, add_func, spec): - daemons = self.cache.get_daemons_by_type(daemon_type) - if len(daemons) > spec.count: + def _apply_service(self, daemon_type, spec, create_func, config_func=None): + service_name = daemon_type + if spec.name: + service_name += '.' + spec.name + daemons = self.cache.get_daemons_by_service(service_name) + spec = HostAssignment( + spec=spec, + get_hosts_func=self._get_hosts, + service_type=daemon_type).load() + if len(daemons) > spec.placement.count: # remove some - to_remove = len(daemons) - spec.count + to_remove = len(daemons) - spec.placement.count args = [] for d in daemons[0:to_remove]: args.append( ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname) ) return self._remove_daemon(args) - elif len(daemons) < spec.count: - return add_func(spec) + elif len(daemons) < spec.placement.count: + # add some + want = spec.placement.count - len(daemons) + hosts_with_daemons = {d.hostname for d in daemons} + hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons} + if want > len(hosts_without_daemons): + raise OrchestratorError('too few hosts: want %d, have %s' % ( + want, hosts_without_daemons)) + args = [] + for host, _, name in hosts_without_daemons: + daemon_id = self.get_unique_name(daemon_type, host, daemons, + spec.name, name) + self.log.debug('Placing %s.%s on host %s' % ( + daemon_type, daemon_id, host)) + args.append((daemon_id, host)) + + # add to daemon list so next name(s) will also be unique + sd = orchestrator.DaemonDescription( + hostname=host, + daemon_type=daemon_type, + daemon_id=daemon_id, + ) + daemons.append(sd) + if config_func: + config_func(spec) + return create_func(args) return trivial_result([]) def _add_new_daemon(self, @@ -2186,6 +2226,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): # type: (orchestrator.ServiceSpec) -> AsyncCompletion if not spec.placement.hosts or spec.placement.count is None or len(spec.placement.hosts) < spec.placement.count: raise RuntimeError("must specify at least %s hosts" % spec.placement.count) + self._config_mds(spec) + return self._add_new_daemon('mds', spec, self._create_mds) + + def apply_mds(self, spec): + # type: (orchestrator.ServiceSpec) -> AsyncCompletion + return self._apply_service('mds', spec, self._create_mds, + self._config_mds) + + def _config_mds(self, spec): # ensure mds_join_fs is set for these daemons assert spec.name ret, out, err = self.mon_command({ @@ -2195,14 +2244,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'value': spec.name, }) - return self._add_new_daemon('mds', spec, self._create_mds) - - def apply_mds(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - spec =HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mds').load() - - return self._update_service('mds', self.add_mds, spec) - @async_map_completion def _create_mds(self, mds_id, host): # get mgr. key @@ -2218,6 +2259,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def add_rgw(self, spec): if not spec.placement.hosts or len(spec.placement.hosts) < spec.count: raise RuntimeError("must specify at least %d hosts" % spec.count) + self._config_rgw(spec) + return self._add_new_daemon('rgw', spec, self._create_rgw) + + def _config_rgw(self, spec): # ensure rgw_realm and rgw_zone is set for these daemons ret, out, err = self.mon_command({ 'prefix': 'config set', @@ -2232,8 +2277,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'value': spec.rgw_realm, }) - return self._add_new_daemon('rgw', spec, self._create_rgw) - @async_map_completion def _create_rgw(self, rgw_id, host): ret, keyring, err = self.mon_command({ @@ -2246,8 +2289,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return self._create_daemon('rgw', rgw_id, host, keyring=keyring) def apply_rgw(self, spec): - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load() - return self._update_service('rgw', self.add_rgw, spec) + # type: (orchestrator.ServiceSpec) -> AsyncCompletion + return self._apply_service('rgw', spec, self._create_rgw, + self._config_rgw) def add_rbd_mirror(self, spec): if not spec.placement.hosts or len(spec.placement.hosts) < spec.count: @@ -2268,8 +2312,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): keyring=keyring) def apply_rbd_mirror(self, spec): - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load() - return self._update_service('rbd-mirror', self.add_rbd_mirror, spec) + # type: (orchestrator.ServiceSpec) -> AsyncCompletion + return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror) def _generate_prometheus_config(self): # scrape mgrs @@ -2340,8 +2384,8 @@ scrape_configs: return self._create_daemon('prometheus', daemon_id, host) def apply_prometheus(self, spec): - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load() - return self._update_service('prometheus', self.add_prometheus, spec) + # type: (orchestrator.ServiceSpec) -> AsyncCompletion + return self._apply_service('prometheus', spec, self._create_prometheus) def add_node_exporter(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion @@ -2351,8 +2395,9 @@ scrape_configs: return self._add_new_daemon('node-exporter', spec, self._create_node_exporter) def apply_node_exporter(self, spec): - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='node-exporter').load() - return self._update_service('node-exporter', self.add_node_exporter, spec) + # type: (orchestrator.ServiceSpec) -> AsyncCompletion + return self._apply_service('node-exporter', spec, + self._create_node_exporter) @async_map_completion def _create_node_exporter(self, daemon_id, host): -- 2.39.5