From: Sage Weil Date: Sun, 1 Mar 2020 03:09:57 +0000 (-0600) Subject: mgr/cephadm: simplify spec apply X-Git-Tag: v15.1.1~191^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=05035638f8829259653568aadddeb4efd6efa900;p=ceph.git mgr/cephadm: simplify spec apply - Teach _apply_service how to pick the create (and config) functions, so that we don't need any weird wrappers in the callers. - Replace trigger_deploy() and _apply_services() with a simpler _apply_all_services() - Drop all of the per-type _apply_foo() methods. Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 6f6c36ef84f5..1f7f3cdc0503 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -1026,7 +1026,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self._remove_osds_bg() - service_completions = self._apply_services() + service_completions = self._apply_all_services() for service_completion in service_completions: if service_completion: while not service_completion.has_result: @@ -2059,12 +2059,33 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.cache.invalidate_host_daemons(host) return "Removed {} from host '{}'".format(name, host) - def _apply_service(self, spec, create_func, config_func=None): + def _apply_service(self, spec): """ Schedule a service. Deploy new daemons or remove old ones, depending on the target label and count specified in the placement. """ daemon_type = spec.service_type + create_fns = { + 'mon': self._create_mon, + 'mgr': self._create_mgr, + 'mds': self._create_mds, + 'rgw': self._create_rgw, + 'rbd-mirror': self._create_rbd_mirror, + 'grafana': self._create_grafana, + 'alertmanager': self._create_alertmanager, + 'prometheus': self._create_prometheus, + 'node-exporter': self._create_node_exporter, + } + config_fns = { + 'mds': self._config_mds, + 'rgw': self._config_rgw, + } + create_func = create_fns.get(daemon_type, None) + if not create_func: + self.log.debug('unrecognized service type %s' % daemon_type) + return trivial_result([]) + config_func = config_fns.get(daemon_type, None) + service_name = spec.service_name() self.log.debug('Applying service %s spec' % service_name) daemons = self.cache.get_daemons_by_service(service_name) @@ -2092,6 +2113,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): create_func, config_func) return trivial_result([]) + def _apply_all_services(self): + r : List[orchestrator.Completion] = [] + for sn, spec in self.spec_store.specs.items(): + try: + r.extend(self._apply_service(spec)) + except Exception as e: + self.log.warning('Failed to apply %s spec %s: %s' % ( + spec.service_name(), spec, e)) + return r + def _add_daemon(self, daemon_type, spec, create_func, config_func=None): """ @@ -2199,10 +2230,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_mgr(self, spec): return self._apply(spec) - def _apply_mgr(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_mgr) - def add_mds(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion return self._add_daemon('mds', spec, self._create_mds, self._config_mds) @@ -2210,11 +2237,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion: return self._apply(spec) - def _apply_mds(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_mds, - self._config_mds) - def _config_mds(self, spec): # ensure mds_join_fs is set for these daemons assert spec.name @@ -2269,11 +2291,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_rgw(self, spec): return self._apply(spec) - def _apply_rgw(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_rgw, - self._config_rgw) - def add_rbd_mirror(self, spec): return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror) @@ -2291,10 +2308,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): def apply_rbd_mirror(self, spec): return self._apply(spec) - def _apply_rbd_mirror(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_rbd_mirror) - def _generate_prometheus_config(self): # scrape mgrs mgr_scrape_list = [] @@ -2491,10 +2504,6 @@ receivers: def _create_prometheus(self, daemon_id, host): return self._create_daemon('prometheus', daemon_id, host) - def _apply_prometheus(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_prometheus) - def apply_prometheus(self, spec): return self._apply(spec) @@ -2506,11 +2515,6 @@ receivers: def apply_node_exporter(self, spec): return self._apply(spec) - def _apply_node_exporter(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, - self._create_node_exporter) - @async_map_completion def _create_node_exporter(self, daemon_id, host): return self._create_daemon('node-exporter', daemon_id, host) @@ -2523,9 +2527,6 @@ receivers: # type: (orchestrator.ServiceSpec) -> AsyncCompletion return self._apply(spec) - def _apply_grafana(self, spec): - return self._apply_service(spec, self._create_grafana) - @async_map_completion def _create_grafana(self, daemon_id, host): return self._create_daemon('grafana', daemon_id, host) @@ -2538,10 +2539,6 @@ receivers: # type: (orchestrator.ServiceSpec) -> AsyncCompletion return self._apply(spec) - def _apply_alertmanager(self, spec): - # type: (orchestrator.ServiceSpec) -> AsyncCompletion - return self._apply_service(spec, self._create_alertmanager) - @async_map_completion def _create_alertmanager(self, daemon_id, host): return self._create_daemon('alertmanager', daemon_id, host) @@ -2804,64 +2801,6 @@ receivers: self._kick_serve_loop() return trivial_result("ServiceSpecs saved") - def trigger_deployment(self, - service_name: str, - func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]: - """ - Triggers a corresponding deployment method `func` to `service_name` - Services can have multiple entries. (i.e. different RGW configurations) - """ - self.log.debug(f"starting async {service_name} deployment") - specs = self.spec_store.find(service_name) - completions = list() - for spec in specs: - try: - completions.append(func(spec)) - except Exception as e: - self.log.warning('Failed to apply %s spec %s: %s' % ( - service_name, spec, e)) - if completions: - return completions - return [trivial_result("Nothing to do..")] - - def _apply_services(self) -> List[orchestrator.Completion]: - """ - This is a method that is supposed to run continuously in the - server() thread. - It will initiate deployments based on the presence of a ServiceSpec - in the persistent mon_store. - There is a defined order in which the services should be deployed - Defined order: - # mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror - - Special cases: - * Mons scaling is currently not implemented. - * OSDs are daemons that are handled differently and may not fit in this paradigm - - The serve() thread processes the completions serially, which ensures the adherence to - the defined order. - """ - - super_completions: List[orchestrator.Completion] = list() - super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr)) - super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus)) - super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter)) - super_completions.extend(self.trigger_deployment('mds', self._apply_mds)) - super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw)) - super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror)) - super_completions.extend(self.trigger_deployment('grafana', self._apply_grafana)) - super_completions.extend(self.trigger_deployment('alertmanager', self._apply_alertmanager)) - - # Not implemented - - # super_completions.extend(trigger_deployment('mon', self._apply_mon)) - # super_completions.extend(trigger_deployment('nfs', self._apply_nfs)) - # super_completions.extend(trigger_deployment('grafana', self._apply_grafana)) - # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi)) - - # Not implemented - return super_completions - class BaseScheduler(object): """ diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 4893d0851d6e..ec92ee72de8f 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -154,7 +154,7 @@ class TestCephadm(object): def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1) - c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr')) + c = cephadm_module._apply_service(ServiceSpec(placement=ps, service_type='mgr')) [out] = wait(cephadm_module, c) match_glob(out, "Deployed mgr.* on host 'test'") @@ -237,7 +237,7 @@ class TestCephadm(object): match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'") ps = PlacementSpec(hosts=['host1', 'host2'], count=2) - c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw')) + c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw')) [out] = wait(cephadm_module, c) match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'") @@ -267,7 +267,7 @@ class TestCephadm(object): with pytest.raises(OrchestratorError): ps = PlacementSpec(hosts=['host1', 'host2'], count=3) - c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw')) + c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw')) [out] = wait(cephadm_module, c) @@ -508,48 +508,3 @@ class TestCephadm(object): _sspec.from_json.assert_called_once() assert wait(cephadm_module, c) == 'ServiceSpecs saved' - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) - @mock.patch("cephadm.module.CephadmOrchestrator.send_command") - @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) - @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.HostCache.save_host") - @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.SpecStore.find") - def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): - with self._with_host(cephadm_module, 'test'): - _find.return_value = ['something'] - c = cephadm_module.trigger_deployment('foo', lambda x: x) - _find.assert_called_with('foo') - assert c == ['something'] - - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) - @mock.patch("cephadm.module.CephadmOrchestrator.send_command") - @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) - @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.HostCache.save_host") - @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.SpecStore.find") - def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): - with self._with_host(cephadm_module, 'test'): - _find.return_value = [] - c = cephadm_module.trigger_deployment('foo', lambda x: x) - _find.assert_called_with('foo') - assert wait(cephadm_module, c[0]) == 'Nothing to do..' - - @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) - @mock.patch("cephadm.module.CephadmOrchestrator.send_command") - @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) - @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.HostCache.save_host") - @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.CephadmOrchestrator.trigger_deployment") - def test_apply_services(self, _trigger_deployment, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): - with self._with_host(cephadm_module, 'test'): - c = cephadm_module._apply_services() - _trigger_deployment.assert_any_call('mgr', cephadm_module._apply_mgr) - _trigger_deployment.assert_any_call('prometheus', cephadm_module._apply_prometheus) - _trigger_deployment.assert_any_call('node-exporter', cephadm_module._apply_node_exporter) - _trigger_deployment.assert_any_call('mds', cephadm_module._apply_mds) - _trigger_deployment.assert_any_call('rgw', cephadm_module._apply_rgw) - _trigger_deployment.assert_any_call('rbd-mirror', cephadm_module._apply_rbd_mirror) - assert isinstance(c, list)