'service_type': spec.service_type,
'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
+ svc = self.cephadm_services[spec.service_type]
ha = HostAssignment(
spec=spec,
hosts=self._schedulable_hosts(),
networks=self.cache.networks,
daemons=self.cache.get_daemons_by_service(spec.service_name()),
- allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
+ allow_colo=svc.allow_colo(),
+ rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
)
ha.validate()
hosts, to_add, to_remove = ha.place()
)
return False
+ rank_map = None
+ if svc.ranked():
+ rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
ha = HostAssignment(
spec=spec,
hosts=self.mgr._schedulable_hosts(),
allow_colo=svc.allow_colo(),
primary_daemon_type=svc.primary_daemon_type(),
per_host_daemon_type=svc.per_host_daemon_type(),
+ rank_map=rank_map,
)
try:
self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
+ # assign names
+ for i in range(len(slots_to_add)):
+ slot = slots_to_add[i]
+ slot = slot.assign_name(self.mgr.get_unique_name(
+ slot.daemon_type,
+ slot.hostname,
+ daemons,
+ prefix=spec.service_id,
+ forcename=slot.name,
+ rank=slot.rank,
+ rank_generation=slot.rank_generation,
+ ))
+ slots_to_add[i] = slot
+ if rank_map is not None:
+ assert slot.rank is not None
+ assert slot.rank_generation is not None
+ assert rank_map[slot.rank][slot.rank_generation] is None
+ rank_map[slot.rank][slot.rank_generation] = slot.name
+
+ if rank_map:
+ # record the rank_map before we make changes so that if we fail the
+ # next mgr will clean up.
+ self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
+
+ # remove daemons now, since we are going to fence them anyway
+ for d in daemons_to_remove:
+ assert d.hostname is not None
+ self._remove_daemon(d.name(), d.hostname)
+ daemons_to_remove = []
+
+ # fence them
+ svc.fence_old_ranks(spec, rank_map, len(all_slots))
+
+ # create daemons
for slot in slots_to_add:
# first remove daemon on conflicting port?
if slot.ports:
break
# deploy new daemon
- daemon_id = self.mgr.get_unique_name(
- slot.daemon_type,
- slot.hostname,
- daemons,
- prefix=spec.service_id,
- forcename=slot.name)
-
+ daemon_id = slot.name
if not did_config:
svc.config(spec, daemon_id)
did_config = True
"""
return None
+ def ranked(self) -> bool:
+ """
+ If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+ generation (0, 1, ...) to each daemon we create/deploy.
+ """
+ return False
+
+ def fence_old_ranks(self,
+ spec: ServiceSpec,
+ rank_map: Dict[int, Dict[int, Optional[str]]],
+ num_ranks: int) -> None:
+ assert False
+
def make_daemon_spec(
self,
host: str,