]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: refactor _update_service and all apply methods
authorSage Weil <sage@redhat.com>
Fri, 21 Feb 2020 22:43:26 +0000 (16:43 -0600)
committerSage Weil <sage@redhat.com>
Mon, 24 Feb 2020 16:46:10 +0000 (10:46 -0600)
- Use a common _apply_service() helper
- Consolidate _add_new_daemon logic into _apply_service
- Do the NodeAssignment all in one place

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py

index fa8da4b333a792d9b3bd4286eca3f9a48bbe687a..d205c7b66036172543f0ddf1bdcdc64e704fa5ef 100644 (file)
@@ -222,6 +222,15 @@ class HostCache():
                     result.append(d)
         return result
 
+    def get_daemons_by_service(self, service_name):
+        # type: (str) -> List[orchestrator.DaemonDescription]
+        result = []   # type: List[orchestrator.DaemonDescription]
+        for host, dm in self.daemons.items():
+            for name, d in dm.items():
+                if name.startswith(service_name + '.'):
+                    result.append(d)
+        return result
+
     def get_daemon_names(self):
         # type: () -> List[str]
         r = []
@@ -1926,19 +1935,50 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _update_service(self, daemon_type, add_func, spec):
-        daemons = self.cache.get_daemons_by_type(daemon_type)
-        if len(daemons) > spec.count:
+    def _apply_service(self, daemon_type, spec, create_func, config_func=None):
+        service_name = daemon_type
+        if spec.name:
+            service_name += '.' + spec.name
+        daemons = self.cache.get_daemons_by_service(service_name)
+        spec = HostAssignment(
+            spec=spec,
+            get_hosts_func=self._get_hosts,
+            service_type=daemon_type).load()
+        if len(daemons) > spec.placement.count:
             # remove some
-            to_remove = len(daemons) - spec.count
+            to_remove = len(daemons) - spec.placement.count
             args = []
             for d in daemons[0:to_remove]:
                 args.append(
                     ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
                 )
             return self._remove_daemon(args)
-        elif len(daemons) < spec.count:
-            return add_func(spec)
+        elif len(daemons) < spec.placement.count:
+            # add some
+            want = spec.placement.count - len(daemons)
+            hosts_with_daemons = {d.hostname for d in daemons}
+            hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons}
+            if want > len(hosts_without_daemons):
+                raise OrchestratorError('too few hosts: want %d, have %s' % (
+                    want, hosts_without_daemons))
+            args = []
+            for host, _, name in hosts_without_daemons:
+                daemon_id = self.get_unique_name(daemon_type, host, daemons,
+                                                 spec.name, name)
+                self.log.debug('Placing %s.%s on host %s' % (
+                    daemon_type, daemon_id, host))
+                args.append((daemon_id, host))
+
+                # add to daemon list so next name(s) will also be unique
+                sd = orchestrator.DaemonDescription(
+                    hostname=host,
+                    daemon_type=daemon_type,
+                    daemon_id=daemon_id,
+                )
+                daemons.append(sd)
+            if config_func:
+                config_func(spec)
+            return create_func(args)
         return trivial_result([])
 
     def _add_new_daemon(self,
@@ -2186,6 +2226,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         if not spec.placement.hosts or spec.placement.count is None or len(spec.placement.hosts) < spec.placement.count:
             raise RuntimeError("must specify at least %s hosts" % spec.placement.count)
+        self._config_mds(spec)
+        return self._add_new_daemon('mds', spec, self._create_mds)
+
+    def apply_mds(self, spec):
+        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+        return self._apply_service('mds', spec, self._create_mds,
+                                   self._config_mds)
+
+    def _config_mds(self, spec):
         # ensure mds_join_fs is set for these daemons
         assert spec.name
         ret, out, err = self.mon_command({
@@ -2195,14 +2244,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'value': spec.name,
         })
 
-        return self._add_new_daemon('mds', spec, self._create_mds)
-
-    def apply_mds(self, spec):
-        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
-        spec =HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mds').load()
-
-        return self._update_service('mds', self.add_mds, spec)
-
     @async_map_completion
     def _create_mds(self, mds_id, host):
         # get mgr. key
@@ -2218,6 +2259,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
     def add_rgw(self, spec):
         if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
             raise RuntimeError("must specify at least %d hosts" % spec.count)
+        self._config_rgw(spec)
+        return self._add_new_daemon('rgw', spec, self._create_rgw)
+
+    def _config_rgw(self, spec):
         # ensure rgw_realm and rgw_zone is set for these daemons
         ret, out, err = self.mon_command({
             'prefix': 'config set',
@@ -2232,8 +2277,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'value': spec.rgw_realm,
         })
 
-        return self._add_new_daemon('rgw', spec, self._create_rgw)
-
     @async_map_completion
     def _create_rgw(self, rgw_id, host):
         ret, keyring, err = self.mon_command({
@@ -2246,8 +2289,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
         return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
 
     def apply_rgw(self, spec):
-        spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load()
-        return self._update_service('rgw', self.add_rgw, spec)
+        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+        return self._apply_service('rgw', spec, self._create_rgw,
+                                   self._config_rgw)
 
     def add_rbd_mirror(self, spec):
         if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
@@ -2268,8 +2312,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                                    keyring=keyring)
 
     def apply_rbd_mirror(self, spec):
-        spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load()
-        return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
+        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+        return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror)
 
     def _generate_prometheus_config(self):
         # scrape mgrs
@@ -2340,8 +2384,8 @@ scrape_configs:
         return self._create_daemon('prometheus', daemon_id, host)
 
     def apply_prometheus(self, spec):
-        spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load()
-        return self._update_service('prometheus', self.add_prometheus, spec)
+        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+        return self._apply_service('prometheus', spec, self._create_prometheus)
 
     def add_node_exporter(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
@@ -2351,8 +2395,9 @@ scrape_configs:
         return self._add_new_daemon('node-exporter', spec, self._create_node_exporter)
 
     def apply_node_exporter(self, spec):
-        spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='node-exporter').load()
-        return self._update_service('node-exporter', self.add_node_exporter, spec)
+        # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+        return self._apply_service('node-exporter', spec,
+                                   self._create_node_exporter)
 
     @async_map_completion
     def _create_node_exporter(self, daemon_id, host):