from typing import Optional, List, Any, Dict
+def normalize_hostname(hostname: str) -> str:
+ """Normalize hostname to lowercase for case-insensitive matching."""
+ return hostname.lower()
+
+
def assert_valid_host(name: str) -> None:
p = re.compile('^[a-zA-Z0-9-]+$')
if len(name) > 250:
self.service_type = 'host'
#: the bare hostname on the host. Not the FQDN.
- self.hostname = hostname # type: str
+ self.hostname = normalize_hostname(hostname)
#: DNS name or IP address to reach it
- self.addr = addr or hostname # type: str
+ self.addr = addr or normalize_hostname(hostname)
#: label(s), if any
- self.labels = labels or [] # type: List[str]
+ self.labels = labels or []
#: human readable status
- self.status = status or '' # type: str
+ self.status = status or ''
self.location = location
@staticmethod
def normalize_json(host_spec: dict) -> dict:
+ if 'hostname' in host_spec:
+ host_spec['hostname'] = normalize_hostname(host_spec['hostname'])
labels = host_spec.get('labels')
if labels is not None:
if isinstance(labels, str):
import yaml
-from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host
+from ceph.deployment.hostspec import HostSpec, SpecValidationError, normalize_hostname, assert_valid_host
from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_int
from ceph.deployment.utils import verify_positive_int, verify_non_negative_number
from ceph.deployment.utils import verify_boolean, verify_enum, verify_int
network: str
name: str
+ @classmethod
+ def normalized(cls, hostname: str, network: str = '', name: str = '') -> 'HostPlacementSpec':
+ """Create a HostPlacementSpec with normalized hostname."""
+ return cls(normalize_hostname(hostname), network, name)
+
def __str__(self) -> str:
res = ''
res += self.hostname
def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec':
if isinstance(data, str):
return cls.parse(data)
+ # Use normalized() for consistent lowercasing
+ if isinstance(data, dict):
+ return cls.normalized(
+ data.get('hostname', ''),
+ data.get('network', ''),
+ data.get('name', '')
+ )
return cls(**data)
def to_json(self) -> str:
match_host = re.search(host_re, host)
if match_host:
- host_spec = host_spec._replace(hostname=match_host.group(1))
+ # Lowercase for case-insensitive matching
+ host_spec = host_spec._replace(hostname=normalize_hostname(match_host.group(1)))
name_match = re.search(name_re, host)
if name_match:
self.pattern_type: PatternType = pattern_type
self.compiled_regex = None
if self.pattern_type == PatternType.regex and self.pattern:
- self.compiled_regex = re.compile(self.pattern)
+ self.compiled_regex = re.compile(self.pattern, re.IGNORECASE)
def filter_hosts(self, hosts: List[str]) -> List[str]:
if not self.pattern:
return []
if not self.pattern_type or self.pattern_type == PatternType.fnmatch:
- return fnmatch.filter(hosts, self.pattern)
+ # Case-insensitive fnmatch comparison
+ pattern_lower = self.pattern.lower()
+ return [h for h in hosts if fnmatch.fnmatch(h.lower(), pattern_lower)]
elif self.pattern_type == PatternType.regex:
if not self.compiled_regex:
- self.compiled_regex = re.compile(self.pattern)
+ self.compiled_regex = re.compile(self.pattern, re.IGNORECASE)
return [h for h in hosts if re.match(self.compiled_regex, h)]
raise SpecValidationError(f'Got unexpected pattern_type: {self.pattern_type}')
def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None:
# To backpopulate the .hosts attribute when using labels or count
# in the orchestrator backend.
- if all([isinstance(host, HostPlacementSpec) for host in hosts]):
- self.hosts = hosts # type: ignore
+ if all(isinstance(host, HostPlacementSpec) for host in hosts):
+ # Type narrowing: all items are HostPlacementSpec
+ placement_hosts = cast(List[HostPlacementSpec], hosts)
+ self.hosts = [
+ HostPlacementSpec.normalized(h.hostname, h.network, h.name)
+ for h in placement_hosts
+ ]
else:
self.hosts = [HostPlacementSpec.parse(x, require_network=False) # type: ignore
for x in hosts if x]