mgr/cephadm/schedule: return DaemonPlacement instead of HostPlacementSpec
author Sage Weil <sage@newdream.net>
Wed, 10 Mar 2021 17:24:32 +0000 (12:24 -0500)
committer Sage Weil <sage@newdream.net>
Mon, 15 Mar 2021 22:55:16 +0000 (18:55 -0400)
Create a new type for the result of scheduling/place().  Include new fields
like ip:port.  Introduce a matches_daemon() method to check whether an
existing daemon matches a scheduling slot.
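
A rough usage sketch (the hostname, IP, and port here are made up):

    p = DaemonPlacement(hostname='host1', ip='10.1.2.3', ports=[80])
    str(p)   # -> 'host1(ip=10.1.2.3 ports=80)'
    DaemonPlacement(hostname='host1').matches_daemon(
        DaemonDescription('mgr', 'a', 'host1'))   # -> True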

Signed-off-by: Sage Weil <sage@newdream.net>
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/ha_rgw.py
src/pybind/mgr/cephadm/tests/test_scheduling.py
src/pybind/mgr/orchestrator/_interface.py
src/python-common/ceph/deployment/service_spec.py

index d335f7bf569157e1c0ed3a14e33699159413c443..905561585d2bbc165b0143232484b8e9da316d36 100644 (file)
@@ -1,9 +1,9 @@
 import logging
 import random
-from typing import List, Optional, Callable, TypeVar, Set, Tuple
+from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple
 
 import orchestrator
-from ceph.deployment.service_spec import HostPlacementSpec, ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec
 from orchestrator._interface import DaemonDescription
 from orchestrator import OrchestratorValidationError
 
@@ -11,6 +11,41 @@ logger = logging.getLogger(__name__)
 T = TypeVar('T')
 
 
+class DaemonPlacement(NamedTuple):
+    hostname: str
+    network: str = ''   # for mons only
+    name: str = ''
+    ip: Optional[str] = None
+    ports: Optional[List[int]] = None
+
+    def __str__(self) -> str:
+        res = self.hostname
+        other = []
+        if self.network:
+            other.append(f'network={self.network}')
+        if self.name:
+            other.append(f'name={self.name}')
+        if self.ip:
+            other.append(f'ip={self.ip}')
+        if self.ports:
+            other.append(f'ports={",".join(map(str, self.ports))}')
+        if other:
+            res += '(' + ' '.join(other) + ')'
+        return res
+
+    def matches_daemon(self, dd: DaemonDescription) -> bool:
+        if self.hostname != dd.hostname:
+            return False
+        # fixme: how to match against network?
+        if self.name and self.name != dd.daemon_id:
+            return False
+        if self.ip and self.ip != dd.ip:
+            return False
+        if self.ports and self.ports != dd.ports:
+            return False
+        return True
+
+
 class HostAssignment(object):
 
     def __init__(self,
@@ -71,7 +106,7 @@ class HostAssignment(object):
                     f'hosts for label {self.spec.placement.label}')
 
     def place(self):
-        # type: () -> Tuple[List[HostPlacementSpec], List[HostPlacementSpec], List[orchestrator.DaemonDescription]]
+        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
         """
-        Generate a list of HostPlacementSpec taking into account:
+        Generate a list of DaemonPlacement taking into account:
 
@@ -86,7 +121,7 @@ class HostAssignment(object):
         count = self.spec.placement.count
 
         # get candidate hosts based on [hosts, label, host_pattern]
-        candidates = self.get_candidates()  # type: List[HostPlacementSpec]
+        candidates = self.get_candidates()  # type: List[DaemonPlacement]
 
         # consider enough slots to fulfill target count-per-host or count
         if count is None:
@@ -110,24 +145,23 @@ class HostAssignment(object):
         # daemon, and others (the rest)
         existing_active: List[orchestrator.DaemonDescription] = []
         existing_standby: List[orchestrator.DaemonDescription] = []
-        existing_slots: List[HostPlacementSpec] = []
+        existing_slots: List[DaemonPlacement] = []
         to_remove: List[orchestrator.DaemonDescription] = []
         others = candidates.copy()
-        for d in daemons:
-            hs = d.get_host_placement()
+        for dd in daemons:
             found = False
-            for i in others:
-                if i == hs:
-                    others.remove(i)
-                    if d.is_active:
-                        existing_active.append(d)
+            for p in others:
+                if p.matches_daemon(dd):
+                    others.remove(p)
+                    if dd.is_active:
+                        existing_active.append(dd)
                     else:
-                        existing_standby.append(d)
-                    existing_slots.append(hs)
+                        existing_standby.append(dd)
+                    existing_slots.append(p)
                     found = True
                     break
             if not found:
-                to_remove.append(d)
+                to_remove.append(dd)
 
         existing = existing_active + existing_standby
 
@@ -151,25 +185,28 @@ class HostAssignment(object):
             existing, to_add))
         return existing_slots + to_add, to_add, to_remove
 
-    def get_candidates(self) -> List[HostPlacementSpec]:
+    def get_candidates(self) -> List[DaemonPlacement]:
         if self.spec.placement.hosts:
-            hosts = self.spec.placement.hosts
+            ls = [
+                DaemonPlacement(hostname=h.hostname, network=h.network, name=h.name)
+                for h in self.spec.placement.hosts
+            ]
         elif self.spec.placement.label:
-            hosts = [
-                HostPlacementSpec(x.hostname, '', '')
+            ls = [
+                DaemonPlacement(hostname=x.hostname)
                 for x in self.hosts_by_label(self.spec.placement.label)
             ]
         elif self.spec.placement.host_pattern:
-            hosts = [
-                HostPlacementSpec(x, '', '')
+            ls = [
+                DaemonPlacement(hostname=x)
                 for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
             ]
         elif (
                 self.spec.placement.count is not None
                 or self.spec.placement.count_per_host is not None
         ):
-            hosts = [
-                HostPlacementSpec(x.hostname, '', '')
+            ls = [
+                DaemonPlacement(hostname=x.hostname)
                 for x in self.hosts
             ]
         else:
@@ -177,16 +214,16 @@ class HostAssignment(object):
                 "placement spec is empty: no hosts, no label, no pattern, no count")
 
         if self.filter_new_host:
-            old = hosts.copy()
-            hosts = [h for h in hosts if self.filter_new_host(h.hostname)]
-            for h in list(set(old) - set(hosts)):
+            old = ls.copy()
+            ls = [h for h in ls if self.filter_new_host(h.hostname)]
+            for h in list(set(old) - set(ls)):
                 logger.info(
                     f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
-                logger.debug('Filtered %s down to %s' % (old, hosts))
+                logger.debug('Filtered %s down to %s' % (old, ls))
 
         # shuffle for pseudo random selection
         # gen seed off of self.spec to make shuffling deterministic
         seed = hash(self.spec.service_name())
-        random.Random(seed).shuffle(hosts)
+        random.Random(seed).shuffle(ls)
 
-        return hosts
+        return ls
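
A simplified, standalone sketch of the reconciliation pass in place()
(the class and function names below are local stand-ins, not the real
mgr types):

    from typing import List, NamedTuple, Tuple

    class Placement(NamedTuple):      # stand-in for DaemonPlacement
        hostname: str
        name: str = ''

    class Daemon(NamedTuple):         # stand-in for DaemonDescription
        daemon_id: str
        hostname: str

    def matches(p: Placement, d: Daemon) -> bool:
        if p.hostname != d.hostname:
            return False
        if p.name and p.name != d.daemon_id:
            return False
        return True

    def reconcile(
        candidates: List[Placement], daemons: List[Daemon]
    ) -> Tuple[List[Daemon], List[Placement], List[Daemon]]:
        existing: List[Daemon] = []
        to_remove: List[Daemon] = []
        others = candidates.copy()    # slots not yet claimed by a daemon
        for d in daemons:
            for p in others:
                if matches(p, d):
                    others.remove(p)  # claimed; don't match the slot twice
                    existing.append(d)
                    break
            else:
                to_remove.append(d)   # no slot for this daemon: remove it
        return existing, others, to_remove

    # A daemon on host3 has no candidate slot, so it lands in to_remove,
    # while the host1 and host2 slots come back unfilled:
    print(reconcile([Placement('host1'), Placement('host2')],
                    [Daemon('a', 'host3')]))
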
index 1f82dbfb218761639068c97ada6808020c10745a..5e401981924289fe045beecb76b2d4ae87f6033e 100644 (file)
@@ -14,15 +14,14 @@ except ImportError:
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, \
-    HA_RGWSpec, CustomContainerSpec
+from ceph.deployment.service_spec import ServiceSpec, HA_RGWSpec, CustomContainerSpec
 from ceph.utils import str_to_datetime, datetime_now
 
 import orchestrator
 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
     DaemonDescriptionStatus, daemon_type_to_service, service_to_daemon_types
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.schedule import HostAssignment
+from cephadm.schedule import HostAssignment, DaemonPlacement
 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
     CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
 from mgr_module import MonCommandFailed
@@ -586,20 +585,25 @@ class CephadmServe:
         if service_type == 'ha-rgw':
             spec = self.update_ha_rgw_definitive_hosts(spec, all_slots, slots_to_add)
 
-        for host, network, name in slots_to_add:
+        for slot in slots_to_add:
             for daemon_type in service_to_daemon_types(service_type):
-                daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
-                                                     prefix=spec.service_id,
-                                                     forcename=name)
+                daemon_id = self.mgr.get_unique_name(
+                    daemon_type,
+                    slot.hostname,
+                    daemons,
+                    prefix=spec.service_id,
+                    forcename=slot.name)
 
                 if not did_config:
                     svc.config(spec, daemon_id)
                     did_config = True
 
                 daemon_spec = svc.make_daemon_spec(
-                    host, daemon_id, network, spec, daemon_type=daemon_type)
+                    slot.hostname, daemon_id, slot.network, spec, daemon_type=daemon_type,
+                    ports=slot.ports
+                )
                 self.log.debug('Placing %s.%s on host %s' % (
-                    daemon_type, daemon_id, host))
+                    daemon_type, daemon_id, slot.hostname))
 
                 try:
                     daemon_spec = svc.prepare_create(daemon_spec)
@@ -608,7 +612,7 @@ class CephadmServe:
                 except (RuntimeError, OrchestratorError) as e:
                     self.mgr.events.for_service(spec, 'ERROR',
                                                 f"Failed while placing {daemon_type}.{daemon_id}"
-                                                f"on {host}: {e}")
+                                                f" on {slot.hostname}: {e}")
                     # only return "no change" if no one else has already succeeded.
                     # later successes will also change to True
                     if r is None:
@@ -617,7 +621,7 @@ class CephadmServe:
 
                 # add to daemon list so next name(s) will also be unique
                 sd = orchestrator.DaemonDescription(
-                    hostname=host,
+                    hostname=slot.hostname,
                     daemon_type=daemon_type,
                     daemon_id=daemon_id,
                 )
@@ -772,15 +776,17 @@ class CephadmServe:
     def update_ha_rgw_definitive_hosts(
             self,
             spec: ServiceSpec,
-            hosts: List[HostPlacementSpec],
-            add_hosts: List[HostPlacementSpec]
+            hosts: List[DaemonPlacement],
+            add_hosts: List[DaemonPlacement]
     ) -> HA_RGWSpec:
         spec = cast(HA_RGWSpec, spec)
-        if not (set(hosts) == set(spec.definitive_host_list)):
-            spec.definitive_host_list = hosts
+        hostnames = [p.hostname for p in hosts]
+        add_hostnames = [p.hostname for p in add_hosts]
+        if not (set(hostnames) == set(spec.definitive_host_list)):
+            spec.definitive_host_list = hostnames
             ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
             for daemon in ha_rgw_daemons:
-                if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts:
+                if daemon.hostname in hostnames and daemon.hostname not in add_hostnames:
                     assert daemon.hostname is not None
                     self.mgr.cache.schedule_daemon_action(
                         daemon.hostname, daemon.name(), 'reconfig')
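
As an aside on the loop change in this file: slots_to_add entries are now
read by attribute instead of tuple unpacking, so callers keep working as
fields are added to the tuple. A minimal sketch (Slot is a stand-in for
DaemonPlacement):

    from typing import List, NamedTuple, Optional

    class Slot(NamedTuple):           # stand-in for DaemonPlacement
        hostname: str
        network: str = ''
        name: str = ''
        ports: Optional[List[int]] = None

    slot = Slot(hostname='host1', ports=[8080])
    # old style: `host, network, name = slot` breaks as fields are added;
    # attribute access does not care how many fields the tuple grows:
    assert (slot.hostname, slot.ports) == ('host1', [8080])
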
index 9764c9b818461162d11ca36c68cd6787f7138f1c..488088f1d54f971f553ab694bfefcaf712f5744c 100644 (file)
@@ -100,9 +100,7 @@ class HA_RGWService(CephService):
 
         spec = cast(HA_RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
 
-        all_hosts = []
-        for h, network, name in spec.definitive_host_list:
-            all_hosts.append(h)
+        all_hosts = spec.definitive_host_list
 
         # set state. first host in placement is master all others backups
         state = 'BACKUP'
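
For illustration only (the real keepalived config generation lives in the
surrounding service code; hostnames are made up):

    from typing import List

    def keepalived_state(hostname: str, hosts: List[str]) -> str:
        # first host in the placement is the master, the rest are backups
        return 'MASTER' if hosts and hosts[0] == hostname else 'BACKUP'

    assert keepalived_state('host1', ['host1', 'host2']) == 'MASTER'
    assert keepalived_state('host2', ['host1', 'host2']) == 'BACKUP'
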
index 49c420c6ba3b14fb65c9b449d7c313952e2fe405..f98a593bdf7e91ae4765130a9f807b4f45e41638 100644 (file)
@@ -9,6 +9,7 @@ from ceph.deployment.hostspec import HostSpec
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, ServiceSpecValidationError
 
 from cephadm.module import HostAssignment
+from cephadm.schedule import DaemonPlacement
 from orchestrator import DaemonDescription, OrchestratorValidationError, OrchestratorError
 
 
@@ -186,6 +187,29 @@ test_explicit_scheduler_results = [
 ]
 
 
+@pytest.mark.parametrize(
+    'dp,dd,result',
+    [
+        (
+            DaemonPlacement(hostname='host1'),
+            DaemonDescription('mgr', 'a', 'host1'),
+            True
+        ),
+        (
+            DaemonPlacement(hostname='host1', name='a'),
+            DaemonDescription('mgr', 'a', 'host1'),
+            True
+        ),
+        (
+            DaemonPlacement(hostname='host1', name='a'),
+            DaemonDescription('mgr', 'b', 'host1'),
+            False
+        ),
+    ])
+def test_daemon_placement_match(dp, dd, result):
+    assert dp.matches_daemon(dd) == result
+
+
 @pytest.mark.parametrize("spec_section_key,spec_section",
     [   # noqa: E128
         ('h', 'hosts'),
index 16fcf2fa6408c6443f7f21c0e4d0fbec18efa3a1..40092df2e605d1bb927294c424a55b3f23109c4a 100644 (file)
@@ -31,7 +31,7 @@ import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec, HostPlacementSpec
+    ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.hostspec import HostSpec
 from ceph.utils import datetime_to_str, str_to_datetime
@@ -932,14 +932,6 @@ class DaemonDescription(object):
             return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
         return daemon_type_to_service(self.daemon_type)
 
-    def get_host_placement(self) -> HostPlacementSpec:
-        return HostPlacementSpec(
-            hostname=self.hostname or '',
-            # FIXME: include the ip:port here?
-            network='',
-            name='',
-        )
-
     def __repr__(self) -> str:
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
index 8774c91116e4e6e98b23ce2e5ba1176f161024b8..213c0af9f4d954822107ef26deab6380b458e2f9 100644 (file)
@@ -861,7 +861,7 @@ class HA_RGWSpec(ServiceSpec):
                  ha_proxy_ssl_options: Optional[List[str]] = None,
                  haproxy_container_image: Optional[str] = None,
                  keepalived_container_image: Optional[str] = None,
-                 definitive_host_list: Optional[List[HostPlacementSpec]] = None
+                 definitive_host_list: Optional[List[str]] = None
                  ):
         assert service_type == 'ha-rgw'
         super(HA_RGWSpec, self).__init__('ha-rgw', service_id=service_id,
@@ -887,7 +887,7 @@ class HA_RGWSpec(ServiceSpec):
         # placeholder variable. Need definitive list of hosts this service will
         # be placed on in order to generate keepalived config. Will be populated
         # when applying spec
-        self.definitive_host_list = []  # type: List[HostPlacementSpec]
+        self.definitive_host_list = []  # type: List[str]
 
     def validate(self) -> None:
         super(HA_RGWSpec, self).validate()