]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: simplify spec apply
authorSage Weil <sage@redhat.com>
Sun, 1 Mar 2020 03:09:57 +0000 (21:09 -0600)
committerSage Weil <sage@redhat.com>
Sun, 1 Mar 2020 14:10:54 +0000 (08:10 -0600)
- Teach _apply_service how to pick the create (and config) functions, so
  that we don't need any weird wrappers in the callers.
- Replace trigger_deploy() and _apply_services() with a simpler
  _apply_all_services()
- Drop all of the per-type _apply_foo() methods.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index 6f6c36ef84f5ab0ba655c9c32451963786f39243..1f7f3cdc05039a8fa92cb4bf411587dba9869a05 100644 (file)
@@ -1026,7 +1026,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             self._remove_osds_bg()
 
-            service_completions = self._apply_services()
+            service_completions = self._apply_all_services()
             for service_completion in service_completions:
                 if service_completion:
                     while not service_completion.has_result:
@@ -2059,12 +2059,33 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _apply_service(self, spec, create_func, config_func=None):
+    def _apply_service(self, spec):
         """
         Schedule a service.  Deploy new daemons or remove old ones, depending
         on the target label and count specified in the placement.
         """
         daemon_type = spec.service_type
+        create_fns = {
+            'mon': self._create_mon,
+            'mgr': self._create_mgr,
+            'mds': self._create_mds,
+            'rgw': self._create_rgw,
+            'rbd-mirror': self._create_rbd_mirror,
+            'grafana': self._create_grafana,
+            'alertmanager': self._create_alertmanager,
+            'prometheus': self._create_prometheus,
+            'node-exporter': self._create_node_exporter,
+        }
+        config_fns = {
+            'mds': self._config_mds,
+            'rgw': self._config_rgw,
+        }
+        create_func = create_fns.get(daemon_type, None)
+        if not create_func:
+            self.log.debug('unrecognized service type %s' % daemon_type)
+            return trivial_result([])
+        config_func = config_fns.get(daemon_type, None)
+
         service_name = spec.service_name()
         self.log.debug('Applying service %s spec' % service_name)
         daemons = self.cache.get_daemons_by_service(service_name)
@@ -2092,6 +2113,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                         create_func, config_func)
         return trivial_result([])
 
+    def _apply_all_services(self):
+        r : List[orchestrator.Completion] = []
+        for sn, spec in self.spec_store.specs.items():
+            try:
+                r.extend(self._apply_service(spec))
+            except Exception as e:
+                self.log.warning('Failed to apply %s spec %s: %s' % (
+                    spec.service_name(), spec, e))
+        return r
+
     def _add_daemon(self, daemon_type, spec,
                     create_func, config_func=None):
         """
@@ -2199,10 +2230,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_mgr(self, spec):
         return self._apply(spec)
 
-    def _apply_mgr(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_mgr)
-
     def add_mds(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
@@ -2210,11 +2237,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
         return self._apply(spec)
 
-    def _apply_mds(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_mds,
-                                   self._config_mds)
-
     def _config_mds(self, spec):
         # ensure mds_join_fs is set for these daemons
         assert spec.name
@@ -2269,11 +2291,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_rgw(self, spec):
         return self._apply(spec)
 
-    def _apply_rgw(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_rgw,
-                                   self._config_rgw)
-
     def add_rbd_mirror(self, spec):
         return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
 
@@ -2291,10 +2308,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_rbd_mirror(self, spec):
         return self._apply(spec)
 
-    def _apply_rbd_mirror(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_rbd_mirror)
-
     def _generate_prometheus_config(self):
         # scrape mgrs
         mgr_scrape_list = []
@@ -2491,10 +2504,6 @@ receivers:
     def _create_prometheus(self, daemon_id, host):
         return self._create_daemon('prometheus', daemon_id, host)
 
-    def _apply_prometheus(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_prometheus)
-
     def apply_prometheus(self, spec):
         return self._apply(spec)
 
@@ -2506,11 +2515,6 @@ receivers:
     def apply_node_exporter(self, spec):
         return self._apply(spec)
 
-    def _apply_node_exporter(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec,
-                                   self._create_node_exporter)
-
     @async_map_completion
     def _create_node_exporter(self, daemon_id, host):
         return self._create_daemon('node-exporter', daemon_id, host)
@@ -2523,9 +2527,6 @@ receivers:
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply(spec)
 
-    def _apply_grafana(self, spec):
-        return self._apply_service(spec, self._create_grafana)
-
     @async_map_completion
     def _create_grafana(self, daemon_id, host):
         return self._create_daemon('grafana', daemon_id, host)
@@ -2538,10 +2539,6 @@ receivers:
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply(spec)
 
-    def _apply_alertmanager(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        return self._apply_service(spec, self._create_alertmanager)
-
     @async_map_completion
     def _create_alertmanager(self, daemon_id, host):
         return self._create_daemon('alertmanager', daemon_id, host)
@@ -2804,64 +2801,6 @@ receivers:
         self._kick_serve_loop()
         return trivial_result("ServiceSpecs saved")
 
-    def trigger_deployment(self,
-                           service_name: str,
-                           func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
-        """
-        Triggers a corresponding deployment method `func` to `service_name`
-        Services can have multiple entries. (i.e. different RGW configurations)
-        """
-        self.log.debug(f"starting async {service_name} deployment")
-        specs = self.spec_store.find(service_name)
-        completions = list()
-        for spec in specs:
-            try:
-                completions.append(func(spec))
-            except Exception as e:
-                self.log.warning('Failed to apply %s spec %s: %s' % (
-                    service_name, spec, e))
-        if completions:
-            return completions
-        return [trivial_result("Nothing to do..")]
-
-    def _apply_services(self) -> List[orchestrator.Completion]:
-        """
-        This is a method that is supposed to run continuously in the
-        server() thread.
-        It will initiate deployments based on the presence of a ServiceSpec
-        in the persistent mon_store.
-        There is a defined order in which the services should be deployed
-        Defined order:
-        # mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror
-
-        Special cases:
-        * Mons scaling is currently not implemented.
-        * OSDs are daemons that are handled differently and may not fit in this paradigm
-
-        The serve() thread processes the completions serially, which ensures the adherence to
-        the defined order.
-        """
-
-        super_completions: List[orchestrator.Completion] = list()
-        super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr))
-        super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus))
-        super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter))
-        super_completions.extend(self.trigger_deployment('mds', self._apply_mds))
-        super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw))
-        super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror))
-        super_completions.extend(self.trigger_deployment('grafana', self._apply_grafana))
-        super_completions.extend(self.trigger_deployment('alertmanager', self._apply_alertmanager))
-
-        # Not implemented
-
-        # super_completions.extend(trigger_deployment('mon', self._apply_mon))
-        # super_completions.extend(trigger_deployment('nfs', self._apply_nfs))
-        # super_completions.extend(trigger_deployment('grafana', self._apply_grafana))
-        # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi))
-
-        # Not implemented
-        return super_completions
-
 
 class BaseScheduler(object):
     """
index 4893d0851d6eca5a4b1ecb89123e3476c74df169..ec92ee72de8f6d0bbb5315fbb3c54017f6de6552 100644 (file)
@@ -154,7 +154,7 @@ class TestCephadm(object):
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
+            c = cephadm_module._apply_service(ServiceSpec(placement=ps, service_type='mgr'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed mgr.* on host 'test'")
 
@@ -237,7 +237,7 @@ class TestCephadm(object):
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
+                c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
 
@@ -267,7 +267,7 @@ class TestCephadm(object):
 
                 with pytest.raises(OrchestratorError):
                     ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
-                    c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
+                    c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                     [out] = wait(cephadm_module, c)
 
 
@@ -508,48 +508,3 @@ class TestCephadm(object):
             _sspec.from_json.assert_called_once()
             assert wait(cephadm_module, c) == 'ServiceSpecs saved'
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
-    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
-    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.HostCache.save_host")
-    @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.SpecStore.find")
-    def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            _find.return_value = ['something']
-            c = cephadm_module.trigger_deployment('foo', lambda x: x)
-            _find.assert_called_with('foo')
-            assert c == ['something']
-
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
-    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
-    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.HostCache.save_host")
-    @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.SpecStore.find")
-    def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            _find.return_value = []
-            c = cephadm_module.trigger_deployment('foo', lambda x: x)
-            _find.assert_called_with('foo')
-            assert wait(cephadm_module, c[0]) == 'Nothing to do..'
-
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
-    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
-    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.HostCache.save_host")
-    @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator.trigger_deployment")
-    def test_apply_services(self, _trigger_deployment, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            c = cephadm_module._apply_services()
-            _trigger_deployment.assert_any_call('mgr', cephadm_module._apply_mgr)
-            _trigger_deployment.assert_any_call('prometheus', cephadm_module._apply_prometheus)
-            _trigger_deployment.assert_any_call('node-exporter', cephadm_module._apply_node_exporter)
-            _trigger_deployment.assert_any_call('mds', cephadm_module._apply_mds)
-            _trigger_deployment.assert_any_call('rgw', cephadm_module._apply_rgw)
-            _trigger_deployment.assert_any_call('rbd-mirror', cephadm_module._apply_rbd_mirror)
-            assert isinstance(c, list)