From: Sage Weil Date: Wed, 10 Mar 2021 17:24:32 +0000 (-0500) Subject: mgr/cephadm/schedule: return DaemonPlacement instead of HostPlacementSpec X-Git-Tag: v17.1.0~2601^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0e929d5c67e4d7f2f2b9629a8568af07fcf845ed;p=ceph.git mgr/cephadm/schedule: return DaemonPlacement instead of HostPlacementSpec Create a new type for the result of scheduling/place(). Include new fields like ip:port. Introduce a matches_daemon() to see whether an existing daemon matches a scheduling slot. Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py index d335f7bf5691..905561585d2b 100644 --- a/src/pybind/mgr/cephadm/schedule.py +++ b/src/pybind/mgr/cephadm/schedule.py @@ -1,9 +1,9 @@ import logging import random -from typing import List, Optional, Callable, TypeVar, Set, Tuple +from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple import orchestrator -from ceph.deployment.service_spec import HostPlacementSpec, ServiceSpec +from ceph.deployment.service_spec import ServiceSpec from orchestrator._interface import DaemonDescription from orchestrator import OrchestratorValidationError @@ -11,6 +11,41 @@ logger = logging.getLogger(__name__) T = TypeVar('T') +class DaemonPlacement(NamedTuple): + hostname: str + network: str = '' # for mons only + name: str = '' + ip: Optional[str] = None + ports: Optional[List[int]] = None + + def __str__(self) -> str: + res = self.hostname + other = [] + if self.network: + other.append(f'network={self.network}') + if self.name: + other.append(f'name={self.name}') + if self.ip: + other.append(f'ip={self.ip}') + if self.ports: + other.append(f'ports={",".join(map(str, self.ports))}') + if other: + res += '(' + ' '.join(other) + ')' + return res + + def matches_daemon(self, dd: DaemonDescription) -> bool: + if self.hostname != dd.hostname: + return False + # fixme: how to match against network? + if self.name and self.name != dd.daemon_id: + return False + if self.ip and self.ip != dd.ip: + return False + if self.ports and self.ports != dd.ports: + return False + return True + + class HostAssignment(object): def __init__(self, @@ -71,7 +106,7 @@ class HostAssignment(object): f'hosts for label {self.spec.placement.label}') def place(self): - # type: () -> Tuple[List[HostPlacementSpec], List[HostPlacementSpec], List[orchestrator.DaemonDescription]] + # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]] """ Generate a list of HostPlacementSpec taking into account: @@ -86,7 +121,7 @@ class HostAssignment(object): count = self.spec.placement.count # get candidate hosts based on [hosts, label, host_pattern] - candidates = self.get_candidates() # type: List[HostPlacementSpec] + candidates = self.get_candidates() # type: List[DaemonPlacement] # consider enough slots to fulfill target count-per-host or count if count is None: @@ -110,24 +145,23 @@ class HostAssignment(object): # daemon, and others (the rest) existing_active: List[orchestrator.DaemonDescription] = [] existing_standby: List[orchestrator.DaemonDescription] = [] - existing_slots: List[HostPlacementSpec] = [] + existing_slots: List[DaemonPlacement] = [] to_remove: List[orchestrator.DaemonDescription] = [] others = candidates.copy() - for d in daemons: - hs = d.get_host_placement() + for dd in daemons: found = False - for i in others: - if i == hs: - others.remove(i) - if d.is_active: - existing_active.append(d) + for p in others: + if p.matches_daemon(dd): + others.remove(p) + if dd.is_active: + existing_active.append(dd) else: - existing_standby.append(d) - existing_slots.append(hs) + existing_standby.append(dd) + existing_slots.append(p) found = True break if not found: - to_remove.append(d) + to_remove.append(dd) existing = existing_active + existing_standby @@ -151,25 +185,28 @@ class HostAssignment(object): existing, to_add)) return existing_slots + to_add, to_add, to_remove - def get_candidates(self) -> List[HostPlacementSpec]: + def get_candidates(self) -> List[DaemonPlacement]: if self.spec.placement.hosts: - hosts = self.spec.placement.hosts + ls = [ + DaemonPlacement(hostname=h.hostname, network=h.network, name=h.name) + for h in self.spec.placement.hosts + ] elif self.spec.placement.label: - hosts = [ - HostPlacementSpec(x.hostname, '', '') + ls = [ + DaemonPlacement(hostname=x.hostname) for x in self.hosts_by_label(self.spec.placement.label) ] elif self.spec.placement.host_pattern: - hosts = [ - HostPlacementSpec(x, '', '') + ls = [ + DaemonPlacement(hostname=x) for x in self.spec.placement.filter_matching_hostspecs(self.hosts) ] elif ( self.spec.placement.count is not None or self.spec.placement.count_per_host is not None ): - hosts = [ - HostPlacementSpec(x.hostname, '', '') + ls = [ + DaemonPlacement(hostname=x.hostname) for x in self.hosts ] else: @@ -177,16 +214,16 @@ class HostAssignment(object): "placement spec is empty: no hosts, no label, no pattern, no count") if self.filter_new_host: - old = hosts.copy() - hosts = [h for h in hosts if self.filter_new_host(h.hostname)] - for h in list(set(old) - set(hosts)): + old = ls.copy() + ls = [h for h in ls if self.filter_new_host(h.hostname)] + for h in list(set(old) - set(ls)): logger.info( f"Filtered out host {h.hostname}: could not verify host allowed virtual ips") - logger.debug('Filtered %s down to %s' % (old, hosts)) + logger.debug('Filtered %s down to %s' % (old, ls)) # shuffle for pseudo random selection # gen seed off of self.spec to make shuffling deterministic seed = hash(self.spec.service_name()) - random.Random(seed).shuffle(hosts) + random.Random(seed).shuffle(ls) - return hosts + return ls diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 1f82dbfb2187..5e4019819242 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -14,15 +14,14 @@ except ImportError: from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, \ - HA_RGWSpec, CustomContainerSpec +from ceph.deployment.service_spec import ServiceSpec, HA_RGWSpec, CustomContainerSpec from ceph.utils import str_to_datetime, datetime_now import orchestrator from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \ DaemonDescriptionStatus, daemon_type_to_service, service_to_daemon_types from cephadm.services.cephadmservice import CephadmDaemonDeploySpec -from cephadm.schedule import HostAssignment +from cephadm.schedule import HostAssignment, DaemonPlacement from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ CephadmNoImage, CEPH_TYPES, ContainerInspectInfo from mgr_module import MonCommandFailed @@ -586,20 +585,25 @@ class CephadmServe: if service_type == 'ha-rgw': spec = self.update_ha_rgw_definitive_hosts(spec, all_slots, slots_to_add) - for host, network, name in slots_to_add: + for slot in slots_to_add: for daemon_type in service_to_daemon_types(service_type): - daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons, - prefix=spec.service_id, - forcename=name) + daemon_id = self.mgr.get_unique_name( + daemon_type, + slot.hostname, + daemons, + prefix=spec.service_id, + forcename=slot.name) if not did_config: svc.config(spec, daemon_id) did_config = True daemon_spec = svc.make_daemon_spec( - host, daemon_id, network, spec, daemon_type=daemon_type) + slot.hostname, daemon_id, slot.network, spec, daemon_type=daemon_type, + ports=slot.ports + ) self.log.debug('Placing %s.%s on host %s' % ( - daemon_type, daemon_id, host)) + daemon_type, daemon_id, slot.hostname)) try: daemon_spec = svc.prepare_create(daemon_spec) @@ -608,7 +612,7 @@ class CephadmServe: except (RuntimeError, OrchestratorError) as e: self.mgr.events.for_service(spec, 'ERROR', f"Failed while placing {daemon_type}.{daemon_id}" - f"on {host}: {e}") + f"on {slot.hostname}: {e}") # only return "no change" if no one else has already succeeded. # later successes will also change to True if r is None: @@ -617,7 +621,7 @@ class CephadmServe: # add to daemon list so next name(s) will also be unique sd = orchestrator.DaemonDescription( - hostname=host, + hostname=slot.hostname, daemon_type=daemon_type, daemon_id=daemon_id, ) @@ -772,15 +776,17 @@ class CephadmServe: def update_ha_rgw_definitive_hosts( self, spec: ServiceSpec, - hosts: List[HostPlacementSpec], - add_hosts: List[HostPlacementSpec] + hosts: List[DaemonPlacement], + add_hosts: List[DaemonPlacement] ) -> HA_RGWSpec: spec = cast(HA_RGWSpec, spec) - if not (set(hosts) == set(spec.definitive_host_list)): - spec.definitive_host_list = hosts + hostnames = [p.hostname for p in hosts] + add_hostnames = [p.hostname for p in add_hosts] + if not (set(hostnames) == set(spec.definitive_host_list)): + spec.definitive_host_list = hostnames ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name()) for daemon in ha_rgw_daemons: - if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts: + if daemon.hostname in hostnames and daemon.hostname not in add_hostnames: assert daemon.hostname is not None self.mgr.cache.schedule_daemon_action( daemon.hostname, daemon.name(), 'reconfig') diff --git a/src/pybind/mgr/cephadm/services/ha_rgw.py b/src/pybind/mgr/cephadm/services/ha_rgw.py index 9764c9b81846..488088f1d54f 100644 --- a/src/pybind/mgr/cephadm/services/ha_rgw.py +++ b/src/pybind/mgr/cephadm/services/ha_rgw.py @@ -100,9 +100,7 @@ class HA_RGWService(CephService): spec = cast(HA_RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec) - all_hosts = [] - for h, network, name in spec.definitive_host_list: - all_hosts.append(h) + all_hosts = spec.definitive_host_list # set state. first host in placement is master all others backups state = 'BACKUP' diff --git a/src/pybind/mgr/cephadm/tests/test_scheduling.py b/src/pybind/mgr/cephadm/tests/test_scheduling.py index 49c420c6ba3b..f98a593bdf7e 100644 --- a/src/pybind/mgr/cephadm/tests/test_scheduling.py +++ b/src/pybind/mgr/cephadm/tests/test_scheduling.py @@ -9,6 +9,7 @@ from ceph.deployment.hostspec import HostSpec from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, ServiceSpecValidationError from cephadm.module import HostAssignment +from cephadm.schedule import DaemonPlacement from orchestrator import DaemonDescription, OrchestratorValidationError, OrchestratorError @@ -186,6 +187,29 @@ test_explicit_scheduler_results = [ ] +@pytest.mark.parametrize( + 'dp,dd,result', + [ + ( + DaemonPlacement(hostname='host1'), + DaemonDescription('mgr', 'a', 'host1'), + True + ), + ( + DaemonPlacement(hostname='host1', name='a'), + DaemonDescription('mgr', 'a', 'host1'), + True + ), + ( + DaemonPlacement(hostname='host1', name='a'), + DaemonDescription('mgr', 'b', 'host1'), + False + ), + ]) +def test_daemon_placement_match(dp, dd, result): + assert dp.matches_daemon(dd) == result + + @pytest.mark.parametrize("spec_section_key,spec_section", [ # noqa: E128 ('h', 'hosts'), diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 16fcf2fa6408..40092df2e605 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -31,7 +31,7 @@ import yaml from ceph.deployment import inventory from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ - ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec, HostPlacementSpec + ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.hostspec import HostSpec from ceph.utils import datetime_to_str, str_to_datetime @@ -932,14 +932,6 @@ class DaemonDescription(object): return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}' return daemon_type_to_service(self.daemon_type) - def get_host_placement(self) -> HostPlacementSpec: - return HostPlacementSpec( - hostname=self.hostname or '', - # FIXME: include the ip:port here? - network='', - name='', - ) - def __repr__(self) -> str: return "({type}.{id})".format(type=self.daemon_type, id=self.daemon_id) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 8774c91116e4..213c0af9f4d9 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -861,7 +861,7 @@ class HA_RGWSpec(ServiceSpec): ha_proxy_ssl_options: Optional[List[str]] = None, haproxy_container_image: Optional[str] = None, keepalived_container_image: Optional[str] = None, - definitive_host_list: Optional[List[HostPlacementSpec]] = None + definitive_host_list: Optional[List[str]] = None ): assert service_type == 'ha-rgw' super(HA_RGWSpec, self).__init__('ha-rgw', service_id=service_id, @@ -887,7 +887,7 @@ class HA_RGWSpec(ServiceSpec): # placeholder variable. Need definitive list of hosts this service will # be placed on in order to generate keepalived config. Will be populated # when applying spec - self.definitive_host_list = [] # type: List[HostPlacementSpec] + self.definitive_host_list = [] # type: List[str] def validate(self) -> None: super(HA_RGWSpec, self).validate()