from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_int
from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
-from ceph.deployment.utils import parse_combined_pem_file
+from ceph.deployment.utils import parse_combined_pem_file, validate_port, validate_unique_ports
from ceph.cephadm.d3n_types import D3NCacheSpec, D3NCacheError
from ceph.utils import is_hex
from ceph.smb import constants as smbconst
is_nfs_backend = bool(
self.backend_service and self.backend_service.split('.')[0] == 'nfs'
)
- if self.haproxy_peer_communication_port or is_nfs_backend:
+ if self.haproxy_peer_communication_port is not None or is_nfs_backend:
ports.append(cast(int, self.haproxy_peer_communication_port) or 1024)
return ports
raise SpecValidationError(
f'Cannot add ingress: Invalid health_check_interval specified. '
f'Valid units are: {valid_units}')
- if self.haproxy_peer_communication_port and self.backend_service.split('.')[0] != 'nfs':
+ if (
+ self.haproxy_peer_communication_port is not None
+ and self.backend_service.split('.')[0] != 'nfs'
+ ):
raise SpecValidationError(
'The haproxy_peer_communication_port is valid only for NFS backend.'
)
+ for port_val, fname in (
+ (self.frontend_port, 'frontend_port'),
+ (self.monitor_port, 'monitor_port'),
+ (self.haproxy_peer_communication_port, 'haproxy_peer_communication_port'),
+ ):
+ if port_val is not None:
+ validate_port(port_val, fname)
+ validate_unique_ports(self.get_port_start())
+
# validate SSL parametes
if self.monitor_ssl:
if not self.ssl:
def validate(self) -> None:
super(MgmtGatewaySpec, self).validate()
- self._validate_port(self.port)
+ validate_port(self.port, 'port')
self._validate_certificate(self.ssl_cert, "ssl_cert")
self._validate_private_key(self.ssl_key, "ssl_key")
self._validate_boolean_switch(self.ssl_prefer_server_ciphers, "ssl_prefer_server_ciphers")
self._validate_boolean_switch(self.ssl_stapling_verify, "ssl_stapling_verify")
self._validate_ssl_protocols(self.ssl_protocols)
- def _validate_port(self, port: Optional[int]) -> None:
- if port is not None and not (1 <= port <= 65535):
- raise SpecValidationError(f"Invalid port: {port}. Must be between 1 and 65535.")
-
def _validate_certificate(self, cert: Optional[str], name: str) -> None:
if cert is not None and not isinstance(cert, str):
raise SpecValidationError(f"Invalid {name}. Must be a string.")
import ipaddress
import socket
-from typing import Tuple, Optional, Any
+from typing import Tuple, Optional, Any, List
from urllib.parse import urlparse
from ceph.deployment.hostspec import SpecValidationError
from numbers import Number
if field.lower() not in allowed_lower:
raise SpecValidationError(
f'Invalid {field_name}. Valid values are: {", ".join(allowed)}')
+
+
+def validate_port(port: Optional[int], field_name: str = 'port') -> None:
+ if port is not None and not (1 <= port <= 65535):
+ raise SpecValidationError(
+ f'Invalid {field_name}: {port}. Must be between 1 and 65535.'
+ )
+
+
+def validate_unique_ports(ports: List[int]) -> None:
+ """Raise SpecValidationError if any port is used more than once"""
+ if len(ports) != len(set(ports)):
+ raise SpecValidationError(
+ 'Invalid port: Duplicate ports are not allowed'
+ )