From: Adam King
Date: Wed, 5 Apr 2023 00:45:23 +0000 (-0400)
Subject: mgr/cephadm: prefer same hosts as related service daemons when picking arbitrary...
X-Git-Tag: v17.2.7~362^2~1
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=4c1d47ee06ded19b00611a06bcacac5d622018a7;p=ceph.git

mgr/cephadm: prefer same hosts as related service daemons when picking arbitrary hosts

For now this only links ingress services and their backend services. The idea
is that if one, or both, of the ingress service and its backend service uses a
count, we try to deploy their daemons on the same host(s). If a spec has an
explicit placement (not using count), we still stick to that placement
regardless.

This should enable, for example, specifying a host for the backend service and
leaving the ingress placement as just "count: 1", with the ingress daemon
landing on the same host as the backend service daemon (a sketch of such a
spec pairing follows the diff below). This is particularly useful for the
keepalive-only (VIP but no haproxy) over NFS setup, where the keepalived
daemon must share a host with the NFS daemon to function, but it will also be
useful for other VIP-only setups we may add in the future.

Signed-off-by: Adam King

(cherry picked from commit 088d2c4205c599a7d4f2ce4de8e2af8e129adac8)
---

diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
index fcbad42b7d929..6ac0bfea93a10 100644
--- a/src/pybind/mgr/cephadm/inventory.py
+++ b/src/pybind/mgr/cephadm/inventory.py
@@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Se
 
 import orchestrator
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
@@ -546,6 +546,7 @@ class HostCache():
         # type: (CephadmOrchestrator) -> None
         self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+        self._tmp_daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
         self.devices = {}  # type: Dict[str, List[inventory.Device]]
         self.facts = {}  # type: Dict[str, Dict[str, Any]]
@@ -659,8 +660,18 @@ class HostCache():
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
+        self._tmp_daemons.pop(host, {})
         self.last_daemon_update[host] = datetime_now()
 
+    def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
+        # for storing empty daemon descriptions representing daemons we have
+        # just deployed but not yet had the chance to pick up in a daemon refresh.
+        # _tmp_daemons is cleared for a host upon receiving a real update of the
+        # host's daemons
+        if host not in self._tmp_daemons:
+            self._tmp_daemons[host] = {}
+        self._tmp_daemons[host][dd.name()] = dd
+
     def update_host_facts(self, host, facts):
         # type: (str, Dict[str, Dict[str, Any]]) -> None
         self.facts[host] = facts
@@ -1007,6 +1018,10 @@ class HostCache():
         for dm in self.daemons.copy().values():
             yield from dm.values()
 
+    def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self._tmp_daemons.copy().values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
         return list(self._get_daemons())
@@ -1060,6 +1075,21 @@ class HostCache():
         return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
+    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
+        if service_spec.service_type == 'ingress':
+            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+            logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+            return dds
+        else:
+            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
+                if ingress_spec.backend_service == service_spec.service_name():
+                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
+                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
+                    logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+                    return dds
+        return None
+
     def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
         assert service_type not in ['keepalived', 'haproxy']
 
diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py
index 09c8c4e43cdcd..004c474f4c2b2 100644
--- a/src/pybind/mgr/cephadm/schedule.py
+++ b/src/pybind/mgr/cephadm/schedule.py
@@ -146,6 +146,7 @@ class HostAssignment(object):
                  unreachable_hosts: List[orchestrator.HostSpec],
                  draining_hosts: List[orchestrator.HostSpec],
                  daemons: List[orchestrator.DaemonDescription],
+                 related_service_daemons: Optional[List[DaemonDescription]] = None,
                  networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                  filter_new_host: Optional[Callable[[str], bool]] = None,
                  allow_colo: bool = False,
@@ -162,6 +163,7 @@ class HostAssignment(object):
         self.filter_new_host = filter_new_host
         self.service_name = spec.service_name()
         self.daemons = daemons
+        self.related_service_daemons = related_service_daemons
         self.networks = networks
         self.allow_colo = allow_colo
         self.per_host_daemon_type = per_host_daemon_type
@@ -256,6 +258,11 @@ class HostAssignment(object):
 
         self.validate()
 
+        if self.related_service_daemons:
+            logger.info(f'Service {self.service_name} has related daemons already placed: {self.related_service_daemons}')
+        else:
+            logger.info(f'Service {self.service_name} has no related daemon already placed.')
+
         count = self.spec.placement.count
 
         # get candidate hosts based on [hosts, label, host_pattern]
@@ -344,6 +351,27 @@ class HostAssignment(object):
                 del existing_slots[count:]
                 return self.place_per_host_daemons(existing_slots, [], to_remove)
 
+        if self.related_service_daemons:
+            # prefer to put daemons on the same host(s) as daemons of the related service.
+            # Note that we are only doing this over picking arbitrary hosts to satisfy
+            # the count. We are not breaking any deterministic placements in order to
+            # match the placement with a related service.
+            related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
+            matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
+            for dp in matching_dps:
+                if need <= 0:
+                    break
+                if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                    logger.info(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
+                    to_add.append(dp)
+                    need -= 1  # this is the last use of need, so it can work as a counter
+            # at this point, we've either met our placement quota entirely using hosts with
+            # related service daemons, or we still need to place more. If we do need to place
+            # more, we should make sure not to re-use hosts with related service daemons by
+            # filtering them out from the "others" list
+            if need > 0:
+                others = [dp for dp in others if dp.hostname not in related_service_hosts]
+
         for dp in others:
             if need <= 0:
                 break
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index f8b471771463b..9b1c06cc901d0 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -499,8 +499,13 @@ class CephadmServe:
                 self.mgr.agent_helpers._apply_agent()
                 return r
         else:
+            _specs: List[ServiceSpec] = []
             for sn, spec in self.mgr.spec_store.active_specs.items():
-                specs.append(spec)
+                _specs.append(spec)
+            # apply specs that don't use count first since their placement is deterministic
+            # and not dependent on other daemons' placements in any way
+            specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
         for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
             self.mgr.remove_health_warning(name)
         self.mgr.apply_spec_fails = []
@@ -629,6 +634,7 @@ class CephadmServe:
 
         svc = self.mgr.cephadm_services[service_type]
         daemons = self.mgr.cache.get_daemons_by_service(service_name)
+        related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
 
         public_networks: List[str] = []
         if service_type == 'mon':
@@ -665,6 +671,7 @@ class CephadmServe:
             unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             draining_hosts=self.mgr.cache.get_draining_hosts(),
             daemons=daemons,
+            related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,
             filter_new_host=(
                 matches_network if service_type == 'mon'
@@ -835,8 +842,10 @@ class CephadmServe:
                     hostname=slot.hostname,
                     daemon_type=slot.daemon_type,
                     daemon_id=daemon_id,
+                    service_name=spec.service_name()
                 )
                 daemons.append(sd)
+                self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
 
         if daemon_place_fails:
             self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
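
A minimal usage sketch of the spec pairing described in the commit message above, assuming the ServiceSpec, IngressSpec and PlacementSpec constructors from ceph.deployment.service_spec; the host name, service ids, VIP, and the keepalive_only flag are illustrative placeholders rather than values taken from this commit:

# Backend NFS service pinned to an explicit host. Explicit placements are
# deterministic and are never overridden by the new preference logic.
from ceph.deployment.service_spec import IngressSpec, PlacementSpec, ServiceSpec

nfs_spec = ServiceSpec(
    service_type='nfs',
    service_id='foo',
    placement=PlacementSpec(hosts=['host1']),   # explicit placement, kept as-is
)

# Ingress service left as just "count: 1". When its single slot is filled, the
# scheduler now prefers host1, where the backend nfs daemon already lives.
ingress_spec = IngressSpec(
    service_type='ingress',
    service_id='nfs.foo',
    backend_service='nfs.foo',          # links the ingress to its backend service
    virtual_ip='192.168.122.100/24',    # placeholder VIP
    keepalive_only=True,                # VIP but no haproxy, as in the NFS use case (assumed flag)
    placement=PlacementSpec(count=1),   # count-based, so the new preference applies
)

With both specs applied, the serve loop applies the count-less nfs spec first, and HostAssignment then uses get_related_service_daemons() to prefer host1 for the single ingress slot.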