)
import orchestrator
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
T = TypeVar('T')
+def get_placement_hosts(
+ spec: ServiceSpec,
+ hosts: List[orchestrator.HostSpec],
+ draining_hosts: List[orchestrator.HostSpec]
+) -> List[HostPlacementSpec]:
+ """
+ Get the list of candidate host placement specs based on placement specifications.
+ Args:
+ spec: The service specification
+ hosts: List of available hosts
+ draining_hosts: List of hosts that are draining
+ Returns:
+ List[HostPlacementSpec]: List of host placement specs that match the placement criteria
+ """
+ if spec.placement.hosts:
+ host_specs = [
+ h for h in spec.placement.hosts
+ if h.hostname not in [dh.hostname for dh in draining_hosts]
+ ]
+ elif spec.placement.label:
+ labeled_hosts = [h for h in hosts if spec.placement.label in h.labels]
+ host_specs = [
+ HostPlacementSpec(hostname=x.hostname, network='', name='')
+ for x in labeled_hosts
+ ]
+ if spec.placement.host_pattern:
+ matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+ host_specs = [h for h in host_specs if h.hostname in matching_hostnames]
+ elif spec.placement.host_pattern:
+ matching_hostnames = spec.placement.filter_matching_hostspecs(hosts)
+ host_specs = [
+ HostPlacementSpec(hostname=hostname, network='', name='')
+ for hostname in matching_hostnames
+ ]
+ elif (
+ spec.placement.count is not None
+ or spec.placement.count_per_host is not None
+ ):
+ host_specs = [
+ HostPlacementSpec(hostname=x.hostname, network='', name='')
+ for x in hosts
+ ]
+ else:
+ raise OrchestratorValidationError(
+ "placement spec is empty: no hosts, no label, no pattern, no count")
+ return host_specs
+
+
class DaemonPlacement(NamedTuple):
daemon_type: str
hostname: str
return None
def get_candidates(self) -> List[DaemonPlacement]:
- if self.spec.placement.hosts:
- ls = [
- DaemonPlacement(daemon_type=self.primary_daemon_type,
- hostname=h.hostname, network=h.network, name=h.name,
- ports=self.ports_start)
- for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
- ]
- elif self.spec.placement.label:
- ls = [
- DaemonPlacement(daemon_type=self.primary_daemon_type,
- hostname=x.hostname, ports=self.ports_start)
- for x in self.hosts_by_label(self.spec.placement.label)
- ]
- if self.spec.placement.host_pattern:
- ls = [h for h in ls if h.hostname in self.spec.placement.filter_matching_hostspecs(self.hosts)]
- elif self.spec.placement.host_pattern:
- ls = [
- DaemonPlacement(daemon_type=self.primary_daemon_type,
- hostname=x, ports=self.ports_start)
- for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
- ]
- elif (
- self.spec.placement.count is not None
- or self.spec.placement.count_per_host is not None
- ):
- ls = [
- DaemonPlacement(daemon_type=self.primary_daemon_type,
- hostname=x.hostname, ports=self.ports_start)
- for x in self.hosts
- ]
- else:
- raise OrchestratorValidationError(
- "placement spec is empty: no hosts, no label, no pattern, no count")
+ host_specs = get_placement_hosts(self.spec, self.hosts, self.draining_hosts)
+
+ ls = [
+ DaemonPlacement(daemon_type=self.primary_daemon_type,
+ hostname=h.hostname,
+ network=h.network,
+ name=h.name,
+ ports=self.ports_start)
+ for h in host_specs
+ ]
# allocate an IP?
if self.host_selector:
only_kmip_updated = all(s.startswith('kmip') for s in list(sym_diff))
if not only_kmip_updated:
action = 'redeploy'
+ elif dd.daemon_type == 'haproxy':
+ if spec and hasattr(spec, 'backend_service'):
+ backend_spec = self.mgr.spec_store[spec.backend_service].spec
+ if backend_spec.service_type == 'nfs':
+ svc = service_registry.get_service('ingress')
+ if svc.has_placement_changed(deps, spec):
+ self.log.debug(f'Redeploy {spec.service_name()} as placement has changed')
+ action = 'redeploy'
elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
self.log.debug(
f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
return []
+    def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+        """Return True if the service's placement hosts differ from those recorded in deps.
+
+        Base implementation: placement changes are not tracked for most
+        services, so this always returns False. Services that need a redeploy
+        on placement changes (e.g. ingress/haproxy) override this.
+        """
+        return False
+
class CephService(CephadmService):
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
from .service_registry import register_cephadm_service
from cephadm.tlsobject_types import TLSCredentials
+from cephadm.schedule import get_placement_hosts
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
if ssl_cert_key:
assert isinstance(ssl_cert_key, str)
deps.append(f'ssl-cert-key:{str(utils.md5_hash(ssl_cert_key))}')
-
+ backend_spec = mgr.spec_store[ingress_spec.backend_service].spec
+ if backend_spec.service_type == 'nfs':
+ hosts = get_placement_hosts(spec, mgr.cache.get_schedulable_hosts(), mgr.cache.get_draining_hosts())
+ deps.append(f'placement_hosts:{",".join(sorted(h.hostname for h in hosts))}')
return sorted(deps)
def haproxy_generate_config(
if spec.monitor_password:
password = spec.monitor_password
+ peer_hosts = {}
if backend_spec.service_type == 'nfs':
mode = 'tcp'
# we need to get the nfs daemon with the highest rank_generation for
'ip': '0.0.0.0',
'port': 0,
})
+ # Get peer hosts for haproxy active-active configuration using placement hosts
+ hosts = get_placement_hosts(
+ spec,
+ self.mgr.cache.get_schedulable_hosts(),
+ self.mgr.cache.get_draining_hosts()
+ )
+ if hosts:
+ for host in hosts:
+ peer_ip = self.mgr.inventory.get_addr(host.hostname)
+ peer_hosts[host.hostname] = peer_ip
+ logger.debug(f"HAProxy peer hosts for {spec.service_name()}: {peer_hosts}")
+
else:
mode = 'tcp' if spec.use_tcp_mode_over_rgw else 'http'
servers = [
'health_check_interval': spec.health_check_interval or '2s',
'v4v6_flag': v4v6_flag,
'monitor_ssl_file': monitor_ssl_file,
+ 'peer_hosts': peer_hosts,
}
)
config_files = {
if not monitor_addr:
logger.debug(f"No IP address found in the network {spec.monitor_networks} on host {host}.")
return monitor_addr, monitor_port
+
+ def has_placement_changed(self, deps: List[str], spec: ServiceSpec) -> bool:
+ """Check if placement hosts have changed"""
+ def extract_hosts(deps: List[str]) -> List[str]:
+ for dep in deps:
+ if dep.startswith('placement_hosts:'):
+ host_string = dep.split(':', 1)[1]
+ return host_string.split(',') if host_string else []
+ return []
+
+ hosts = extract_hosts(deps)
+ current_hosts = get_placement_hosts(
+ spec,
+ self.mgr.cache.get_schedulable_hosts(),
+ self.mgr.cache.get_draining_hosts()
+ )
+ current_hosts = sorted(h.hostname for h in current_hosts)
+ if current_hosts != hosts:
+ logger.debug(f'Placement has changed for {spec.service_name()} from {hosts} -> {current_hosts}')
+ return True
+ return False
{% endif %}
default_backend backend
+{% if backend_spec.service_type == 'nfs' and peer_hosts %}
+peers haproxy_peers
+ {% for hostname, ip in peer_hosts.items() %}
+ peer {{ hostname }} {{ ip }}:1024
+ {% endfor %}
+
+{% endif %}
backend backend
{% if mode == 'http' %}
option forwardfor
{% if mode == 'tcp' %}
mode tcp
balance roundrobin
+ {% if backend_spec.service_type == 'nfs' %}
+ stick-table type ip size 200k expire 30m peers haproxy_peers
+ stick on src
+ {% endif %}
+ hash-type consistent
{% if spec.use_tcp_mode_over_rgw %}
{% if backend_spec.ssl %}
option ssl-hello-chk
monitor_password='12345',
keepalived_password='12345',
enable_haproxy_protocol=enable_haproxy_protocol,
- enable_stats=True
+ enable_stats=True,
+ placement=PlacementSpec(
+ hosts=['host1'])
)
cephadm_module.spec_store._specs = {
' bind 192.168.122.100:2049\n'
' option tcplog\n'
' default_backend backend\n\n'
+ 'peers haproxy_peers\n'
+ ' peer host1 host1:1024\n\n'
'backend backend\n'
' mode tcp\n'
' balance roundrobin\n'
+ ' stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+ ' stick on src\n'
+ ' hash-type consistent\n'
)
if enable_haproxy_protocol:
haproxy_txt += ' default-server send-proxy-v2\n'
monitor_password='12345',
virtual_interface_networks=['1.2.3.0/24'],
virtual_ip="1.2.3.4/32",
- use_tcp_mode_over_rgw=True)
+ use_tcp_mode_over_rgw=True,
+ enable_stats=True)
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# generate the haproxy conf based on the specified spec
haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
monitor_password='12345',
keepalived_password='12345',
enable_haproxy_protocol=True,
- enable_stats=True
+ enable_stats=True,
+ placement=PlacementSpec(
+ count=1,
+ hosts=['host1', 'host2']),
)
cephadm_module.spec_store._specs = {
' bind 192.168.122.100:2049\n'
' option tcplog\n'
' default_backend backend\n\n'
+ 'peers haproxy_peers\n'
+ ' peer host1 192.168.122.111:1024\n'
+ ' peer host2 192.168.122.222:1024\n\n'
'backend backend\n'
' mode tcp\n'
' balance roundrobin\n'
+ ' stick-table type ip size 200k expire 30m peers haproxy_peers\n'
+ ' stick on src\n'
+ ' hash-type consistent\n'
' default-server send-proxy-v2\n'
' server nfs.foo.0 192.168.122.111:12049 check\n'
)
monitor_password='12345',
keepalived_password='12345',
enable_haproxy_protocol=True,
+ placement=PlacementSpec(
+ count=1,
+ hosts=['host1', 'host2']),
+
)
cephadm_module.spec_store._specs = {
'nfs.foo': nfs_service,