]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm/schedule: allow colocation of certain daemon types
authorSage Weil <sage@newdream.net>
Tue, 9 Mar 2021 23:49:51 +0000 (18:49 -0500)
committerSage Weil <sage@newdream.net>
Tue, 16 Mar 2021 12:56:18 +0000 (07:56 -0500)
For certain daemon types, we can deploy more than one per host (mds,
rbd-mirror, rgw).

Signed-off-by: Sage Weil <sage@newdream.net>
(cherry picked from commit 6d0098d1cd6b639880e12a5f471dd8d31e7c0017)

src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/tests/test_scheduling.py
src/pybind/mgr/orchestrator/_interface.py

index ca110de5f861d534a3e226caea406c0e5f4b1de9..0adaf5dd5b6afaa272261eea4ba0f3b6a5ca024e 100644 (file)
@@ -2087,6 +2087,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             spec=spec,
             hosts=self._hosts_with_daemon_inventory(),
             daemons=self.cache.get_daemons_by_service(spec.service_name()),
+            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
         )
         ha.validate()
         hosts = ha.place()
@@ -2145,6 +2146,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             spec=spec,
             hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
             daemons=self.cache.get_daemons_by_service(spec.service_name()),
+            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
         ).validate()
 
         self.log.info('Saving service %s spec with placement %s' % (
index 785a1bebaf818d21eee0e4f944650d3e53e840a2..31ce9b9a62659db2786e095ec2a49d6c9d5b727b 100644 (file)
@@ -125,6 +125,8 @@ class HostAssignment(object):
 
         # get candidate hosts based on [hosts, label, host_pattern]
         candidates = self.get_candidates()  # type: List[HostPlacementSpec]
+        if not candidates:
+            return []  # sigh
 
         # If we don't have <count> the list of candidates is definitive.
         if count is None:
@@ -135,55 +137,51 @@ class HostAssignment(object):
                 per_host = 1
             return candidates * per_host
 
-        # prefer hosts that already have services.
-        # this avoids re-assigning to _new_ hosts
-        # and constant re-distribution of hosts when new nodes are
-        # added to the cluster
-        hosts_with_daemons = self.hosts_with_daemons(candidates)
-
-        # The amount of hosts that need to be selected in order to fulfill count.
-        need = count - len(hosts_with_daemons)
-
-        # hostspecs that do not have daemons on them but are still candidates.
-        others = difference_hostspecs(candidates, hosts_with_daemons)
-
-        # we don't need any additional hosts
-        if need < 0:
-            final_candidates = self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
-        else:
-            # ask the scheduler to return a set of hosts with a up to the value of <count>
-            others = self.scheduler.place(others, need)
-            logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
-                hosts_with_daemons, others))
-            # if a host already has the anticipated daemon, merge it with the candidates
-            # to get a list of HostPlacementSpec that can be deployed on.
-            final_candidates = list(merge_hostspecs(hosts_with_daemons, others))
-
-        return final_candidates
-
-    def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
-        active_hosts: List['HostPlacementSpec'] = []
-        for daemon in self.daemons:
-            if daemon.is_active:
-                for h in hosts:
-                    if h.hostname == daemon.hostname:
-                        active_hosts.append(h)
-        # remove duplicates before returning
-        return list(dict.fromkeys(active_hosts))
-
-    def prefer_hosts_with_active_daemons(self, hosts: List[HostPlacementSpec], count: int) -> List[HostPlacementSpec]:
-        # try to prefer host with active daemon if possible
-        active_hosts = self.get_hosts_with_active_daemon(hosts)
-        if len(active_hosts) != 0 and count > 0:
-            for host in active_hosts:
-                hosts.remove(host)
-            if len(active_hosts) >= count:
-                return self.scheduler.place(active_hosts, count)
-            else:
-                return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
-                                            self.scheduler.place(hosts, count - len(active_hosts))))
-        # ask the scheduler to return a set of hosts with a up to the value of <count>
-        return self.scheduler.place(hosts, count)
+        if self.allow_colo:
+            # consider enough slots to fit target count
+            per_host = 1 + ((count - 1) // len(candidates))
+            candidates = candidates * per_host
+
+        # consider active daemons first
+        daemons = [
+            d for d in self.daemons if d.is_active
+        ] + [
+            d for d in self.daemons if not d.is_active
+        ]
+
+        # sort candidates into existing/used slots that already have a
+        # daemon, and others (the rest)
+        existing_active: List[HostPlacementSpec] = []
+        existing_standby: List[HostPlacementSpec] = []
+        existing: List[HostPlacementSpec] = []
+        others = candidates
+        for d in daemons:
+            hs = d.get_host_placement()
+            for i in others:
+                if i == hs:
+                    others.remove(i)
+                    if d.is_active:
+                        existing_active.append(hs)
+                    else:
+                        existing_standby.append(hs)
+                    existing.append(hs)
+                    break
+        logger.debug('Hosts with existing active daemons: {}'.format(existing_active))
+        logger.debug('Hosts with existing standby daemons: {}'.format(existing_standby))
+
+        # The number of new slots that need to be selected in order to fulfill count
+        need = count - len(existing)
+
+        # we don't need any additional placements
+        if need <= 0:
+            del existing[count:]
+            return existing
+
+        # ask the scheduler to select additional slots
+        chosen = self.scheduler.place(others, need)
+        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
+            existing, chosen))
+        return existing + chosen
 
     def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
         hosts_with_daemons = {d.hostname for d in self.daemons}
@@ -239,21 +237,8 @@ class HostAssignment(object):
         # gen seed off of self.spec to make shuffling deterministic
         seed = hash(self.spec.service_name())
         random.Random(seed).shuffle(hosts)
-        return hosts
 
-    def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
-        """
-        Prefer hosts with daemons. Otherwise we'll constantly schedule daemons
-        on different hosts all the time. This is about keeping daemons where
-        they are. This isn't about co-locating.
-        """
-        hosts_with_daemons = {d.hostname for d in self.daemons}
-
-        # calc existing daemons (that aren't already in chosen)
-        existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]
-
-        logger.debug('Hosts with existing daemons: {}'.format(existing))
-        return existing
+        return hosts
 
 
 def merge_hostspecs(
index 2b370588bd5fa38320f6de596de9cb0e59015255..990e84cc07668cb3c4b1a39cd3ced7a85362f43e 100644 (file)
@@ -363,14 +363,22 @@ class NodeAssignmentTest(NamedTuple):
             ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
         ),
         # count that is bigger than the amount of hosts. Truncate to len(hosts)
-        # RGWs should not be co-located to each other.
+        # mgr should not be co-located to each other.
         NodeAssignmentTest(
-            'rgw',
+            'mgr',
             PlacementSpec(count=4),
             'host1 host2 host3'.split(),
             [],
             ['host1', 'host2', 'host3']
         ),
+        # count that is bigger than the amount of hosts; wrap around.
+        NodeAssignmentTest(
+            'mds',
+            PlacementSpec(count=6),
+            'host1 host2 host3'.split(),
+            [],
+            ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
+        ),
         # count + partial host list
         NodeAssignmentTest(
             'mgr',
@@ -382,6 +390,17 @@ class NodeAssignmentTest(NamedTuple):
             ],
             ['host3']
         ),
+        # count + partial host list (with colo)
+        NodeAssignmentTest(
+            'mds',
+            PlacementSpec(count=3, hosts=['host3']),
+            'host1 host2 host3'.split(),
+            [
+                DaemonDescription('mgr', 'a', 'host1'),
+                DaemonDescription('mgr', 'b', 'host2'),
+            ],
+            ['host3', 'host3', 'host3']
+        ),
         # count 1 + partial host list
         NodeAssignmentTest(
             'mgr',
@@ -431,6 +450,22 @@ class NodeAssignmentTest(NamedTuple):
             [],
             ['host1', 'host2', 'host3']
         ),
+        # label + count (truncate to host list)
+        NodeAssignmentTest(
+            'mgr',
+            PlacementSpec(count=4, label='foo'),
+            'host1 host2 host3'.split(),
+            [],
+            ['host1', 'host2', 'host3']
+        ),
+        # label + count (with colo)
+        NodeAssignmentTest(
+            'mds',
+            PlacementSpec(count=6, label='foo'),
+            'host1 host2 host3'.split(),
+            [],
+            ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
+        ),
         # label only + count_per_hst
         NodeAssignmentTest(
             'mds',
index f44a5983e95aa520f64a613d5aa2547db3582441..29b8603818ea3c4a70a7d1119569866a911ce9c4 100644 (file)
@@ -31,7 +31,7 @@ import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec
+    ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec, HostPlacementSpec
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.hostspec import HostSpec
 from ceph.utils import datetime_to_str, str_to_datetime
@@ -920,6 +920,14 @@ class DaemonDescription(object):
             return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
         return daemon_type_to_service(self.daemon_type)
 
+    def get_host_placement(self) -> HostPlacementSpec:
+        return HostPlacementSpec(
+            hostname=self.hostname or '',
+            # FIXME: include the ip:port here?
+            network='',
+            name='',
+        )
+
     def __repr__(self) -> str:
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)