import orchestrator
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
# type: (CephadmOrchestrator) -> None
self.mgr: CephadmOrchestrator = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
+ self._tmp_daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
self.devices = {} # type: Dict[str, List[inventory.Device]]
self.facts = {} # type: Dict[str, Dict[str, Any]]
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
+ self._tmp_daemons.pop(host, {})
self.last_daemon_update[host] = datetime_now()
+ def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
+ # for storing empty daemon descriptions representing daemons we have
+ # just deployed but not yet had the chance to pick up in a daemon refresh.
+ # _tmp_daemons is cleared for a host upon receiving a real update of the
+ # host's daemons
+ if host not in self._tmp_daemons:
+ self._tmp_daemons[host] = {}
+ self._tmp_daemons[host][dd.name()] = dd
+
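# Illustrative sketch (not part of the patch): the intended lifecycle of a
# temporary daemon description, assuming `cache` is an instance of this class.
#
#   dd = orchestrator.DaemonDescription(daemon_type='haproxy',
#                                       daemon_id='foo.host1.abcdef',
#                                       hostname='host1')
#   cache.append_tmp_daemon('host1', dd)
#   assert dd.name() in cache._tmp_daemons['host1']
#   # a real refresh of host1's daemons drops the placeholder
#   cache.update_host_daemons('host1', {})
#   assert 'host1' not in cache._tmp_daemons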
def update_host_facts(self, host, facts):
# type: (str, Dict[str, Dict[str, Any]]) -> None
self.facts[host] = facts
for dm in self.daemons.copy().values():
yield from dm.values()
+ def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+ for dm in self._tmp_daemons.copy().values():
+ yield from dm.values()
+
def get_daemons(self):
# type: () -> List[orchestrator.DaemonDescription]
return list(self._get_daemons())
return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
+ def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
+ if service_spec.service_type == 'ingress':
+ dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+ dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
+ logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+ return dds
+ else:
+ for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
+ if ingress_spec.backend_service == service_spec.service_name():
+ dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
+ dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
+ logger.info(f'Found related daemons {dds} for service {service_spec.service_name()}')
+ return dds
+ return None
+
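# Illustrative sketch (not part of the patch; names are hypothetical): the
# lookup is symmetric between an ingress service and its backend service.
#
#   ingress = IngressSpec(service_type='ingress', service_id='rgw.foo',
#                         backend_service='rgw.foo')
#   cache.get_related_service_daemons(ingress)  # -> daemons of rgw.foo
#
#   rgw = ServiceSpec(service_type='rgw', service_id='foo')
#   cache.get_related_service_daemons(rgw)      # -> daemons of ingress.rgw.foo,
#                                               #    if such a spec is active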
def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
assert service_type not in ['keepalived', 'haproxy']
unreachable_hosts: List[orchestrator.HostSpec],
draining_hosts: List[orchestrator.HostSpec],
daemons: List[orchestrator.DaemonDescription],
+ related_service_daemons: Optional[List[orchestrator.DaemonDescription]] = None,
networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
filter_new_host: Optional[Callable[[str], bool]] = None,
allow_colo: bool = False,
self.filter_new_host = filter_new_host
self.service_name = spec.service_name()
self.daemons = daemons
+ self.related_service_daemons = related_service_daemons
self.networks = networks
self.allow_colo = allow_colo
self.per_host_daemon_type = per_host_daemon_type
self.validate()
+ if self.related_service_daemons:
+ logger.info(f'Service {self.service_name} has related daemons already placed: {self.related_service_daemons}')
+ else:
+ logger.info(f'Service {self.service_name} has no related daemons already placed.')
+
count = self.spec.placement.count
# get candidate hosts based on [hosts, label, host_pattern]
del existing_slots[count:]
return self.place_per_host_daemons(existing_slots, [], to_remove)
+ if self.related_service_daemons:
+ # prefer to put daemons on the same host(s) as daemons of the related service
+ # Note that we are only doing this over picking arbitrary hosts to satisfy
+ # the count. We are not breaking any deterministic placements in order to
+ # match the placement with a related service.
+ related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
+ matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
+ for dp in matching_dps:
+ if need <= 0:
+ break
+ if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+ logger.info(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
+ to_add.append(dp)
+ need -= 1 # decrement the remaining number of slots to fill
+ # at this point, we've either met our placement quota entirely using hosts with related
+ # service daemons, or we still need to place more. If we do need to place more,
+ # we should make sure not to re-use hosts with related service daemons by filtering
+ # them out from the "others" list
+ if need > 0:
+ others = [dp for dp in others if dp.hostname not in related_service_hosts]
+
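# Illustrative walk-through (assumed values, not from the patch): with
# need=2, related daemons on host1, and candidate slots on host1..host3,
# the loop above consumes the host1 slot first (need becomes 1), host1 is
# then filtered out of `others`, and the loop below fills the last slot
# from host2/host3 as usual.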
for dp in others:
if need <= 0:
break
self.mgr.agent_helpers._apply_agent()
return r
else:
+ _specs: List[ServiceSpec] = []
for sn, spec in self.mgr.spec_store.active_specs.items():
- specs.append(spec)
+ _specs.append(spec)
+ # apply specs that don't use count first since their placement is deterministic
+ # and not dependent on other daemons' placements in any way
+ specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
+
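# Illustrative sketch (hypothetical specs): a spec with a fixed host list is
# now applied before a count-based spec that may want to co-locate with it.
#
#   rgw = RGWSpec(service_id='foo',
#                 placement=PlacementSpec(hosts=['host1']))
#   ing = IngressSpec(service_type='ingress', service_id='rgw.foo',
#                     backend_service='rgw.foo',
#                     placement=PlacementSpec(count=1))
#   # applied order: rgw first, then ing -- so rgw.foo daemons exist (at
#   # least as tmp daemons) when the ingress placement consults
#   # get_related_service_daemons()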
for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
self.mgr.remove_health_warning(name)
self.mgr.apply_spec_fails = []
svc = self.mgr.cephadm_services[service_type]
daemons = self.mgr.cache.get_daemons_by_service(service_name)
+ related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
public_networks: List[str] = []
if service_type == 'mon':
unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
draining_hosts=self.mgr.cache.get_draining_hosts(),
daemons=daemons,
+ related_service_daemons=related_service_daemons,
networks=self.mgr.cache.networks,
filter_new_host=(
matches_network if service_type == 'mon'
hostname=slot.hostname,
daemon_type=slot.daemon_type,
daemon_id=daemon_id,
+ service_name=spec.service_name()
)
daemons.append(sd)
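+ # register the new daemon as a tmp daemon so related services placed
+ # later in this pass can already see it; it is cleared on the next
+ # real daemon refresh for this host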
+ self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
if daemon_place_fails:
self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(