mgr/cephadm: filter hosts that can't support VIP for ingress 53124/head
author Adam King <adking@redhat.com>
Tue, 1 Aug 2023 21:43:36 +0000 (17:43 -0400)
committer Adam King <adking@redhat.com>
Thu, 31 Aug 2023 17:36:16 +0000 (13:36 -0400)
Keepalived daemons need the host to have an interface
on which they can set up their VIP. If a host
does not have any interface that can accommodate the
VIP, we should filter it out of the candidate list.

Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 17bc76f5bb6b3ef8c962ce31a80c5a3a43b5bdd2)
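The core of the change is a per-host subnet check. Below is a minimal stand-alone sketch of that check, assuming the same networks layout cephadm keeps in its host cache (subnet -> {interface: [addresses]}); the function name host_can_hold_vip is invented for illustration and is not part of cephadm.

import ipaddress
from typing import Dict, List

def host_can_hold_vip(host_networks: Dict[str, Dict[str, List[str]]], vip: str) -> bool:
    # keep the host only if some interface sits on the VIP's subnet
    bare_ip = vip.split('/')[0]
    for subnet, ifaces in host_networks.items():
        if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
            return True
    return False

# a host with only a 1.2.3.0/24 interface can hold a 1.2.3.0/24 VIP
# but not a 100.100.100.0/24 one
assert host_can_hold_vip({'1.2.3.0/24': {'if0': ['1.2.3.1']}}, '1.2.3.100/24')
assert not host_can_hold_vip({'1.2.3.0/24': {'if0': ['1.2.3.1']}}, '100.100.100.100/24')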

src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/tests/test_services.py

index 888a2a0335dff85ecbd20df8e8d050203c5f3127..6666d761ebcf919a07177a7079f851e84217c4ee 100644 (file)
@@ -148,7 +148,7 @@ class HostAssignment(object):
                  daemons: List[orchestrator.DaemonDescription],
                  related_service_daemons: Optional[List[DaemonDescription]] = None,
                  networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
-                 filter_new_host: Optional[Callable[[str], bool]] = None,
+                 filter_new_host: Optional[Callable[[str, ServiceSpec], bool]] = None,
                  allow_colo: bool = False,
                  primary_daemon_type: Optional[str] = None,
                  per_host_daemon_type: Optional[str] = None,
@@ -451,7 +451,7 @@ class HostAssignment(object):
             old = ls.copy()
             ls = []
             for h in old:
-                if self.filter_new_host(h.hostname):
+                if self.filter_new_host(h.hostname, self.spec):
                     ls.append(h)
             if len(old) > len(ls):
                 logger.debug('Filtered %s down to %s' % (old, ls))
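The schedule.py hunk above widens the filter_new_host callback from (hostname) to (hostname, spec), so a single HostAssignment can apply service-type-aware filters. A hypothetical callback with the new shape might look like the sketch below; the deny-list is invented for illustration and stands in for a real capability check.

from ceph.deployment.service_spec import ServiceSpec

def keep_host(hostname: str, spec: ServiceSpec) -> bool:
    # non-ingress placements are not filtered in this sketch
    if spec.service_type != 'ingress':
        return True
    # hypothetical deny-list in place of an actual interface/subnet check
    return hostname not in {'host-without-vip-subnet'}

Anything passed as filter_new_host must now accept both arguments, which is why the mon filter in serve.py (next file) also grows a ServiceSpec parameter.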
index ee6542feed8fc6d5fa681ba4b0298b6194fe48ab..22b60f34c3562dacc55c7c2ff36bc8976710a68c 100644 (file)
@@ -6,7 +6,7 @@ import uuid
 import os
 from collections import defaultdict
 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
-    DefaultDict
+    DefaultDict, Callable
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
@@ -17,6 +17,7 @@ from ceph.deployment.service_spec import (
     PlacementSpec,
     RGWSpec,
     ServiceSpec,
+    IngressSpec,
 )
 from ceph.utils import datetime_now
 
@@ -695,8 +696,7 @@ class CephadmServe:
                 public_networks = [x.strip() for x in out.split(',')]
                 self.log.debug('mon public_network(s) is %s' % public_networks)
 
-        def matches_network(host):
-            # type: (str) -> bool
+        def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
             # make sure the host has at least one network that belongs to some configured public network(s)
             for pn in public_networks:
                 public_network = ipaddress.ip_network(pn)
@@ -713,6 +713,40 @@ class CephadmServe:
             )
             return False
 
+        def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
+            # make sure the host has an interface that can
+            # actually accommodate the VIP
+            if not sspec or sspec.service_type != 'ingress':
+                return True
+            ingress_spec = cast(IngressSpec, sspec)
+            virtual_ips = []
+            if ingress_spec.virtual_ip:
+                virtual_ips.append(ingress_spec.virtual_ip)
+            elif ingress_spec.virtual_ips_list:
+                virtual_ips = ingress_spec.virtual_ips_list
+            for vip in virtual_ips:
+                found = False
+                bare_ip = str(vip).split('/')[0]
+                for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+                    if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+                        # found matching interface for this IP, move on
+                        self.log.debug(
+                            f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
+                        )
+                        found = True
+                        break
+                if not found:
+                    self.log.info(
+                        f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
+                    )
+                    return False
+            return True
+
+        host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
+            'mon': matches_public_network,
+            'ingress': has_interface_for_vip
+        }
+
         rank_map = None
         if svc.ranked():
             rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
@@ -725,10 +759,7 @@ class CephadmServe:
             daemons=daemons,
             related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,
-            filter_new_host=(
-                matches_network if service_type == 'mon'
-                else None
-            ),
+            filter_new_host=host_filters.get(service_type, None),
             allow_colo=svc.allow_colo(),
             primary_daemon_type=svc.primary_daemon_type(spec),
             per_host_daemon_type=svc.per_host_daemon_type(spec),
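With two filters in play, serve.py swaps the inline 'mon'-only conditional for a small registry keyed by service type; anything without an entry gets no filter, which preserves the old behaviour for every other service. A stand-alone sketch of that lookup (names invented for illustration):

from typing import Callable, Dict, Optional

HostFilter = Callable[[str, object], bool]  # (hostname, service spec) -> keep host?

def pick_filter(service_type: str, host_filters: Dict[str, HostFilter]) -> Optional[HostFilter]:
    # service types without an entry are simply not filtered
    return host_filters.get(service_type, None)

stub: HostFilter = lambda host, spec: True  # placeholder filter callable
assert pick_filter('ingress', {'mon': stub, 'ingress': stub}) is stub
assert pick_filter('rgw', {'mon': stub, 'ingress': stub}) is None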
index fbca5ef30f88fd7a84a88175c91451e0910c133b..b646360af41a6ba625c7823430da577c691b0282 100644 (file)
@@ -644,6 +644,12 @@ class TestMonitoring:
         _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
         s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1), rgw_frontend_type='beast')
         with with_host(cephadm_module, 'test'):
+            # host "test" needs to have networks for keepalive to be placed
+            cephadm_module.cache.update_host_networks('test', {
+                '1.2.3.0/24': {
+                    'if0': ['1.2.3.1']
+                },
+            })
             with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
                     with_service(cephadm_module, CephExporterSpec('ceph-exporter')) as _, \
                     with_service(cephadm_module, s) as _, \
@@ -746,6 +752,12 @@ class TestMonitoring:
             cephadm_module.http_server.service_discovery.password = 'sd_password'
             cephadm_module.http_server.service_discovery.ssl_certs.generate_cert = MagicMock(
                 side_effect=gen_cert)
+            # host "test" needs to have networks for keepalive to be placed
+            cephadm_module.cache.update_host_networks('test', {
+                '1.2.3.0/24': {
+                    'if0': ['1.2.3.1']
+                },
+            })
             with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
                     with_service(cephadm_module, s) as _, \
                     with_service(cephadm_module, AlertManagerSpec('alertmanager')) as _, \
@@ -2159,6 +2171,65 @@ class TestIngressService:
                     # check keepalived config
                     assert keepalived_generated_conf[0] == keepalived_expected_conf
 
+    @patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_keepalive_interface_host_filtering(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        # we need to make sure keepalive daemons will have an interface
+        # on the hosts we deploy them on in order to set up their VIP.
+        _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+        with with_host(cephadm_module, 'test', addr='1.2.3.1'):
+            with with_host(cephadm_module, 'test2', addr='1.2.3.2'):
+                with with_host(cephadm_module, 'test3', addr='1.2.3.3'):
+                    with with_host(cephadm_module, 'test4', addr='1.2.3.4'):
+                        # setup "test" and "test4" to have all the necessary interfaces,
+                        # "test2" to have one of them (should still be filtered)
+                        # and "test3" to have none of them
+                        cephadm_module.cache.update_host_networks('test', {
+                            '1.2.3.0/24': {
+                                'if0': ['1.2.3.1']
+                            },
+                            '100.100.100.0/24': {
+                                'if1': ['100.100.100.1']
+                            }
+                        })
+                        cephadm_module.cache.update_host_networks('test2', {
+                            '1.2.3.0/24': {
+                                'if0': ['1.2.3.2']
+                            },
+                        })
+                        cephadm_module.cache.update_host_networks('test4', {
+                            '1.2.3.0/24': {
+                                'if0': ['1.2.3.4']
+                            },
+                            '100.100.100.0/24': {
+                                'if1': ['100.100.100.4']
+                            }
+                        })
+
+                        s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
+                                    rgw_frontend_type='beast')
+
+                        ispec = IngressSpec(service_type='ingress',
+                                            service_id='test',
+                                            placement=PlacementSpec(hosts=['test', 'test2', 'test3', 'test4']),
+                                            backend_service='rgw.foo',
+                                            frontend_port=8089,
+                                            monitor_port=8999,
+                                            monitor_user='admin',
+                                            monitor_password='12345',
+                                            keepalived_password='12345',
+                                            virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
+                        with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
+                            # since we're never actually going to refresh the host here,
+                            # check the tmp daemons to see what was placed during the apply
+                            daemons = cephadm_module.cache._get_tmp_daemons()
+                            keepalive_daemons = [d for d in daemons if d.daemon_type == 'keepalived']
+                            hosts_deployed_on = [d.hostname for d in keepalive_daemons]
+                            assert 'test' in hosts_deployed_on
+                            assert 'test2' not in hosts_deployed_on
+                            assert 'test3' not in hosts_deployed_on
+                            assert 'test4' in hosts_deployed_on
+
     @patch("cephadm.serve.CephadmServe._run_cephadm")
     @patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
     @patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())