git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: Add stick table and haproxy peers in haproxy.cfg for NFS to support...
author Shweta Bhosale <Shweta.Bhosale1@ibm.com>
Mon, 15 Sep 2025 16:16:21 +0000 (21:46 +0530)
committer Shweta Bhosale <shbhosal@redhat.com>
Mon, 13 Oct 2025 15:49:05 +0000 (15:49 +0000)
Fixes: https://tracker.ceph.com/issues/72906
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
Resolves: rhbz#2388477

 Conflicts:
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py

src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/ingress.py
src/pybind/mgr/cephadm/templates/services/ingress/haproxy.cfg.j2
src/pybind/mgr/cephadm/tests/test_services.py

index 0d0b59da56c8151411980914fb51f6a77ddd9a90..7d43bffa25fab27b878ac32f024e66d99414784f 100644 (file)
@@ -16,7 +16,7 @@ from typing import (
 )
 
 import orchestrator
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec
 from orchestrator._interface import DaemonDescription
 from orchestrator import OrchestratorValidationError
 from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
@@ -25,6 +25,60 @@ logger = logging.getLogger(__name__)
 T = TypeVar('T')
 
 
+def get_placement_hosts(
+    spec: ServiceSpec,
+    hosts: List[orchestrator.HostSpec],
+    draining_hosts: List[orchestrator.HostSpec]
+) -> List[HostPlacementSpec]:
+    """
+    Get the list of candidate host placement specs based on placement specifications.
+    Args:
+        spec: The service specification
+        hosts: List of available hosts
+        draining_hosts: List of hosts that are draining
+    Returns:
+        List[HostPlacementSpec]: List of host placement specs that match the placement criteria
+    """
+    if spec.placement.hosts:
+        host_specs = [
+            h for h in spec.placement.hosts
+            if h.hostname not in [dh.hostname for dh in draining_hosts]
+        ]
+    elif spec.placement.label:
+        labeled_hosts = [h for h in hosts if spec.placement.label in h.labels]
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in labeled_hosts
+        ]
+        if spec.placement.host_pattern:
+            matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+            host_specs = [h for h in host_specs if h.hostname in matching_hostnames]
+    elif spec.placement.labels:
+        host_specs = []
+        for label in spec.placement.labels:
+            for host in [h for h in hosts if label in h.labels]:
+                if host.hostname not in [h.hostname for h in host_specs]:
+                    host_specs.append(HostPlacementSpec(hostname=host.hostname, network='', name=''))
+    elif spec.placement.host_pattern:
+        matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+        host_specs = [
+            HostPlacementSpec(hostname=hostname, network='', name='')
+            for hostname in matching_hostnames
+        ]
+    elif (
+            spec.placement.count is not None
+            or spec.placement.count_per_host is not None
+    ):
+        host_specs = [
+            HostPlacementSpec(hostname=x.hostname, network='', name='')
+            for x in hosts
+        ]
+    else:
+        raise OrchestratorValidationError(
+            "placement spec is empty: no hosts, no label, no pattern, no count")
+    return host_specs
+
+
 class DaemonPlacement(NamedTuple):
     daemon_type: str
     hostname: str
@@ -491,46 +545,16 @@ class HostAssignment(object):
         return None
 
     def get_candidates(self) -> List[DaemonPlacement]:
-        if self.spec.placement.hosts:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=h.hostname, network=h.network, name=h.name,
-                                ports=self.ports_start)
-                for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
-            ]
-        elif self.spec.placement.label:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts_by_label(self.spec.placement.label)
-            ]
-            if self.spec.placement.host_pattern:
-                ls = [h for h in ls if h.hostname in self.spec.placement.filter_matching_hostspecs(self.hosts)]
-        elif self.spec.placement.labels:
-            ls = []
-            for label in self.spec.placement.labels:
-                for host in self.hosts_by_label(label):
-                    if host.hostname not in [h.hostname for h in ls]:
-                        ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                                  hostname=host.hostname, ports=self.ports_start))
-        elif self.spec.placement.host_pattern:
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x, ports=self.ports_start)
-                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
-            ]
-        elif (
-                self.spec.placement.count is not None
-                or self.spec.placement.count_per_host is not None
-        ):
-            ls = [
-                DaemonPlacement(daemon_type=self.primary_daemon_type,
-                                hostname=x.hostname, ports=self.ports_start)
-                for x in self.hosts
-            ]
-        else:
-            raise OrchestratorValidationError(
-                "placement spec is empty: no hosts, no label, no pattern, no count")
+        host_specs = get_placement_hosts(self.spec, self.hosts, self.draining_hosts)
+
+        ls = [
+            DaemonPlacement(daemon_type=self.primary_daemon_type,
+                            hostname=h.hostname,
+                            network=h.network,
+                            name=h.name,
+                            ports=self.ports_start)
+            for h in host_specs
+        ]
 
         # allocate an IP?
         if self.host_selector:
index 8e0dcc76995a8b4cfc41ef04fe65eaed03678f41..59ebd49fd497eae8e386e1ff5e19d0f100aca618 100644 (file)
@@ -1384,6 +1384,14 @@ class CephadmServe:
                     else:
                         skip_restart_for_reconfig = True
                         send_signal_to_daemon = 'SIGHUP'
+            elif dd.daemon_type == 'haproxy':
+                if spec and hasattr(spec, 'backend_service'):
+                    backend_spec = self.mgr.spec_store[spec.backend_service].spec
+                    if backend_spec.service_type == 'nfs':
+                        svc = service_registry.get_service('ingress')
+                        if svc.has_placement_changed(deps, spec):
+                            self.log.debug(f'Redeploy {spec.service_name()} as placement has changed')
+                            action = 'redeploy'
             elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                 self.log.debug(
                     f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
index 6f58bfad48fbcd659d05d229d1544839f930bec5..c681a55595d63a32b7e4d0e72c627814a4ae314a 100644 (file)
@@ -849,6 +849,9 @@ class CephadmService(metaclass=ABCMeta):
     def pre_daemon_service_config(self, spec: ServiceSpec) -> None:
         return
 
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        return False
+
 
 class CephService(CephadmService):
 
index ecfafd8a9392f209ff683a8c5fc89c174542505f..33825e6ea593ca42f74c30a48d04bede5883c33c 100644 (file)
@@ -11,6 +11,7 @@ from orchestrator import OrchestratorError, DaemonDescription
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
 from .service_registry import register_cephadm_service
 from cephadm.tlsobject_types import TLSCredentials
+from cephadm.schedule import get_placement_hosts
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
@@ -136,7 +137,10 @@ class IngressService(CephService):
             if ssl_cert_key:
                 assert isinstance(ssl_cert_key, str)
                 deps.append(f'ssl-cert-key:{str(utils.md5_hash(ssl_cert_key))}')
-
+        backend_spec = mgr.spec_store[ingress_spec.backend_service].spec
+        if backend_spec.service_type == 'nfs':
+            hosts = get_placement_hosts(spec, mgr.cache.get_schedulable_hosts(), mgr.cache.get_draining_hosts())
+            deps.append(f'placement_hosts:{",".join(sorted(h.hostname for h in hosts))}')
         return sorted(deps)
 
     def haproxy_generate_config(
@@ -165,6 +169,7 @@ class IngressService(CephService):
         if spec.monitor_password:
             password = spec.monitor_password
 
+        peer_hosts = {}
         if backend_spec.service_type == 'nfs':
             mode = 'tcp'
             # we need to get the nfs daemon with the highest rank_generation for
@@ -217,6 +222,18 @@ class IngressService(CephService):
                         'ip': '0.0.0.0',
                         'port': 0,
                     })
+            # Get peer hosts for haproxy active-active configuration using placement hosts
+            hosts = get_placement_hosts(
+                spec,
+                self.mgr.cache.get_schedulable_hosts(),
+                self.mgr.cache.get_draining_hosts()
+            )
+            if hosts:
+                for host in hosts:
+                    peer_ip = self.mgr.inventory.get_addr(host.hostname)
+                    peer_hosts[host.hostname] = peer_ip
+                logger.debug(f"HAProxy peer hosts for {spec.service_name()}: {peer_hosts}")
+
         else:
             mode = 'tcp' if spec.use_tcp_mode_over_rgw else 'http'
             servers = [
@@ -273,6 +290,7 @@ class IngressService(CephService):
                 'v4v6_flag': v4v6_flag,
                 'qat_support': spec.haproxy_qat_support,
                 'monitor_ssl_file': monitor_ssl_file,
+                'peer_hosts': peer_hosts,
             }
         )
         final_config: Dict[str, Any] = {
@@ -541,3 +559,24 @@ class IngressService(CephService):
             if not monitor_addr:
                 logger.debug(f"No IP address found in the network {spec.monitor_networks} on host {host}.")
         return monitor_addr, monitor_port
+
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        """Check if placement hosts have changed"""
+        def extract_hosts(deps: List[str]) -> List[str]:
+            for dep in deps:
+                if dep.startswith('placement_hosts:'):
+                    host_string = dep.split(':', 1)[1]
+                    return host_string.split(',') if host_string else []
+            return []
+
+        hosts = extract_hosts(deps)
+        current_hosts = get_placement_hosts(
+            spec,
+            self.mgr.cache.get_schedulable_hosts(),
+            self.mgr.cache.get_draining_hosts()
+        )
+        current_hosts = sorted(h.hostname for h in current_hosts)
+        if current_hosts != hosts:
+            logger.debug(f'Placement has changed for {spec.service_name()} from {hosts} -> {current_hosts}')
+            return True
+        return False
index 6e6c076dac6c7b3cf812504668cc8b7c3e006e2b..55cb1b4b69b65ae919012a7ddae3906cc4941d16 100644 (file)
@@ -78,6 +78,13 @@ frontend frontend
 {% endif %}
     default_backend backend
 
+{% if backend_spec.service_type == 'nfs' and peer_hosts %}
+peers haproxy_peers
+    {% for hostname, ip in peer_hosts.items() %}
+     peer {{ hostname }} {{ ip }}:1024
+    {% endfor %}
+
+{% endif %}
 backend backend
 {% if mode == 'http' %}
     option forwardfor
@@ -95,6 +102,11 @@ backend backend
 {% if mode == 'tcp' %}
     mode        tcp
     balance     roundrobin
+    {% if backend_spec.service_type == 'nfs' %}
+    stick-table type ip size 200k expire 30m peers haproxy_peers
+    stick on src
+    {% endif %}
+    hash-type   consistent
 {% if spec.use_tcp_mode_over_rgw %}
 {% if backend_spec.ssl %}
     option ssl-hello-chk
index f9e9a8bff99c00753fc13c6904c4f20dff39e236..b08b8e96acd2f9b9f13e0728a27832ed9af37b86 100644 (file)
@@ -2728,7 +2728,9 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=enable_haproxy_protocol,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                hosts=['host1'])
         )
 
         cephadm_module.spec_store._specs = {
@@ -2776,9 +2778,14 @@ class TestIngressService:
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '     peer host1 host1:1024\n\n'
             'backend backend\n'
             '    mode        tcp\n'
             '    balance     roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type   consistent\n'
         )
         if enable_haproxy_protocol:
             haproxy_txt += '    default-server send-proxy-v2\n'
@@ -3110,7 +3117,8 @@ class TestIngressService:
                                 monitor_password='12345',
                                 virtual_interface_networks=['1.2.3.0/24'],
                                 virtual_ip="1.2.3.4/32",
-                                use_tcp_mode_over_rgw=True)
+                                use_tcp_mode_over_rgw=True,
+                                enable_stats=True)
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the haproxy conf based on the specified spec
                 haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
@@ -3627,7 +3635,10 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
-            enable_stats=True
+            enable_stats=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
         )
 
         cephadm_module.spec_store._specs = {
@@ -3671,9 +3682,15 @@ class TestIngressService:
             '    bind 192.168.122.100:2049\n'
             '    option tcplog\n'
             '    default_backend backend\n\n'
+            'peers haproxy_peers\n'
+            '     peer host1 192.168.122.111:1024\n'
+            '     peer host2 192.168.122.222:1024\n\n'
             'backend backend\n'
             '    mode        tcp\n'
             '    balance     roundrobin\n'
+            '    stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+            '    stick on src\n'
+            '    hash-type   consistent\n'
             '    default-server send-proxy-v2\n'
             '    server nfs.foo.0 192.168.122.111:12049 check\n'
         )
@@ -3856,6 +3873,10 @@ class TestIngressService:
             monitor_password='12345',
             keepalived_password='12345',
             enable_haproxy_protocol=True,
+            placement=PlacementSpec(
+                count=1,
+                hosts=['host1', 'host2']),
+
         )
         cephadm_module.spec_store._specs = {
             'nfs.foo': nfs_service,