]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: prefer same hosts as related service daemons when picking arbitrary...
authorAdam King <adking@redhat.com>
Wed, 5 Apr 2023 00:45:23 +0000 (20:45 -0400)
committerAdam King <adking@redhat.com>
Sat, 20 May 2023 16:53:30 +0000 (12:53 -0400)
For now, just for linking ingress services and
their backend services. The idea is if one, or both,
of the ingress service and backend service is using a
count, to try and get them to deploy their daemons
on the same host(s). If the placements have explicit
placements (not using count) we still stick to
those placements regardless.

This should enable something like specifying a host
for the backend service and leaving the ingress
placement as just "count: 1" and having the ingress
service get on the same host as the backend service
daemon. This is particularly useful for the keepalive-only
(VIP but no haproxy) over NFS setup where the keepalive
must share a host with the NFS to function, but will
also be useful for other VIP only setups we may do
in the future.

Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 088d2c4205c599a7d4f2ce4de8e2af8e129adac8)

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py

index fcbad42b7d929d90400dece4d9d5e8d5e39be3ee..6ac0bfea93a1014c09383f6fb95f99bdcc29ecfe 100644 (file)
@@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Se
 
 import orchestrator
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
@@ -546,6 +546,7 @@ class HostCache():
         # type: (CephadmOrchestrator) -> None
         self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+        self._tmp_daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
         self.devices = {}              # type: Dict[str, List[inventory.Device]]
         self.facts = {}                # type: Dict[str, Dict[str, Any]]
@@ -659,8 +660,18 @@ class HostCache():
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
+        self._tmp_daemons.pop(host, {})
         self.last_daemon_update[host] = datetime_now()
 
+    def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
+        # for storing empty daemon descriptions representing daemons we have
+        # just deployed but not yet had the chance to pick up in a daemon refresh
+        # _tmp_daemons is cleared for a host upon receiving a real update of the
+        # host's dameons
+        if host not in self._tmp_daemons:
+            self._tmp_daemons[host] = {}
+        self._tmp_daemons[host][dd.name()] = dd
+
     def update_host_facts(self, host, facts):
         # type: (str, Dict[str, Dict[str, Any]]) -> None
         self.facts[host] = facts
@@ -1007,6 +1018,10 @@ class HostCache():
         for dm in self.daemons.copy().values():
             yield from dm.values()
 
+    def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self._tmp_daemons.copy().values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
         return list(self._get_daemons())
@@ -1060,6 +1075,21 @@ class HostCache():
 
         return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
+    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
+        if service_spec.service_type == 'ingress':
+            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+            return dds
+        else:
+            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
+                if ingress_spec.backend_service == service_spec.service_name():
+                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
+                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
+                    logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+                    return dds
+        return None
+
     def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
         assert service_type not in ['keepalived', 'haproxy']
 
index 09c8c4e43cdcd6e4241eea0a710f51e1bed11ef6..004c474f4c2b287916666eba8a02789def93ca24 100644 (file)
@@ -146,6 +146,7 @@ class HostAssignment(object):
                  unreachable_hosts: List[orchestrator.HostSpec],
                  draining_hosts: List[orchestrator.HostSpec],
                  daemons: List[orchestrator.DaemonDescription],
+                 related_service_daemons: Optional[List[DaemonDescription]] = None,
                  networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                  filter_new_host: Optional[Callable[[str], bool]] = None,
                  allow_colo: bool = False,
@@ -162,6 +163,7 @@ class HostAssignment(object):
         self.filter_new_host = filter_new_host
         self.service_name = spec.service_name()
         self.daemons = daemons
+        self.related_service_daemons = related_service_daemons
         self.networks = networks
         self.allow_colo = allow_colo
         self.per_host_daemon_type = per_host_daemon_type
@@ -256,6 +258,11 @@ class HostAssignment(object):
 
         self.validate()
 
+        if self.related_service_daemons:
+            logger.info(f'Service {self.service_name} has related daemons already placed: {self.related_service_daemons}')
+        else:
+            logger.info(f'Service {self.service_name} has no related daemon already placed.')
+
         count = self.spec.placement.count
 
         # get candidate hosts based on [hosts, label, host_pattern]
@@ -344,6 +351,27 @@ class HostAssignment(object):
                 del existing_slots[count:]
                 return self.place_per_host_daemons(existing_slots, [], to_remove)
 
+            if self.related_service_daemons:
+                # prefer to put daemons on the same host(s) as daemons of the related service
+                # Note that we are only doing this over picking arbitrary hosts to satisfy
+                # the count. We are not breaking any deterministic placements in order to
+                # match the placement with a related service.
+                related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
+                matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
+                for dp in matching_dps:
+                    if need <= 0:
+                        break
+                    if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                        logger.info(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
+                        to_add.append(dp)
+                        need -= 1  # this is last use of need so it can work as a counter
+                # at this point, we've either met our placement quota entirely using hosts with related
+                # service daemons, or we still need to place more. If we do need to place more,
+                # we should make sure not to re-use hosts with related service daemons by filtering
+                # them out from the "others" list
+                if need > 0:
+                    others = [dp for dp in others if dp.hostname not in related_service_hosts]
+
             for dp in others:
                 if need <= 0:
                     break
index f8b471771463ba0962b79ef2accc7fff8beb8878..9b1c06cc901d05fcebda4cafc873ff0706867b84 100644 (file)
@@ -499,8 +499,13 @@ class CephadmServe:
                 self.mgr.agent_helpers._apply_agent()
                 return r
         else:
+            _specs: List[ServiceSpec] = []
             for sn, spec in self.mgr.spec_store.active_specs.items():
-                specs.append(spec)
+                _specs.append(spec)
+            # apply specs that don't use count first sice their placement is deterministic
+            # and not dependant on other daemon's placements in any way
+            specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
         for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
             self.mgr.remove_health_warning(name)
         self.mgr.apply_spec_fails = []
@@ -629,6 +634,7 @@ class CephadmServe:
 
         svc = self.mgr.cephadm_services[service_type]
         daemons = self.mgr.cache.get_daemons_by_service(service_name)
+        related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
 
         public_networks: List[str] = []
         if service_type == 'mon':
@@ -665,6 +671,7 @@ class CephadmServe:
             unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             draining_hosts=self.mgr.cache.get_draining_hosts(),
             daemons=daemons,
+            related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,
             filter_new_host=(
                 matches_network if service_type == 'mon'
@@ -835,8 +842,10 @@ class CephadmServe:
                     hostname=slot.hostname,
                     daemon_type=slot.daemon_type,
                     daemon_id=daemon_id,
+                    service_name=spec.service_name()
                 )
                 daemons.append(sd)
+                self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
 
             if daemon_place_fails:
                 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(