From: Shweta Bhosale
Date: Mon, 15 Sep 2025 16:16:21 +0000 (+0530)
Subject: mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support...
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6e67f09ab40b6730432d97dcbf9b7e1939e15c56;p=ceph.git

mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support
nfs active-active cluster

Fixes: https://tracker.ceph.com/issues/72906
Signed-off-by: Shweta Bhosale
---

diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py
index 69aacf0a9cce..fe3cc093efd7 100644
--- a/src/pybind/mgr/cephadm/schedule.py
+++ b/src/pybind/mgr/cephadm/schedule.py
@@ -15,7 +15,7 @@ from typing import (
 )
 
 import orchestrator
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec
 from orchestrator._interface import DaemonDescription
 from orchestrator import OrchestratorValidationError
 from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
@@ -24,6 +24,54 @@ logger = logging.getLogger(__name__)
 
 T = TypeVar('T')
 
+def get_placement_hosts(
+    spec: ServiceSpec,
+    hosts: List[orchestrator.HostSpec],
+    draining_hosts: List[orchestrator.HostSpec]
+) -> List[HostPlacementSpec]:
+    """
+    Get the list of candidate host placement specs based on placement specifications.
+    Args:
+        spec: The service specification
+        hosts: List of available hosts
+        draining_hosts: List of hosts that are draining
+    Returns:
+        List[HostPlacementSpec]: List of host placement specs that match the placement criteria
+    """
+    if spec.placement.hosts:
+        host_specs = [
+            h for h in spec.placement.hosts
+            if h.hostname not in [dh.hostname for dh in draining_hosts]
+        ]
+    elif spec.placement.label:
+        labeled_hosts = [h for h in hosts if spec.placement.label in h.labels]
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in labeled_hosts
+        ]
+        if spec.placement.host_pattern:
+            matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+            host_specs = [h for h in host_specs if h.hostname in matching_hostnames]
+    elif spec.placement.host_pattern:
+        matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+        host_specs = [
+            HostPlacementSpec(hostname=hostname, network='', name='')
+            for hostname in matching_hostnames
+        ]
+    elif (
+        spec.placement.count is not None
+        or spec.placement.count_per_host is not None
+    ):
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in hosts
+        ]
+    else:
+        raise OrchestratorValidationError(
+            "placement spec is empty: no hosts, no label, no pattern, no count")
+    return host_specs
+
+
 class DaemonPlacement(NamedTuple):
     daemon_type: str
     hostname: str
@@ -453,39 +501,16 @@ class HostAssignment(object):
         return None
 
     def get_candidates(self) -> List[DaemonPlacement]:
-        if self.spec.placement.hosts:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=h.hostname, network=h.network, name=h.name,
-                                ports=self.ports_start)
-                for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
-            ]
-        elif self.spec.placement.label:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts_by_label(self.spec.placement.label)
-            ]
-            if self.spec.placement.host_pattern:
-                ls = [h for h in ls if h.hostname in self.spec.placement.filter_matching_hostspecs(self.hosts)]
-        elif self.spec.placement.host_pattern:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x, ports=self.ports_start)
-                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
-            ]
-        elif (
-            self.spec.placement.count is not None
-            or self.spec.placement.count_per_host is not None
-        ):
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts
-            ]
-        else:
-            raise OrchestratorValidationError(
-                "placement spec is empty: no hosts, no label, no pattern, no count")
+        host_specs = get_placement_hosts(self.spec, self.hosts, self.draining_hosts)
+
+        ls = [
+            DaemonPlacement(daemon_type=self.primary_daemon_type,
+                            hostname=h.hostname,
+                            network=h.network,
+                            name=h.name,
+                            ports=self.ports_start)
+            for h in host_specs
+        ]
 
         # allocate an IP?
         if self.host_selector:
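
The helper keeps the branch order of the old get_candidates(): explicit hosts win, then a
label (optionally narrowed by a host pattern), then a bare pattern, then a plain count.
A rough usage sketch, not part of the patch (the 'nfs' label and host names are invented):

    # Sketch only: resolve a label-based placement to HostPlacementSpec entries.
    from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
    from orchestrator import HostSpec
    from cephadm.schedule import get_placement_hosts

    hosts = [HostSpec('host1', labels=['nfs']),
             HostSpec('host2', labels=['nfs'])]
    spec = ServiceSpec('nfs', service_id='foo',
                       placement=PlacementSpec(label='nfs'))
    # -> [HostPlacementSpec(hostname='host1', network='', name=''),
    #     HostPlacementSpec(hostname='host2', network='', name='')]
    candidates = get_placement_hosts(spec, hosts, draining_hosts=[])

Note that, as in the old code, draining hosts are only filtered out for explicit host
placements; the other branches are filtered at later scheduling stages.
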
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index 26d6762df5ed..a8478858751b 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -1161,6 +1161,14 @@ class CephadmServe:
                     only_kmip_updated = all(s.startswith('kmip') for s in list(sym_diff))
                     if not only_kmip_updated:
                         action = 'redeploy'
+            elif dd.daemon_type == 'haproxy':
+                if spec and hasattr(spec, 'backend_service'):
+                    backend_spec = self.mgr.spec_store[spec.backend_service].spec
+                    if backend_spec.service_type == 'nfs':
+                        svc = service_registry.get_service('ingress')
+                        if svc.has_placement_changed(deps, spec):
+                            self.log.debug(f'Redeploy {spec.service_name()} as placement has changed')
+                            action = 'redeploy'
             elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                 self.log.debug(
                     f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py
index beda0f4fd902..03488933b0ca 100644
--- a/src/pybind/mgr/cephadm/services/cephadmservice.py
+++ b/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -834,6 +834,9 @@ class CephadmService(metaclass=ABCMeta):
     def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
         return []
 
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        return False
+
 
 class CephService(CephadmService):
diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py
index 01febfe7a708..e4bd677b304c 100644
--- a/src/pybind/mgr/cephadm/services/ingress.py
+++ b/src/pybind/mgr/cephadm/services/ingress.py
@@ -11,6 +11,7 @@ from orchestrator import OrchestratorError, DaemonDescription
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
 from .service_registry import register_cephadm_service
 from cephadm.tlsobject_types import TLSCredentials
+from cephadm.schedule import get_placement_hosts
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
@@ -121,7 +122,10 @@ class IngressService(CephService):
         if ssl_cert_key:
             assert isinstance(ssl_cert_key, str)
             deps.append(f'ssl-cert-key:{str(utils.md5_hash(ssl_cert_key))}')
-
+        backend_spec = mgr.spec_store[ingress_spec.backend_service].spec
+        if backend_spec.service_type == 'nfs':
+            hosts = get_placement_hosts(spec, mgr.cache.get_schedulable_hosts(), mgr.cache.get_draining_hosts())
+            deps.append(f'placement_hosts:{",".join(sorted(h.hostname for h in hosts))}')
         return sorted(deps)
 
     def haproxy_generate_config(
@@ -150,6 +154,7 @@ class IngressService(CephService):
         if spec.monitor_password:
             password = spec.monitor_password
 
+        peer_hosts = {}
         if backend_spec.service_type == 'nfs':
             mode = 'tcp'
             # we need to get the nfs daemon with the highest rank_generation for
@@ -202,6 +207,18 @@ class IngressService(CephService):
                     'ip': '0.0.0.0',
                     'port': 0,
                 })
+            # Get peer hosts for haproxy active-active configuration using placement hosts
+            hosts = get_placement_hosts(
+                spec,
+                self.mgr.cache.get_schedulable_hosts(),
+                self.mgr.cache.get_draining_hosts()
+            )
+            if hosts:
+                for host in hosts:
+                    peer_ip = self.mgr.inventory.get_addr(host.hostname)
+                    peer_hosts[host.hostname] = peer_ip
+                logger.debug(f"HAProxy peer hosts for {spec.service_name()}: {peer_hosts}")
+
         else:
             mode = 'tcp' if spec.use_tcp_mode_over_rgw else 'http'
             servers = [
@@ -257,6 +274,7 @@ class IngressService(CephService):
                 'health_check_interval': spec.health_check_interval or '2s',
                 'v4v6_flag': v4v6_flag,
                 'monitor_ssl_file': monitor_ssl_file,
+                'peer_hosts': peer_hosts,
             }
         )
         config_files = {
@@ -509,3 +527,24 @@
         if not monitor_addr:
             logger.debug(f"No IP address found in the network {spec.monitor_networks} on host {host}.")
         return monitor_addr, monitor_port
+
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        """Check if placement hosts have changed"""
+        def extract_hosts(deps: List[str]) -> List[str]:
+            for dep in deps:
+                if dep.startswith('placement_hosts:'):
+                    host_string = dep.split(':', 1)[1]
+                    return host_string.split(',') if host_string else []
+            return []
+
+        hosts = extract_hosts(deps)
+        current_hosts = get_placement_hosts(
+            spec,
+            self.mgr.cache.get_schedulable_hosts(),
+            self.mgr.cache.get_draining_hosts()
+        )
+        current_hosts = sorted(h.hostname for h in current_hosts)
+        if current_hosts != hosts:
+            logger.debug(f'Placement has changed for {spec.service_name()} from {hosts} -> {current_hosts}')
+            return True
+        return False
diff --git a/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2 b/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2
index 6ced00038978..67658d5522bf 100644
--- a/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2
+++ b/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2
@@ -74,6 +74,13 @@ frontend frontend
 {% endif %}
     default_backend backend
 
+{% if backend_spec.service_type == 'nfs' and peer_hosts %}
+peers haproxy_peers
+  {% for hostname, ip in peer_hosts.items() %}
+    peer {{ hostname }} {{ ip }}:1024
+  {% endfor %}
+
+{% endif %}
 backend backend
 {% if mode == 'http' %}
     option forwardfor
@@ -87,6 +94,11 @@ backend backend
 {% if mode == 'tcp' %}
     mode tcp
     balance roundrobin
+  {% if backend_spec.service_type == 'nfs' %}
+    stick-table type ip size 200k expire 30m peers haproxy_peers
+    stick on src
+  {% endif %}
+    hash-type consistent
 {% if spec.use_tcp_mode_over_rgw %}
 {% if backend_spec.ssl %}
     option ssl-hello-chk
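
Taken together, the template changes make every NFS-backed haproxy render a peers
section plus a synchronized stick table. For two ingress hosts with the addresses
used in the updated tests below, the rendered haproxy.cfg gains roughly:

    peers haproxy_peers
        peer host1 192.168.122.111:1024
        peer host2 192.168.122.222:1024

    backend backend
        mode tcp
        balance roundrobin
        stick-table type ip size 200k expire 30m peers haproxy_peers
        stick on src
        hash-type consistent

Each client source IP is pinned to one NFS server, and the peers protocol (port 1024)
replicates those pinnings across all haproxy daemons, so clients keep reaching the
same backend in an active-active deployment or after a VIP failover.
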
diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py
index 436dee7909fe..65a9869012eb 100644
--- a/src/pybind/mgr/cephadm/tests/test_services.py
+++ b/src/pybind/mgr/cephadm/tests/test_services.py
@@ -2737,7 +2737,9 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=enable_haproxy_protocol,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                hosts=['host1'])
         )
 
         cephadm_module.spec_store._specs = {
@@ -2785,9 +2787,14 @@
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '    peer host1 host1:1024\n\n'
             'backend backend\n'
             '    mode tcp\n'
             '    balance roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type consistent\n'
         )
         if enable_haproxy_protocol:
             haproxy_txt += '    default-server send-proxy-v2\n'
@@ -3119,7 +3126,8 @@
             monitor_password='12345',
             virtual_interface_networks=['1.2.3.0/24'],
             virtual_ip="1.2.3.4/32",
-            use_tcp_mode_over_rgw=True)
+            use_tcp_mode_over_rgw=True,
+            enable_stats=True)
         with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
             # generate the haproxy conf based on the specified spec
             haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
@@ -3637,7 +3645,10 @@
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
         )
 
         cephadm_module.spec_store._specs = {
@@ -3681,9 +3692,15 @@
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '    peer host1 192.168.122.111:1024\n'
+            '    peer host2 192.168.122.222:1024\n\n'
             'backend backend\n'
             '    mode tcp\n'
             '    balance roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type consistent\n'
             '    default-server send-proxy-v2\n'
             '    server nfs.foo.0 192.168.122.111:12049 check\n'
         )
@@ -3866,6 +3883,10 @@
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
+
         )
         cephadm_module.spec_store._specs = {
             'nfs.foo': nfs_service,
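
For reference, a minimal sketch of a spec that exercises the new code path, built from
the IngressSpec fields used in the tests above (host names, ports, and the VIP are
illustrative values, not part of the patch):

    # Sketch only: an ingress service over an NFS backend with explicit placement.
    from ceph.deployment.service_spec import IngressSpec, PlacementSpec

    ispec = IngressSpec(
        service_type='ingress',
        service_id='test',
        backend_service='nfs.foo',   # NFS backend enables the peers + stick-table path
        frontend_port=2049,
        monitor_port=8999,
        virtual_ip='192.168.122.100/24',
        placement=PlacementSpec(hosts=['host1', 'host2']),
    )
    # get_dependencies() now records 'placement_hosts:host1,host2'; if the placement
    # later resolves differently, has_placement_changed() returns True and serve.py
    # redeploys haproxy with an updated peers section.
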