From: Shweta Bhosale Date: Mon, 15 Sep 2025 16:16:21 +0000 (+0530) Subject: mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c520ff5ba6e349a05cb160bbfda2363e84bc195;p=ceph-ci.git mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support nfs active-active cluster Fixes: https://tracker.ceph.com/issues/72906 Signed-off-by: Shweta Bhosale Resolves: rhbz#2388477 Conflicts: src/pybind/mgr/cephadm/schedule.py src/pybind/mgr/cephadm/serve.py src/pybind/mgr/cephadm/services/cephadmservice.py --- diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py index 0d0b59da56c..7d43bffa25f 100644 --- a/src/pybind/mgr/cephadm/schedule.py +++ b/src/pybind/mgr/cephadm/schedule.py @@ -16,7 +16,7 @@ from typing import ( ) import orchestrator -from ceph.deployment.service_spec import ServiceSpec +from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec from orchestrator._interface import DaemonDescription from orchestrator import OrchestratorValidationError from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES @@ -25,6 +25,60 @@ logger = logging.getLogger(__name__) T = TypeVar('T') +def get_placement_hosts( + spec: ServiceSpec, + hosts: List[orchestrator.HostSpec], + draining_hosts: List[orchestrator.HostSpec] +) -> List[HostPlacementSpec]: + """ + Get the list of candidate host placement specs based on placement specifications. + Args: + spec: The service specification + hosts: List of available hosts + draining_hosts: List of hosts that are draining + Returns: + List[HostPlacementSpec]: List of host placement specs that match the placement criteria + """ + if spec.placement.hosts: + host_specs = [ + h for h in spec.placement.hosts + if h.hostname not in [dh.hostname for dh in draining_hosts] + ] + elif spec.placement.label: + labeled_hosts = [h for h in hosts if spec.placement.label in h.labels] + host_specs = [ + HostPlacementSpec(hostname=x.hostname, network='', name='') + for x in labeled_hosts + ] + if spec.placement.host_pattern: + matching_hostnames = spec.placement.filter_matching_hostspecs(hosts) + host_specs = [h for h in host_specs if h.hostname in matching_hostnames] + elif spec.placement.labels: + host_specs = [] + for label in spec.placement.labels: + for host in [h for h in hosts if label in h.labels]: + if host.hostname not in [h.hostname for h in host_specs]: + host_specs.append(HostPlacementSpec(hostname=host.hostname, network='', name='')) + elif spec.placement.host_pattern: + matching_hostnames = spec.placement.filter_matching_hostspecs(hosts) + host_specs = [ + HostPlacementSpec(hostname=hostname, network='', name='') + for hostname in matching_hostnames + ] + elif ( + spec.placement.count is not None + or spec.placement.count_per_host is not None + ): + host_specs = [ + HostPlacementSpec(hostname=x.hostname, network='', name='') + for x in hosts + ] + else: + raise OrchestratorValidationError( + "placement spec is empty: no hosts, no label, no pattern, no count") + return host_specs + + class DaemonPlacement(NamedTuple): daemon_type: str hostname: str @@ -491,46 +545,16 @@ class HostAssignment(object): return None def get_candidates(self) -> List[DaemonPlacement]: - if self.spec.placement.hosts: - ls = [ - DaemonPlacement(daemon_type=self.primary_daemon_type, - hostname=h.hostname, network=h.network, name=h.name, - ports=self.ports_start) - for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts] - ] - elif self.spec.placement.label: - ls = [ - DaemonPlacement(daemon_type=self.primary_daemon_type, - hostname=x.hostname, ports=self.ports_start) - for x in self.hosts_by_label(self.spec.placement.label) - ] - if self.spec.placement.host_pattern: - ls = [h for h in ls if h.hostname in self.spec.placement.filter_matching_hostspecs(self.hosts)] - elif self.spec.placement.labels: - ls = [] - for label in self.spec.placement.labels: - for host in self.hosts_by_label(label): - if host.hostname not in [h.hostname for h in ls]: - ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type, - hostname=host.hostname, ports=self.ports_start)) - elif self.spec.placement.host_pattern: - ls = [ - DaemonPlacement(daemon_type=self.primary_daemon_type, - hostname=x, ports=self.ports_start) - for x in self.spec.placement.filter_matching_hostspecs(self.hosts) - ] - elif ( - self.spec.placement.count is not None - or self.spec.placement.count_per_host is not None - ): - ls = [ - DaemonPlacement(daemon_type=self.primary_daemon_type, - hostname=x.hostname, ports=self.ports_start) - for x in self.hosts - ] - else: - raise OrchestratorValidationError( - "placement spec is empty: no hosts, no label, no pattern, no count") + host_specs = get_placement_hosts(self.spec, self.hosts, self.draining_hosts) + + ls = [ + DaemonPlacement(daemon_type=self.primary_daemon_type, + hostname=h.hostname, + network=h.network, + name=h.name, + ports=self.ports_start) + for h in host_specs + ] # allocate an IP? if self.host_selector: diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 8e0dcc76995..59ebd49fd49 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -1384,6 +1384,14 @@ class CephadmServe: else: skip_restart_for_reconfig = True send_signal_to_daemon = 'SIGHUP' + elif dd.daemon_type == 'haproxy': + if spec and hasattr(spec, 'backend_service'): + backend_spec = self.mgr.spec_store[spec.backend_service].spec + if backend_spec.service_type == 'nfs': + svc = service_registry.get_service('ingress') + if svc.has_placement_changed(deps, spec): + self.log.debug(f'Redeploy {spec.service_name()} as placement has changed') + action = 'redeploy' elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args: self.log.debug( f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}') diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 6f58bfad48f..c681a55595d 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -849,6 +849,9 @@ class CephadmService(metaclass=ABCMeta): def pre_daemon_service_config(self, spec: ServiceSpec) -> None: return + def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool: + return False + class CephService(CephadmService): diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py index ecfafd8a939..33825e6ea59 100644 --- a/src/pybind/mgr/cephadm/services/ingress.py +++ b/src/pybind/mgr/cephadm/services/ingress.py @@ -11,6 +11,7 @@ from orchestrator import OrchestratorError, DaemonDescription from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService from .service_registry import register_cephadm_service from cephadm.tlsobject_types import TLSCredentials +from cephadm.schedule import get_placement_hosts if TYPE_CHECKING: from ..module import CephadmOrchestrator @@ -136,7 +137,10 @@ class IngressService(CephService): if ssl_cert_key: assert isinstance(ssl_cert_key, str) deps.append(f'ssl-cert-key:{str(utils.md5_hash(ssl_cert_key))}') - + backend_spec = mgr.spec_store[ingress_spec.backend_service].spec + if backend_spec.service_type == 'nfs': + hosts = get_placement_hosts(spec, mgr.cache.get_schedulable_hosts(), mgr.cache.get_draining_hosts()) + deps.append(f'placement_hosts:{",".join(sorted(h.hostname for h in hosts))}') return sorted(deps) def haproxy_generate_config( @@ -165,6 +169,7 @@ class IngressService(CephService): if spec.monitor_password: password = spec.monitor_password + peer_hosts = {} if backend_spec.service_type == 'nfs': mode = 'tcp' # we need to get the nfs daemon with the highest rank_generation for @@ -217,6 +222,18 @@ class IngressService(CephService): 'ip': '0.0.0.0', 'port': 0, }) + # Get peer hosts for haproxy active-active configuration using placement hosts + hosts = get_placement_hosts( + spec, + self.mgr.cache.get_schedulable_hosts(), + self.mgr.cache.get_draining_hosts() + ) + if hosts: + for host in hosts: + peer_ip = self.mgr.inventory.get_addr(host.hostname) + peer_hosts[host.hostname] = peer_ip + logger.debug(f"HAProxy peer hosts for {spec.service_name()}: {peer_hosts}") + else: mode = 'tcp' if spec.use_tcp_mode_over_rgw else 'http' servers = [ @@ -273,6 +290,7 @@ class IngressService(CephService): 'v4v6_flag': v4v6_flag, 'qat_support': spec.haproxy_qat_support, 'monitor_ssl_file': monitor_ssl_file, + 'peer_hosts': peer_hosts, } ) final_config: Dict[str, Any] = { @@ -541,3 +559,24 @@ class IngressService(CephService): if not monitor_addr: logger.debug(f"No IP address found in the network {spec.monitor_networks} on host {host}.") return monitor_addr, monitor_port + + def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool: + """Check if placement hosts have changed""" + def extract_hosts(deps: List[str]) -> List[str]: + for dep in deps: + if dep.startswith('placement_hosts:'): + host_string = dep.split(':', 1)[1] + return host_string.split(',') if host_string else [] + return [] + + hosts = extract_hosts(deps) + current_hosts = get_placement_hosts( + spec, + self.mgr.cache.get_schedulable_hosts(), + self.mgr.cache.get_draining_hosts() + ) + current_hosts = sorted(h.hostname for h in current_hosts) + if current_hosts != hosts: + logger.debug(f'Placement has changed for {spec.service_name()} from {hosts} -> {current_hosts}') + return True + return False diff --git a/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2 b/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2 index 6e6c076dac6..55cb1b4b69b 100644 --- a/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2 +++ b/src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2 @@ -78,6 +78,13 @@ frontend frontend {% endif %} default_backend backend +{% if backend_spec.service_type == 'nfs' and peer_hosts %} +peers haproxy_peers + {% for hostname, ip in peer_hosts.items() %} + peer {{ hostname }} {{ ip }}:1024 + {% endfor %} + +{% endif %} backend backend {% if mode == 'http' %} option forwardfor @@ -95,6 +102,11 @@ backend backend {% if mode == 'tcp' %} mode tcp balance roundrobin + {% if backend_spec.service_type == 'nfs' %} + stick-table type ip size 200k expire 30m peers haproxy_peers + stick on src + {% endif %} + hash-type consistent {% if spec.use_tcp_mode_over_rgw %} {% if backend_spec.ssl %} option ssl-hello-chk diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py index f9e9a8bff99..b08b8e96acd 100644 --- a/src/pybind/mgr/cephadm/tests/test_services.py +++ b/src/pybind/mgr/cephadm/tests/test_services.py @@ -2728,7 +2728,9 @@ class TestIngressService: monitor_password='12345', keepalived_password='12345', enable_haproxy_protocol=enable_haproxy_protocol, - enable_stats=True + enable_stats=True, + placement=PlacementSpec( + hosts=['host1']) ) cephadm_module.spec_store._specs = { @@ -2776,9 +2778,14 @@ class TestIngressService: ' bind 192.168.122.100:2049\n' ' option tcplog\n' ' default_backend backend\n\n' + 'peers haproxy_peers\n' + ' peer host1 host1:1024\n\n' 'backend backend\n' ' mode tcp\n' ' balance roundrobin\n' + ' stick-table type ip size 200k expire 30m peers haproxy_peers\n' + ' stick on src\n' + ' hash-type consistent\n' ) if enable_haproxy_protocol: haproxy_txt += ' default-server send-proxy-v2\n' @@ -3110,7 +3117,8 @@ class TestIngressService: monitor_password='12345', virtual_interface_networks=['1.2.3.0/24'], virtual_ip="1.2.3.4/32", - use_tcp_mode_over_rgw=True) + use_tcp_mode_over_rgw=True, + enable_stats=True) with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: # generate the haproxy conf based on the specified spec haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( @@ -3627,7 +3635,10 @@ class TestIngressService: monitor_password='12345', keepalived_password='12345', enable_haproxy_protocol=True, - enable_stats=True + enable_stats=True, + placement=PlacementSpec( + count=1, + hosts=['host1', 'host2']), ) cephadm_module.spec_store._specs = { @@ -3671,9 +3682,15 @@ class TestIngressService: ' bind 192.168.122.100:2049\n' ' option tcplog\n' ' default_backend backend\n\n' + 'peers haproxy_peers\n' + ' peer host1 192.168.122.111:1024\n' + ' peer host2 192.168.122.222:1024\n\n' 'backend backend\n' ' mode tcp\n' ' balance roundrobin\n' + ' stick-table type ip size 200k expire 30m peers haproxy_peers\n' + ' stick on src\n' + ' hash-type consistent\n' ' default-server send-proxy-v2\n' ' server nfs.foo.0 192.168.122.111:12049 check\n' ) @@ -3856,6 +3873,10 @@ class TestIngressService: monitor_password='12345', keepalived_password='12345', enable_haproxy_protocol=True, + placement=PlacementSpec( + count=1, + hosts=['host1', 'host2']), + ) cephadm_module.spec_store._specs = { 'nfs.foo': nfs_service,