For now, this is just for linking ingress services
and their backend services. The idea is that if one,
or both, of the ingress service and the backend
service is using a count, we try to get them to
deploy their daemons on the same host(s). If a
service has an explicit placement (not using count),
we still stick to that placement regardless.
This should enable something like specifying a host
for the backend service, leaving the ingress
placement as just "count: 1", and having the ingress
daemon land on the same host as the backend service
daemon. This is particularly useful for the
keepalive-only (VIP but no haproxy) over NFS setup,
where the keepalived daemon must share a host with
the NFS daemon to function, but it will also be
useful for other VIP-only setups we may add in the
future.
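As an illustration, the kind of spec pair this targets
could look roughly like the following sketch (the host
name, service id, virtual IP and the keepalive_only
flag are hypothetical examples, not part of this
change):

    from ceph.deployment.service_spec import (
        IngressSpec, PlacementSpec, ServiceSpec,
    )

    # backend NFS service pinned to an explicit host
    nfs_spec = ServiceSpec(
        service_type='nfs',
        service_id='foo',
        placement=PlacementSpec(hosts=['host1']),
    )

    # ingress left with only a count; the scheduler now
    # prefers the host(s) already holding nfs.foo daemons
    # over arbitrary candidate hosts
    ingress_spec = IngressSpec(
        service_type='ingress',
        service_id='nfs.foo',
        backend_service='nfs.foo',
        virtual_ip='10.0.0.100/24',
        keepalive_only=True,  # VIP but no haproxy
        placement=PlacementSpec(count=1),
    )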
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 088d2c4205c599a7d4f2ce4de8e2af8e129adac8)
import orchestrator
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
# type: (CephadmOrchestrator) -> None
self.mgr: CephadmOrchestrator = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+ self._tmp_daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
self.devices = {} # type: Dict[str, List[inventory.Device]]
self.facts = {} # type: Dict[str, Dict[str, Any]]
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
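+ # a real daemon refresh for this host supersedes any tmp daemon records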
+ self._tmp_daemons.pop(host, {})
self.last_daemon_update[host] = datetime_now()
+ def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
+ # for storing empty daemon descriptions representing daemons we have
+ # just deployed but have not yet had the chance to pick up in a daemon
+ # refresh. _tmp_daemons is cleared for a host upon receiving a real
+ # update of the host's daemons
+ if host not in self._tmp_daemons:
+ self._tmp_daemons[host] = {}
+ self._tmp_daemons[host][dd.name()] = dd
+
def update_host_facts(self, host, facts):
# type: (str, Dict[str, Dict[str, Any]]) -> None
self.facts[host] = facts
for dm in self.daemons.copy().values():
yield from dm.values()
+ def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+ for dm in self._tmp_daemons.copy().values():
+ yield from dm.values()
+
def get_daemons(self):
# type: () -> List[orchestrator.DaemonDescription]
return list(self._get_daemons())
return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
+ def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
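+ # for an ingress spec, return the daemons (including just-deployed tmp
+ # daemons) of its backend service; for any other spec, return the daemons
+ # of an ingress service that uses it as a backend, or None if no ingress
+ # service points at this service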
+ if service_spec.service_type == 'ingress':
+ dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+ dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+ logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+ return dds
+ else:
+ for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
+ if ingress_spec.backend_service == service_spec.service_name():
+ dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
+ dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
+ logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+ return dds
+ return None
+
def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
assert service_type not in ['keepalived', 'haproxy']
unreachable_hosts: List[orchestrator.HostSpec],
draining_hosts: List[orchestrator.HostSpec],
daemons: List[orchestrator.DaemonDescription],
+ related_service_daemons: Optional[List[DaemonDescription]] = None,
networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
filter_new_host: Optional[Callable[[str], bool]] = None,
allow_colo: bool = False,
self.filter_new_host = filter_new_host
self.service_name = spec.service_name()
self.daemons = daemons
+ self.related_service_daemons = related_service_daemons
self.networks = networks
self.allow_colo = allow_colo
self.per_host_daemon_type = per_host_daemon_type
self.validate()
+ if self.related_service_daemons:
+ logger.info(f'Service {self.service_name} has related daemons already placed: {self.related_service_daemons}')
+ else:
+ logger.info(f'Service {self.service_name} has no related daemon already placed.')
+
count = self.spec.placement.count
# get candidate hosts based on [hosts, label, host_pattern]
del existing_slots[count:]
return self.place_per_host_daemons(existing_slots, [], to_remove)
+ if self.related_service_daemons:
+ # prefer to put daemons on the same host(s) as daemons of the related service
+ # Note that we are only doing this over picking arbitrary hosts to satisfy
+ # the count. We are not breaking any deterministic placements in order to
+ # match the placement with a related service.
+ related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
+ matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
+ for dp in matching_dps:
+ if need <= 0:
+ break
+ if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+ logger.info(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
+ to_add.append(dp)
+ need -= 1 # decrement need; the loop over "others" below fills any remaining slots
+ # at this point, we've either met our placement quota entirely using hosts with related
+ # service daemons, or we still need to place more. If we do need to place more,
+ # we should make sure not to re-use hosts with related service daemons by filtering
+ # them out from the "others" list
+ if need > 0:
+ others = [dp for dp in others if dp.hostname not in related_service_hosts]
+
for dp in others:
if need <= 0:
break
self.mgr.agent_helpers._apply_agent()
return r
else:
+ _specs: List[ServiceSpec] = []
for sn, spec in self.mgr.spec_store.active_specs.items():
- specs.append(spec)
+ _specs.append(spec)
+ # apply specs that don't use count first since their placement is deterministic
+ # and not dependent on other daemons' placements in any way
+ specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
self.mgr.remove_health_warning(name)
self.mgr.apply_spec_fails = []
svc = self.mgr.cephadm_services[service_type]
daemons = self.mgr.cache.get_daemons_by_service(service_name)
+ related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
public_networks: List[str] = []
if service_type == 'mon':
unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
draining_hosts=self.mgr.cache.get_draining_hosts(),
daemons=daemons,
+ related_service_daemons=related_service_daemons,
networks=self.mgr.cache.networks,
filter_new_host=(
matches_network if service_type == 'mon'
hostname=slot.hostname,
daemon_type=slot.daemon_type,
daemon_id=daemon_id,
+ service_name=spec.service_name()
)
daemons.append(sd)
+ self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
if daemon_place_fails:
self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(