spec = HostAssignment(
spec=spec,
get_hosts_func=self._get_hosts,
- service_type=daemon_type).load()
+ get_daemons_func=self.cache.get_daemons_by_service).load()
if len(daemons) > spec.placement.count:
# remove some
to_remove = len(daemons) - spec.placement.count
# type: (List, Optional[int]) -> List[HostPlacementSpec]
if not host_pool:
raise Exception('List of host candidates is empty')
- host_pool = [HostPlacementSpec(x, '', '') for x in host_pool]
+ host_pool = [x for x in host_pool]
# shuffle for pseudo random selection
random.shuffle(host_pool)
return host_pool[:count]
"""
def __init__(self,
- spec=None, # type: Optional[orchestrator.ServiceSpec]
+ spec, # type: orchestrator.ServiceSpec
+ get_hosts_func, # type: Callable[[Optional[str]],List[str]]
+ get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
+
scheduler=None, # type: Optional[BaseScheduler]
- get_hosts_func=None, # type: Optional[Callable[[Optional[str]],List[str]]]
- service_type=None, # type: Optional[str]
):
- assert spec and get_hosts_func and service_type
+ assert spec and get_hosts_func and get_daemons_func
self.spec = spec # type: orchestrator.ServiceSpec
self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
self.get_hosts_func = get_hosts_func
- self.daemon_type = service_type
+ self.get_daemons_func = get_daemons_func
+ self.service_name = spec.service_name()
def load(self):
# type: () -> orchestrator.ServiceSpec
"""
Load hosts into the spec.placement.hosts container.
"""
- self.load_labeled_hosts()
- self.assign_hosts()
+ # respect any explicit host list
+ if self.spec.placement.hosts and not self.spec.placement.count:
+ logger.info('Provided hosts: %s' % self.spec.placement.hosts)
+ if not self.spec.placement.count:
+ self.spec.placement.count = len(self.spec.placement.hosts)
+ return self.spec
+
+ # respect all_hosts=true
+ if self.spec.placement.all_hosts:
+ candidates = [
+ HostPlacementSpec(x, '', '')
+ for x in self.get_hosts_func(None)
+ ]
+ logger.info('All hosts: {}'.format(candidates))
+ self.spec.placement.set_hosts(candidates)
+ self.spec.placement.count = len(self.spec.placement.hosts)
+ return self.spec
+
+ if self.spec.placement.hosts and \
+ self.spec.placement.count and \
+ len(self.spec.placement.hosts) >= self.spec.placement.count:
+ # use the provided hosts as our candidates
+ hosts = self.spec.placement.hosts
+ else:
+ # select candidate hosts based on labels, types, etc.
+ hosts = self.pick_candidates()
+
+ if not self.spec.placement.count:
+ logger.info('Labeled hosts: {}'.format(hosts))
+ self.spec.placement.set_hosts(hosts)
+ if not self.spec.placement.count:
+ self.spec.placement.count = len(self.spec.placement.hosts)
+ return self.spec
+
+ # we need to select a subset of the candidates
+
+ # if a partial host list is provided, always start with that
+ if len(self.spec.placement.hosts) < self.spec.placement.count:
+ chosen = self.spec.placement.hosts
+ else:
+ chosen = []
+
+ # prefer hosts that already have services
+ daemons = self.get_daemons_func(self.service_name)
+ hosts_with_daemons = {d.hostname for d in daemons}
+ existing = [hs for hs in hosts
+ if hs.hostname in hosts_with_daemons]
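+ # if the explicitly requested hosts plus the hosts that already run this
+ # service satisfy the count, fill the remaining slots from the latter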
+ if len(chosen + existing) >= self.spec.placement.count:
+ chosen = chosen + self.scheduler.place(
+ existing,
+ self.spec.placement.count - len(chosen))
+ logger.info('Hosts with existing daemons: {}'.format(chosen))
+ self.spec.placement.set_hosts(chosen)
+ if not self.spec.placement.count:
+ self.spec.placement.count = len(self.spec.placement.hosts)
+ return self.spec
+
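+ # otherwise, fill the remaining slots from candidate hosts that do not
+ # yet run a daemon for this service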
+ need = self.spec.placement.count - len(existing + chosen)
+ others = [hs for hs in hosts
+ if hs.hostname not in hosts_with_daemons]
+ chosen = chosen + self.scheduler.place(others, need)
+ self.spec.placement.set_hosts(existing + chosen)
+ logger.info('Combine hosts with existing daemons %s + new hosts %s' % (
+ existing, chosen))
+ if not self.spec.placement.count:
+ self.spec.placement.count = len(self.spec.placement.hosts)
return self.spec
- def load_labeled_hosts(self):
- # type: () -> None
+ def pick_candidates(self):
+ # type: () -> List[orchestrator.HostPlacementSpec]
"""
- Assign hosts based on their label
+ Select candidate hosts for this service, preferring an explicit
+ placement label, then a service-name label, then a service-type label.
"""
+ # is there an explicit label?
if self.spec.placement.label:
- logger.info("Matching label '%s'" % self.spec.placement.label)
candidates = [
HostPlacementSpec(x, '', '')
for x in self.get_hosts_func(self.spec.placement.label)
]
- logger.info('Assigning hostss to spec: {}'.format(candidates))
- self.spec.placement.set_hosts(candidates)
-
- def assign_hosts(self):
- # type: () -> None
- """
- Use the assigned scheduler to load hosts into the spec.placement.hosts container
- """
- # If no imperative or declarative host assignments, use the scheduler to pick from the
- # host pool (assuming `count` is set)
- if not self.spec.placement.label and not self.spec.placement.hosts and self.spec.placement.count:
- logger.info("Found num spec. Looking for labeled hosts.")
- # TODO: actually query for labels (self.daemon_type)
- candidates = self.scheduler.place([x for x in self.get_hosts_func(None)],
- count=self.spec.placement.count)
- # Not enough hosts to deploy on
- if len(candidates) != self.spec.placement.count:
- logger.warning("Did not find enough labeled hosts to \
- scale to <{}> services. Falling back to unlabeled hosts.".
- format(self.spec.placement.count))
+ logger.info('Candidate hosts with label %s: %s' % (
+ self.spec.placement.label, candidates))
+ return candidates
+
+ # otherwise, start with hosts labeled for this service or type
+ candidates = [
+ HostPlacementSpec(x, '', '')
+ for x in self.get_hosts_func(self.service_name)
+ ]
+ if candidates:
+ logger.info('Candidate hosts with service label %s: %s' % (
+ self.service_name, candidates))
+ return candidates
+
+ # type label (e.g., mds)
+ candidates = [
+ HostPlacementSpec(x, '', '')
+ for x in self.get_hosts_func(self.spec.service_type)
+ ]
+ if candidates:
+ if self.spec.placement.count and \
+ len(candidates) < self.spec.placement.count:
+ # NOTE: Hmm, is this really the behavior we want?
+ logger.warning("Did not find enough labeled hosts to "
+ "scale to <{}> services. Falling back to "
+ "unlabeled hosts.".format(
+ self.spec.placement.count))
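+ # not enough labeled hosts; fall through to the all-hosts fallback below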
else:
- logger.info('Assigning hosts to spec: {}'.format(candidates))
- self.spec.placement.set_hosts(candidates)
- return None
-
- candidates = self.scheduler.place([x for x in self.get_hosts_func(None)], count=self.spec.placement.count)
- # Not enough hosts to deploy on
- if len(candidates) != self.spec.placement.count:
- raise OrchestratorValidationError("Cannot place {} daemons on {} hosts.".
- format(self.spec.placement.count, len(candidates)))
-
- logger.info('Assigning hosts to spec: {}'.format(candidates))
- self.spec.placement.set_hosts(candidates)
- return None
+ logger.info('Candidate hosts with service type label: %s' % (
+ candidates))
+ return candidates
+
+ # fall back to *all* hosts?
+ if not self.spec.placement.count:
+ raise OrchestratorValidationError(
+ "Cannot place service %s without any placement information" % (
+ self.service_name))
+
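+ # either no label matched or there were too few labeled hosts; treat every
+ # known host as a candidate and let load() trim to the requested count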
+ candidates = [
+ HostPlacementSpec(x, '', '')
+ for x in self.get_hosts_func(None)
+ ]
+ logger.info('Candidate hosts (all): {}'.format(candidates))
+ return candidates
--- /dev/null
+from typing import NamedTuple, List
+import pytest
+
+from cephadm.module import HostAssignment
+from orchestrator import ServiceSpec, PlacementSpec, DaemonDescription, OrchestratorValidationError
+
+
+class NodeAssignmentTest(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected: List[str]
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected",
+ [
+ # just hosts
+ NodeAssignmentTest(
+ 'mon',
+ PlacementSpec(hosts=['smithi060:[v2:172.21.15.60:3301,v1:172.21.15.60:6790]=c']),
+ ['smithi060'],
+ [],
+ ['smithi060']
+ ),
+ # all_hosts
+ NodeAssignmentTest(
+ 'mon',
+ PlacementSpec(all_hosts=True),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mon', 'a', 'host1'),
+ DaemonDescription('mon', 'b', 'host2'),
+ ],
+ ['host1', 'host2', 'host3']
+ ),
+ # count + partial host list
+ NodeAssignmentTest(
+ 'mon',
+ PlacementSpec(count=3, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mon', 'a', 'host1'),
+ DaemonDescription('mon', 'b', 'host2'),
+ ],
+ ['host1', 'host2', 'host3']
+ ),
+ # count + partial host list + existing
+ NodeAssignmentTest(
+ 'mon',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mon', 'a', 'host1'),
+ ],
+ ['host1', 'host3']
+ ),
+ # label only
+ NodeAssignmentTest(
+ 'mon',
+ PlacementSpec(label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host1', 'host2', 'host3']
+ ),
+ ])
+def test_node_assignment(service_type, placement, hosts, daemons, expected):
+ s = HostAssignment(spec=ServiceSpec(service_type, placement=placement),
+ get_hosts_func=lambda _: hosts,
+ get_daemons_func=lambda _: daemons).load()
+ assert sorted([h.hostname for h in s.placement.hosts]) == sorted(expected)
+
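+# the scheduler shuffles candidates, so these cases only check how many
+# hosts end up selected (expected_len) and that each one comes from in_set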
+class NodeAssignmentTest2(NamedTuple):
+ service_type: str
+ placement: PlacementSpec
+ hosts: List[str]
+ daemons: List[DaemonDescription]
+ expected_len: int
+ in_set: List[str]
+
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,in_set",
+ [
+ # just count
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=1),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host1', 'host2', 'host3'],
+ ),
+
+ # hosts + (smaller) count
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=1, hosts='host1 host2'.split()),
+ 'host1 host2'.split(),
+ [],
+ 1,
+ ['host1', 'host2'],
+ ),
+ # hosts + (smaller) count, existing
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=1, hosts='host1 host2 host3'.split()),
+ 'host1 host2 host3'.split(),
+ [DaemonDescription('mon', 'mon.a', 'host1'),],
+ 1,
+ ['host1', 'host2', 'host3'],
+ ),
+ # hosts + (smaller) count, (more) existing
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=1, hosts='host1 host2 host3'.split()),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mon', 'a', 'host1'),
+ DaemonDescription('mon', 'b', 'host2'),
+ ],
+ 1,
+ ['host1', 'host2']
+ ),
+ # count + partial host list
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [],
+ 2,
+ ['host1', 'host2', 'host3']
+ ),
+ # label + count
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=1, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ 1,
+ ['host1', 'host2', 'host3']
+ ),
+ ])
+def test_node_assignment2(service_type, placement, hosts,
+ daemons, expected_len, in_set):
+ s = HostAssignment(spec=ServiceSpec(service_type, placement=placement),
+ get_hosts_func=lambda _: hosts,
+ get_daemons_func=lambda _: daemons).load()
+ assert len(s.placement.hosts) == expected_len
+ for h in [h.hostname for h in s.placement.hosts]:
+ assert h in in_set
+
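+# these reuse NodeAssignmentTest2, but here the final field lists hosts that
+# must be present in the result no matter which other hosts are chosen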
+@pytest.mark.parametrize("service_type,placement,hosts,daemons,expected_len,must_have",
+ [
+ # count larger than the partial host list
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=3, hosts='host3'.split()),
+ 'host1 host2 host3'.split(),
+ [],
+ 3,
+ ['host3']
+ ),
+ # count + partial host list
+ NodeAssignmentTest2(
+ 'mon',
+ PlacementSpec(count=2, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [],
+ 2,
+ ['host3']
+ ),
+ ])
+def test_node_assignment3(service_type, placement, hosts,
+ daemons, expected_len, must_have):
+ s = HostAssignment(spec=ServiceSpec(service_type, placement=placement),
+ get_hosts_func=lambda _: hosts,
+ get_daemons_func=lambda _: daemons).load()
+ assert len(s.placement.hosts) == expected_len
+ for h in must_have:
+ assert h in [h.hostname for h in s.placement.hosts]
+
+
+@pytest.mark.parametrize("placement",
+ [
+ ('1 all:true'),
+ ('all:true label:foo'),
+ ('all:true host1 host2'),
+ ])
+def test_bad_placements(placement):
+ with pytest.raises(OrchestratorValidationError):
+ PlacementSpec.from_strings(placement.split(' '))