From f5fca6be7242738d9c905eca651273329494729d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 21 Feb 2020 16:56:50 -0600 Subject: [PATCH] mgr/cephadm: refactor most daemon add methods New _add_daemons() that shares what we can with _apply_service(). Unlike _apply_service(), the _add_daemon() path requires the user to explicitly place daemons on hosts--no automatic placement is performed. Signed-off-by: Sage Weil --- src/pybind/mgr/cephadm/module.py | 116 ++++++++----------- src/pybind/mgr/cephadm/tests/test_cephadm.py | 4 +- 2 files changed, 50 insertions(+), 70 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index d205c7b6603..689232be09a 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -1936,6 +1936,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return "Removed {} from host '{}'".format(name, host) def _apply_service(self, daemon_type, spec, create_func, config_func=None): + """ + Schedule a service. Deploy new daemons or remove old ones, depending + on the target label and count specified in the placement. + """ service_name = daemon_type if spec.name: service_name += '.' + spec.name @@ -1955,52 +1959,50 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return self._remove_daemon(args) elif len(daemons) < spec.placement.count: # add some - want = spec.placement.count - len(daemons) + spec.placement.count -= len(daemons) hosts_with_daemons = {d.hostname for d in daemons} - hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons} - if want > len(hosts_without_daemons): - raise OrchestratorError('too few hosts: want %d, have %s' % ( - want, hosts_without_daemons)) - args = [] - for host, _, name in hosts_without_daemons: - daemon_id = self.get_unique_name(daemon_type, host, daemons, - spec.name, name) - self.log.debug('Placing %s.%s on host %s' % ( - daemon_type, daemon_id, host)) - args.append((daemon_id, host)) - - # add to daemon list so next name(s) will also be unique - sd = orchestrator.DaemonDescription( - hostname=host, - daemon_type=daemon_type, - daemon_id=daemon_id, - ) - daemons.append(sd) - if config_func: - config_func(spec) - return create_func(args) + hosts_without_daemons = {p for p in spec.placement.hosts + if p.hostname not in hosts_with_daemons} + spec.placement.hosts = hosts_without_daemons + return self._create_daemons(daemon_type, spec, daemons, + create_func, config_func) return trivial_result([]) - def _add_new_daemon(self, - daemon_type: str, - spec: orchestrator.ServiceSpec, - create_func: Callable): - daemons = self.cache.get_daemons_by_type(daemon_type) + def _add_daemon(self, daemon_type, spec, + create_func, config_func=None): + """ + Add (and place) a daemon. Require explicit host placement. Do not + schedule, and do not apply the related scheduling limitations. + """ + self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) + if not spec.placement.hosts: + raise OrchestratorError('must specify host(s) to deploy on') + if not spec.placement.count: + spec.placement.count = len(spec.placement.hosts) + service_name = daemon_type + if spec.name: + service_name += '.' + spec.name + daemons = self.cache.get_daemons_by_service(service_name) + return self._create_daemons(daemon_type, spec, daemons, + create_func, config_func) + + def _create_daemons(self, daemon_type, spec, daemons, + create_func, config_func=None): + if spec.placement.count > len(spec.placement.hosts): + raise OrchestratorError('too few hosts: want %d, have %s' % ( + spec.placement.count, spec.placement.hosts)) + + if config_func: + config_func(spec) + args = [] - num_added = 0 - assert spec.count is not None - prefix = f'{daemon_type}.{spec.name}' - our_daemons = [d for d in daemons if d.name().startswith(prefix)] - hosts_with_daemons = {d.hostname for d in daemons} - hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons} - - for host, _, name in hosts_without_daemons: - if (len(our_daemons) + num_added) >= spec.count: - break + for host, _, name in spec.placement.hosts: daemon_id = self.get_unique_name(daemon_type, host, daemons, spec.name, name) - self.log.debug('placing %s.%s on host %s' % (daemon_type, daemon_id, host)) + self.log.debug('Placing %s.%s on host %s' % ( + daemon_type, daemon_id, host)) args.append((daemon_id, host)) + # add to daemon list so next name(s) will also be unique sd = orchestrator.DaemonDescription( hostname=host, @@ -2008,16 +2010,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): daemon_id=daemon_id, ) daemons.append(sd) - num_added += 1 - - if (len(our_daemons) + num_added) < spec.count: - missing = spec.count - len(our_daemons) - num_added - available = [p.hostname for p in hosts_without_daemons] - m = f'Cannot find placement for {missing} daemons. available hosts = {available}' - raise orchestrator.OrchestratorError(m) return create_func(args) - @async_map_completion def _create_mon(self, host, network, name): """ @@ -2140,8 +2134,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def add_mgr(self, spec): # type: (orchestrator.ServiceSpec) -> orchestrator.Completion - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load() - return self._add_new_daemon('mgr', spec, self._create_mgr) + return self._add_daemon('mgr', spec, self._create_mgr) def apply_mgr(self, spec): # type: (orchestrator.ServiceSpec) -> orchestrator.Completion @@ -2224,10 +2217,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def add_mds(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion - if not spec.placement.hosts or spec.placement.count is None or len(spec.placement.hosts) < spec.placement.count: - raise RuntimeError("must specify at least %s hosts" % spec.placement.count) - self._config_mds(spec) - return self._add_new_daemon('mds', spec, self._create_mds) + return self._add_daemon('mds', spec, self._create_mds, self._config_mds) def apply_mds(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion @@ -2257,10 +2247,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return self._create_daemon('mds', mds_id, host, keyring=keyring) def add_rgw(self, spec): - if not spec.placement.hosts or len(spec.placement.hosts) < spec.count: - raise RuntimeError("must specify at least %d hosts" % spec.count) - self._config_rgw(spec) - return self._add_new_daemon('rgw', spec, self._create_rgw) + return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw) def _config_rgw(self, spec): # ensure rgw_realm and rgw_zone is set for these daemons @@ -2294,11 +2281,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self._config_rgw) def add_rbd_mirror(self, spec): - if not spec.placement.hosts or len(spec.placement.hosts) < spec.count: - raise RuntimeError("must specify at least %d hosts" % spec.count) - self.log.debug('hosts %s' % spec.placement.hosts) - - return self._add_new_daemon('rbd-mirror', spec, self._create_rbd_mirror) + return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror) @async_map_completion def _create_rbd_mirror(self, daemon_id, host): @@ -2376,8 +2359,7 @@ scrape_configs: return j def add_prometheus(self, spec): - spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load() - return self._add_new_daemon('prometheus', spec, self._create_prometheus) + return self._add_daemon('prometheus', spec, self._create_prometheus) @async_map_completion def _create_prometheus(self, daemon_id, host): @@ -2389,10 +2371,8 @@ scrape_configs: def add_node_exporter(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion - # FIXME if no hosts are set (likewise no spec.count?) add node-exporter to all hosts! - if not spec.placement.hosts or len(spec.placement.hosts) < spec.count: - raise RuntimeError("must specify at least %d hosts" % spec.count) - return self._add_new_daemon('node-exporter', spec, self._create_node_exporter) + return self._add_daemon('node-exporter', spec, + self._create_node_exporter) def apply_node_exporter(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 42e66f51890..967a9606f6c 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -266,8 +266,8 @@ class TestCephadm(object): assert len(r) == 2 with pytest.raises(OrchestratorError): - ps = PlacementSpec(hosts=['host1', 'host2'], count=2) - c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps)) + ps = PlacementSpec(hosts=['host1', 'host2'], count=3) + c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps)) [out] = wait(cephadm_module, c) -- 2.39.5