mgr/cephadm: prefer same hosts as related service daemons when picking arbitrary...
author    Adam King <adking@redhat.com>
          Wed, 5 Apr 2023 00:45:23 +0000 (20:45 -0400)
committer Adam King <adking@redhat.com>
          Mon, 1 May 2023 21:51:45 +0000 (17:51 -0400)
For now, this is used only for linking ingress services
and their backend services. The idea is that if one, or
both, of the ingress service and its backend service is
using a count, we try to get them to deploy their daemons
on the same host(s). If a spec has an explicit placement
(not using a count) we still stick to that placement
regardless.

This should enable something like specifying a host
for the backend service and leaving the ingress
placement as just "count: 1", and having the ingress
daemon land on the same host as the backend service
daemon. This is particularly useful for the keepalive-only
(VIP but no haproxy) ingress over NFS setup, where the
keepalived daemon must share a host with the NFS daemon
to function, but it will also be useful for other VIP-only
setups we may add in the future.

Signed-off-by: Adam King <adking@redhat.com>
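
As a rough sketch of the use case described above (not part of this commit; the service IDs, hostname, and VIP are made up, and IngressSpec field names such as keepalive_only and backend_service are given as I understand them and may vary by release), the pairing of an explicitly placed NFS backend with a count-based, keepalive-only ingress could look something like this:

from ceph.deployment.service_spec import IngressSpec, PlacementSpec, ServiceSpec

# Backend NFS service pinned to an explicit host: a deterministic placement
# the scheduler will never override.
nfs_spec = ServiceSpec(
    service_type='nfs',
    service_id='foo',
    placement=PlacementSpec(hosts=['host1']),
)

# Keepalive-only ingress (VIP, no haproxy) with just "count: 1": with this
# change the scheduler should prefer host1, where the backend daemon runs.
ingress_spec = IngressSpec(
    service_type='ingress',
    service_id='nfs.foo',
    backend_service='nfs.foo',
    virtual_ip='10.0.0.100/24',
    keepalive_only=True,
    placement=PlacementSpec(count=1),
)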
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py

index cb225b2a5538ac2de681d233d092c806248eea6f..929bc2c343f18534290d5b071f9ee4e5095e8372 100644 (file)
@@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Se
 
 import orchestrator
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
@@ -580,6 +580,7 @@ class HostCache():
         # type: (CephadmOrchestrator) -> None
         self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+        self._tmp_daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
         self.devices = {}              # type: Dict[str, List[inventory.Device]]
         self.facts = {}                # type: Dict[str, Dict[str, Any]]
@@ -693,8 +694,18 @@ class HostCache():
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
+        self._tmp_daemons.pop(host, {})
         self.last_daemon_update[host] = datetime_now()
 
+    def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
+        # for storing empty daemon descriptions representing daemons we have
+        # just deployed but not yet had the chance to pick up in a daemon refresh.
+        # _tmp_daemons is cleared for a host upon receiving a real update of the
+        # host's daemons
+        if host not in self._tmp_daemons:
+            self._tmp_daemons[host] = {}
+        self._tmp_daemons[host][dd.name()] = dd
+
     def update_host_facts(self, host, facts):
         # type: (str, Dict[str, Dict[str, Any]]) -> None
         self.facts[host] = facts
@@ -1041,6 +1052,10 @@ class HostCache():
         for dm in self.daemons.copy().values():
             yield from dm.values()
 
+    def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self._tmp_daemons.copy().values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
         return list(self._get_daemons())
@@ -1094,6 +1109,21 @@ class HostCache():
 
         return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
+    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
+        if service_spec.service_type == 'ingress':
+            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+            return dds
+        else:
+            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
+                if ingress_spec.backend_service == service_spec.service_name():
+                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
+                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
+                    logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+                    return dds
+        return None
+
     def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
         assert service_type not in ['keepalived', 'haproxy']
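
To make the _tmp_daemons bookkeeping added above concrete, here is a minimal standalone sketch (plain dicts and strings stand in for HostCache and DaemonDescription, and the daemon name is invented): a freshly deployed daemon stays visible through the temporary map until the next real refresh of its host replaces it.

from typing import Dict

daemons: Dict[str, Dict[str, str]] = {}       # host -> daemon name -> state
_tmp_daemons: Dict[str, Dict[str, str]] = {}  # host -> daemon name -> placeholder

def append_tmp_daemon(host: str, name: str) -> None:
    # record a daemon we just deployed but have not yet seen in a refresh
    _tmp_daemons.setdefault(host, {})[name] = 'deployed'

def update_host_daemons(host: str, dm: Dict[str, str]) -> None:
    # a real refresh supersedes any temporary placeholders for this host
    daemons[host] = dm
    _tmp_daemons.pop(host, {})

append_tmp_daemon('host1', 'nfs.foo.0.0.host1.abcdef')
assert 'nfs.foo.0.0.host1.abcdef' in _tmp_daemons['host1']

update_host_daemons('host1', {'nfs.foo.0.0.host1.abcdef': 'running'})
assert 'host1' not in _tmp_daemons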
 
index 09c8c4e43cdcd6e4241eea0a710f51e1bed11ef6..004c474f4c2b287916666eba8a02789def93ca24 100644 (file)
@@ -146,6 +146,7 @@ class HostAssignment(object):
                  unreachable_hosts: List[orchestrator.HostSpec],
                  draining_hosts: List[orchestrator.HostSpec],
                  daemons: List[orchestrator.DaemonDescription],
+                 related_service_daemons: Optional[List[DaemonDescription]] = None,
                  networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                  filter_new_host: Optional[Callable[[str], bool]] = None,
                  allow_colo: bool = False,
@@ -162,6 +163,7 @@ class HostAssignment(object):
         self.filter_new_host = filter_new_host
         self.service_name = spec.service_name()
         self.daemons = daemons
+        self.related_service_daemons = related_service_daemons
         self.networks = networks
         self.allow_colo = allow_colo
         self.per_host_daemon_type = per_host_daemon_type
@@ -256,6 +258,11 @@ class HostAssignment(object):
 
         self.validate()
 
+        if self.related_service_daemons:
+            logger.info(f'Service {self.service_name} has related daemons already placed: {self.related_service_daemons}')
+        else:
+            logger.info(f'Service {self.service_name} has no related daemons already placed.')
+
         count = self.spec.placement.count
 
         # get candidate hosts based on [hosts, label, host_pattern]
@@ -344,6 +351,27 @@ class HostAssignment(object):
                 del existing_slots[count:]
                 return self.place_per_host_daemons(existing_slots, [], to_remove)
 
+            if self.related_service_daemons:
+                # prefer to put daemons on the same host(s) as daemons of the related service
+                # Note that we are only doing this over picking arbitrary hosts to satisfy
+                # the count. We are not breaking any deterministic placements in order to
+                # match the placement with a related service.
+                related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
+                matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
+                for dp in matching_dps:
+                    if need <= 0:
+                        break
+                    if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                        logger.info(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
+                        to_add.append(dp)
+                        need -= 1  # this is last use of need so it can work as a counter
+                # at this point, we've either met our placement quota entirely using hosts with related
+                # service daemons, or we still need to place more. If we do need to place more,
+                # we should make sure not to re-use hosts with related service daemons by filtering
+                # them out from the "others" list
+                if need > 0:
+                    others = [dp for dp in others if dp.hostname not in related_service_hosts]
+
             for dp in others:
                 if need <= 0:
                     break
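
The preference logic in the hunk above amounts to a two-pass fill of the remaining slots. A simplified standalone sketch (plain hostname strings instead of DaemonPlacement objects, and ignoring the unreachable-host filtering):

from typing import List, Set

def pick_hosts(candidates: List[str], related_hosts: Set[str], need: int) -> List[str]:
    chosen: List[str] = []
    # first pass: hosts that already run a daemon of the related service
    for host in candidates:
        if need <= 0:
            break
        if host in related_hosts:
            chosen.append(host)
            need -= 1
    # second pass: fall back to the remaining candidates if slots are left
    for host in (h for h in candidates if h not in chosen):
        if need <= 0:
            break
        chosen.append(host)
        need -= 1
    return chosen

print(pick_hosts(['host1', 'host2', 'host3'], related_hosts={'host3'}, need=2))
# ['host3', 'host1']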
index 54633e41d9419d231adce432c2d7916f93d826b8..86224c8777482d2a00e315c8cfebcb7e7bb9d6eb 100644 (file)
@@ -543,8 +543,13 @@ class CephadmServe:
                 self.mgr.agent_helpers._apply_agent()
                 return r
         else:
+            _specs: List[ServiceSpec] = []
             for sn, spec in self.mgr.spec_store.active_specs.items():
-                specs.append(spec)
+                _specs.append(spec)
+            # apply specs that don't use count first since their placement is deterministic
+            # and not dependent on other daemons' placements in any way
+            specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
         for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
             self.mgr.remove_health_warning(name)
         self.mgr.apply_spec_fails = []
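
The reordering above can be shown with a toy example (plain namedtuples rather than real ServiceSpec objects): specs with an explicit, count-less placement are applied first, so their deterministic host choices are already recorded by the time count-based specs go looking for related daemons.

from collections import namedtuple

Placement = namedtuple('Placement', ['count', 'hosts'])
Spec = namedtuple('Spec', ['name', 'placement'])

_specs = [
    Spec('ingress.nfs.foo', Placement(count=1, hosts=[])),
    Spec('nfs.foo', Placement(count=None, hosts=['host1'])),
]

# same partitioning as in the hunk above: count-less specs first
specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
print([s.name for s in specs])  # ['nfs.foo', 'ingress.nfs.foo']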
@@ -673,6 +678,7 @@ class CephadmServe:
 
         svc = self.mgr.cephadm_services[service_type]
         daemons = self.mgr.cache.get_daemons_by_service(service_name)
+        related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
 
         public_networks: List[str] = []
         if service_type == 'mon':
@@ -709,6 +715,7 @@ class CephadmServe:
             unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             draining_hosts=self.mgr.cache.get_draining_hosts(),
             daemons=daemons,
+            related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,
             filter_new_host=(
                 matches_network if service_type == 'mon'
@@ -875,8 +882,10 @@ class CephadmServe:
                     hostname=slot.hostname,
                     daemon_type=slot.daemon_type,
                     daemon_id=daemon_id,
+                    service_name=spec.service_name()
                 )
                 daemons.append(sd)
+                self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
 
             if daemon_place_fails:
                 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(