daemons: List[orchestrator.DaemonDescription],
related_service_daemons: Optional[List[DaemonDescription]] = None,
networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
- filter_new_host: Optional[Callable[[str], bool]] = None,
+ filter_new_host: Optional[Callable[[str, ServiceSpec], bool]] = None,
allow_colo: bool = False,
primary_daemon_type: Optional[str] = None,
per_host_daemon_type: Optional[str] = None,
old = ls.copy()
ls = []
for h in old:
- if self.filter_new_host(h.hostname):
+ if self.filter_new_host(h.hostname, self.spec):
ls.append(h)
if len(old) > len(ls):
logger.debug('Filtered %s down to %s' % (old, ls))
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
- DefaultDict
+ DefaultDict, Callable
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
PlacementSpec,
RGWSpec,
ServiceSpec,
+ IngressSpec,
)
from ceph.utils import datetime_now
public_networks = [x.strip() for x in out.split(',')]
self.log.debug('mon public_network(s) is %s' % public_networks)
- def matches_network(host):
- # type: (str) -> bool
+ def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
# make sure the host has at least one network that belongs to some configured public network(s)
for pn in public_networks:
public_network = ipaddress.ip_network(pn)
)
return False
+ def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
+ # make sure the host has an interface that can
+ # actually accomodate the VIP
+ if not sspec or sspec.service_type != 'ingress':
+ return True
+ ingress_spec = cast(IngressSpec, sspec)
+ virtual_ips = []
+ if ingress_spec.virtual_ip:
+ virtual_ips.append(ingress_spec.virtual_ip)
+ elif ingress_spec.virtual_ips_list:
+ virtual_ips = ingress_spec.virtual_ips_list
+ for vip in virtual_ips:
+ found = False
+ bare_ip = str(vip).split('/')[0]
+ for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+ if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+ # found matching interface for this IP, move on
+ self.log.debug(
+ f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
+ )
+ found = True
+ break
+ if not found:
+ self.log.info(
+ f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
+ )
+ return False
+ return True
+
+ host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
+ 'mon': matches_public_network,
+ 'ingress': has_interface_for_vip
+ }
+
rank_map = None
if svc.ranked():
rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
daemons=daemons,
related_service_daemons=related_service_daemons,
networks=self.mgr.cache.networks,
- filter_new_host=(
- matches_network if service_type == 'mon'
- else None
- ),
+ filter_new_host=host_filters.get(service_type, None),
allow_colo=svc.allow_colo(),
primary_daemon_type=svc.primary_daemon_type(spec),
per_host_daemon_type=svc.per_host_daemon_type(spec),
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1), rgw_frontend_type='beast')
with with_host(cephadm_module, 'test'):
+ # host "test" needs to have networks for keepalive to be placed
+ cephadm_module.cache.update_host_networks('test', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.1']
+ },
+ })
with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
with_service(cephadm_module, CephExporterSpec('ceph-exporter')) as _, \
with_service(cephadm_module, s) as _, \
cephadm_module.http_server.service_discovery.password = 'sd_password'
cephadm_module.http_server.service_discovery.ssl_certs.generate_cert = MagicMock(
side_effect=gen_cert)
+ # host "test" needs to have networks for keepalive to be placed
+ cephadm_module.cache.update_host_networks('test', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.1']
+ },
+ })
with with_service(cephadm_module, MonitoringSpec('node-exporter')) as _, \
with_service(cephadm_module, s) as _, \
with_service(cephadm_module, AlertManagerSpec('alertmanager')) as _, \
# check keepalived config
assert keepalived_generated_conf[0] == keepalived_expected_conf
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_keepalive_interface_host_filtering(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ # we need to make sure keepalive daemons will have an interface
+ # on the hosts we deploy them on in order to set up their VIP.
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ with with_host(cephadm_module, 'test', addr='1.2.3.1'):
+ with with_host(cephadm_module, 'test2', addr='1.2.3.2'):
+ with with_host(cephadm_module, 'test3', addr='1.2.3.3'):
+ with with_host(cephadm_module, 'test4', addr='1.2.3.3'):
+ # setup "test" and "test4" to have all the necessary interfaces,
+ # "test2" to have one of them (should still be filtered)
+ # and "test3" to have none of them
+ cephadm_module.cache.update_host_networks('test', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.1']
+ },
+ '100.100.100.0/24': {
+ 'if1': ['100.100.100.1']
+ }
+ })
+ cephadm_module.cache.update_host_networks('test2', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.2']
+ },
+ })
+ cephadm_module.cache.update_host_networks('test4', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.4']
+ },
+ '100.100.100.0/24': {
+ 'if1': ['100.100.100.4']
+ }
+ })
+
+ s = RGWSpec(service_id="foo", placement=PlacementSpec(count=1),
+ rgw_frontend_type='beast')
+
+ ispec = IngressSpec(service_type='ingress',
+ service_id='test',
+ placement=PlacementSpec(hosts=['test', 'test2', 'test3', 'test4']),
+ backend_service='rgw.foo',
+ frontend_port=8089,
+ monitor_port=8999,
+ monitor_user='admin',
+ monitor_password='12345',
+ keepalived_password='12345',
+ virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
+ with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
+ # since we're never actually going to refresh the host here,
+ # check the tmp daemons to see what was placed during the apply
+ daemons = cephadm_module.cache._get_tmp_daemons()
+ keepalive_daemons = [d for d in daemons if d.daemon_type == 'keepalived']
+ hosts_deployed_on = [d.hostname for d in keepalive_daemons]
+ assert 'test' in hosts_deployed_on
+ assert 'test2' not in hosts_deployed_on
+ assert 'test3' not in hosts_deployed_on
+ assert 'test4' in hosts_deployed_on
+
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
@patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())