import hashlib
import logging
import random
-from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict
+from typing import (
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ NamedTuple,
+ Optional,
+ Protocol,
+ Tuple,
+ TypeVar,
+)
import orchestrator
from ceph.deployment.service_spec import ServiceSpec
return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id
+class HostSelector(Protocol):
+ def filter_host_candidates(
+ self,
+ spec: ServiceSpec,
+ candidates: Iterable[DaemonPlacement],
+ ) -> List[DaemonPlacement]:
+ "Filter candidates for host/ip matching."
+
+
class HostAssignment(object):
def __init__(self,
per_host_daemon_type: Optional[str] = None,
rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
blocking_daemon_hosts: Optional[List[orchestrator.HostSpec]] = None,
- upgrade_in_progress: bool = False
+ upgrade_in_progress: bool = False,
+ host_selector: Optional[HostSelector] = None,
):
assert spec
self.spec = spec # type: ServiceSpec
self.ports_start = spec.get_port_start()
self.rank_map = rank_map
self.upgrade_in_progress = upgrade_in_progress
+ self.host_selector = host_selector
def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
return [h for h in self.hosts if label in h.labels]
"placement spec is empty: no hosts, no label, no pattern, no count")
# allocate an IP?
- if self.spec.networks or self.spec.ip_addrs:
+ if self.host_selector:
+ ls = self.host_selector.filter_host_candidates(self.spec, ls)
+ elif self.spec.networks or self.spec.ip_addrs:
orig = ls.copy()
ls = []
for p in orig: