]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support... 65531/head
authorShweta Bhosale <Shweta.Bhosale1@ibm.com>
Mon, 15 Sep 2025 16:16:21 +0000 (21:46 +0530)
committerShweta Bhosale <Shweta.Bhosale1@ibm.com>
Wed, 8 Oct 2025 14:41:16 +0000 (20:11 +0530)
Fixes: https://tracker.ceph.com/issues/72906
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/ingress.py
src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2
src/pybind/mgr/cephadm/tests/test_services.py

index 69aacf0a9cce0d86166d46e3ffbc8067ea27c5f8..fe3cc093efd7a12c0d3ab9ece5cbcc0b40e05f46 100644 (file)
@@ -15,7 +15,7 @@ from typing import (
 )
 
 import orchestrator
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec
 from orchestrator._interface import DaemonDescription
 from orchestrator import OrchestratorValidationError
 from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
@@ -24,6 +24,54 @@ logger = logging.getLogger(__name__)
 T = TypeVar('T')
 
 
+def get_placement_hosts(
+    spec: ServiceSpec,
+    hosts: List[orchestrator.HostSpec],
+    draining_hosts: List[orchestrator.HostSpec]
+) -> List[HostPlacementSpec]:
+    """
+    Get the list of candidate host placement specs based on placement specifications.
+    Args:
+        spec: The service specification
+        hosts: List of available hosts
+        draining_hosts: List of hosts that are draining
+    Returns:
+        List[HostPlacementSpec]: List of host placement specs that match the placement criteria
+    """
+    if spec.placement.hosts:
+        host_specs = [
+            h for h in spec.placement.hosts
+            if h.hostname not in [dh.hostname for dh in draining_hosts]
+        ]
+    elif spec.placement.label:
+        labeled_hosts = [h for h in hosts if spec.placement.label in h.labels]
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in labeled_hosts
+        ]
+        if spec.placement.host_pattern:
+            matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+            host_specs = [h for h in host_specs if h.hostname in matching_hostnames]
+    elif spec.placement.host_pattern:
+        matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+        host_specs = [
+            HostPlacementSpec(hostname=hostname, network='', name='')
+            for hostname in matching_hostnames
+        ]
+    elif (
+            spec.placement.count is not None
+            or spec.placement.count_per_host is not None
+    ):
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in hosts
+        ]
+    else:
+        raise OrchestratorValidationError(
+            "placement spec is empty: no hosts, no label, no pattern, no count")
+    return host_specs
+
+
 class DaemonPlacement(NamedTuple):
     daemon_type: str
     hostname: str
@@ -453,39 +501,16 @@ class HostAssignment(object):
         return None
 
     def get_candidates(self) -> List[DaemonPlacement]:
-        if self.spec.placement.hosts:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=h.hostname, network=h.network, name=h.name,
-                                ports=self.ports_start)
-                for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
-            ]
-        elif self.spec.placement.label:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts_by_label(self.spec.placement.label)
-            ]
-            if self.spec.placement.host_pattern:
-                ls = [h for h in ls if h.hostname in self.spec.placement.filter_matching_hostspecs(self.hosts)]
-        elif self.spec.placement.host_pattern:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x, ports=self.ports_start)
-                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
-            ]
-        elif (
-                self.spec.placement.count is not None
-                or self.spec.placement.count_per_host is not None
-        ):
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts
-            ]
-        else:
-            raise OrchestratorValidationError(
-                "placement spec is empty: no hosts, no label, no pattern, no count")
+        host_specs = get_placement_hosts(self.spec, self.hosts, self.draining_hosts)
+
+        ls = [
+            DaemonPlacement(daemon_type=self.primary_daemon_type,
+                            hostname=h.hostname,
+                            network=h.network,
+                            name=h.name,
+                            ports=self.ports_start)
+            for h in host_specs
+        ]
 
         # allocate an IP?
         if self.host_selector:
index 26d6762df5ed971ec29e458065767ee2b12b9693..a8478858751b659133a26ed75d2c67f89cd2425c 100644 (file)
@@ -1161,6 +1161,14 @@ class CephadmServe:
                     only_kmip_updated = all(s.startswith('kmip') for s in list(sym_diff))
                     if not only_kmip_updated:
                         action = 'redeploy'
+            elif dd.daemon_type == 'haproxy':
+                if spec and hasattr(spec, 'backend_service'):
+                    backend_spec = self.mgr.spec_store[spec.backend_service].spec
+                    if backend_spec.service_type == 'nfs':
+                        svc = service_registry.get_service('ingress')
+                        if svc.has_placement_changed(deps, spec):
+                            self.log.debug(f'Redeploy {spec.service_name()} as placement has changed')
+                            action = 'redeploy'
             elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                 self.log.debug(
                     f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
index beda0f4fd90281c67e009892e5c02045eb60b5da..03488933b0cad051146a15ff9edc52c08258228c 100644 (file)
@@ -834,6 +834,9 @@ class CephadmService(metaclass=ABCMeta):
     def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
         return []
 
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        return False
+
 
 class CephService(CephadmService):
 
index 01febfe7a708d280d2594a0b71a79950b4c1ddd6..e4bd677b304c0eea20428d2c2b668f9f1658a868 100644 (file)
@@ -11,6 +11,7 @@ from orchestrator import OrchestratorError, DaemonDescription
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
 from .service_registry import register_cephadm_service
 from cephadm.tlsobject_types import TLSCredentials
+from cephadm.schedule import get_placement_hosts
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
@@ -121,7 +122,10 @@ class IngressService(CephService):
             if ssl_cert_key:
                 assert isinstance(ssl_cert_key, str)
                 deps.append(f'ssl-cert-key:{str(utils.md5_hash(ssl_cert_key))}')
-
+        backend_spec = mgr.spec_store[ingress_spec.backend_service].spec
+        if backend_spec.service_type == 'nfs':
+            hosts = get_placement_hosts(spec, mgr.cache.get_schedulable_hosts(), mgr.cache.get_draining_hosts())
+            deps.append(f'placement_hosts:{",".join(sorted(h.hostname for h in hosts))}')
         return sorted(deps)
 
     def haproxy_generate_config(
@@ -150,6 +154,7 @@ class IngressService(CephService):
         if spec.monitor_password:
             password = spec.monitor_password
 
+        peer_hosts = {}
         if backend_spec.service_type == 'nfs':
             mode = 'tcp'
             # we need to get the nfs daemon with the highest rank_generation for
@@ -202,6 +207,18 @@ class IngressService(CephService):
                         'ip': '0.0.0.0',
                         'port': 0,
                     })
+            # Get peer hosts for haproxy active-active configuration using placement hosts
+            hosts = get_placement_hosts(
+                spec,
+                self.mgr.cache.get_schedulable_hosts(),
+                self.mgr.cache.get_draining_hosts()
+            )
+            if hosts:
+                for host in hosts:
+                    peer_ip = self.mgr.inventory.get_addr(host.hostname)
+                    peer_hosts[host.hostname] = peer_ip
+                logger.debug(f"HAProxy peer hosts for {spec.service_name()}: {peer_hosts}")
+
         else:
             mode = 'tcp' if spec.use_tcp_mode_over_rgw else 'http'
             servers = [
@@ -257,6 +274,7 @@ class IngressService(CephService):
                 'health_check_interval': spec.health_check_interval or '2s',
                 'v4v6_flag': v4v6_flag,
                 'monitor_ssl_file': monitor_ssl_file,
+                'peer_hosts': peer_hosts,
             }
         )
         config_files = {
@@ -509,3 +527,24 @@ class IngressService(CephService):
             if not monitor_addr:
                 logger.debug(f"No IP address found in the network {spec.monitor_networks} on host {host}.")
         return monitor_addr, monitor_port
+
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        """Check if placement hosts have changed"""
+        def extract_hosts(deps: List[str]) -> List[str]:
+            for dep in deps:
+                if dep.startswith('placement_hosts:'):
+                    host_string = dep.split(':', 1)[1]
+                    return host_string.split(',') if host_string else []
+            return []
+
+        hosts = extract_hosts(deps)
+        current_hosts = get_placement_hosts(
+            spec,
+            self.mgr.cache.get_schedulable_hosts(),
+            self.mgr.cache.get_draining_hosts()
+        )
+        current_hosts = sorted(h.hostname for h in current_hosts)
+        if current_hosts != hosts:
+            logger.debug(f'Placement has changed for {spec.service_name()} from {hosts} -> {current_hosts}')
+            return True
+        return False
index 6ced000389789933a3055d30222f0ada12a26f5d..67658d5522bfc63ce4907d072f6e4a8e38212d5a 100644 (file)
@@ -74,6 +74,13 @@ frontend frontend
 {% endif %}
     default_backend backend
 
+{% if backend_spec.service_type == 'nfs' and peer_hosts %}
+peers haproxy_peers
+    {% for hostname, ip in peer_hosts.items() %}
+     peer {{ hostname }} {{ ip }}:1024
+    {% endfor %}
+
+{% endif %}
 backend backend
 {% if mode == 'http' %}
     option forwardfor
@@ -87,6 +94,11 @@ backend backend
 {% if mode == 'tcp' %}
     mode        tcp
     balance     roundrobin
+    {% if backend_spec.service_type == 'nfs' %}
+    stick-table type ip size 200k expire 30m peers haproxy_peers
+    stick on src
+    {% endif %}
+    hash-type   consistent
 {% if spec.use_tcp_mode_over_rgw %}
 {% if backend_spec.ssl %}
     option ssl-hello-chk
index 436dee7909fedd1ce2d11a599cbbc7375f11d551..65a9869012eb9dc46b9da24f473739727fce4577 100644 (file)
@@ -2737,7 +2737,9 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=enable_haproxy_protocol,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                hosts=['host1'])
         )
 
         cephadm_module.spec_store._specs = {
@@ -2785,9 +2787,14 @@ class TestIngressService:
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '     peer host1 host1:1024\n\n'
             'backend backend\n'
             '    mode        tcp\n'
             '    balance     roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type   consistent\n'
         )
         if enable_haproxy_protocol:
             haproxy_txt += '    default-server send-proxy-v2\n'
@@ -3119,7 +3126,8 @@ class TestIngressService:
                                 monitor_password='12345',
                                 virtual_interface_networks=['1.2.3.0/24'],
                                 virtual_ip="1.2.3.4/32",
-                                use_tcp_mode_over_rgw=True)
+                                use_tcp_mode_over_rgw=True,
+                                enable_stats=True)
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the haproxy conf based on the specified spec
                 haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
@@ -3637,7 +3645,10 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
         )
 
         cephadm_module.spec_store._specs = {
@@ -3681,9 +3692,15 @@ class TestIngressService:
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '     peer host1 192.168.122.111:1024\n'
+            '     peer host2 192.168.122.222:1024\n\n'
             'backend backend\n'
             '    mode        tcp\n'
             '    balance     roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type   consistent\n'
             '    default-server send-proxy-v2\n'
             '    server nfs.foo.0 192.168.122.111:12049 check\n'
         )
@@ -3866,6 +3883,10 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
+
         )
         cephadm_module.spec_store._specs = {
             'nfs.foo': nfs_service,