from .services.osd import RemoveUtil, OSDRemoval, OSDService
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
-from .schedule import HostAssignment
+from .schedule import HostAssignment, HostPlacementSpec
from .inventory import Inventory, SpecStore, HostCache
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
# host
return len(self.cache.networks[host].get(public_network, [])) > 0
- hosts = HostAssignment(
+ ha = HostAssignment(
spec=spec,
get_hosts_func=self._get_hosts,
get_daemons_func=self.cache.get_daemons_by_service,
filter_new_host=matches_network if daemon_type == 'mon' else None,
- ).place()
+ )
+
+ hosts: List[HostPlacementSpec] = ha.place()
+ self.log.debug('Usable hosts: %s' % hosts)
r = False
# add any?
did_config = False
- hosts_with_daemons = {d.hostname for d in daemons}
- self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
- for host, network, name in hosts:
- if host not in hosts_with_daemons:
- if not did_config and config_func:
- config_func(spec)
- did_config = True
- daemon_id = self.get_unique_name(daemon_type, host, daemons,
- prefix=spec.service_id,
- forcename=name)
- self.log.debug('Placing %s.%s on host %s' % (
- daemon_type, daemon_id, host))
- if daemon_type == 'mon':
- create_func(daemon_id, host, network) # type: ignore
- elif daemon_type in ['nfs', 'iscsi']:
- create_func(daemon_id, host, spec) # type: ignore
- else:
- create_func(daemon_id, host) # type: ignore
- # add to daemon list so next name(s) will also be unique
- sd = orchestrator.DaemonDescription(
- hostname=host,
- daemon_type=daemon_type,
- daemon_id=daemon_id,
- )
- daemons.append(sd)
- r = True
+ add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+ self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+ remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
+ for host, network, name in add_daemon_hosts:
+ if not did_config and config_func:
+ config_func(spec)
+ did_config = True
+ daemon_id = self.get_unique_name(daemon_type, host, daemons,
+ prefix=spec.service_id,
+ forcename=name)
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+ if daemon_type == 'mon':
+ create_func(daemon_id, host, network) # type: ignore
+ elif daemon_type in ['nfs', 'iscsi']:
+ create_func(daemon_id, host, spec) # type: ignore
+ else:
+ create_func(daemon_id, host) # type: ignore
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ )
+ daemons.append(sd)
+ r = True
# remove any?
- target_hosts = [h.hostname for h in hosts]
- for d in daemons:
- if d.hostname not in target_hosts:
- # NOTE: we are passing the 'force' flag here, which means
- # we can delete a mon instances data.
- self._remove_daemon(d.name(), d.hostname)
- r = True
+ for d in remove_daemon_hosts:
+ # NOTE: we are passing the 'force' flag here, which means
+            # we can delete a mon instance's data.
+ self._remove_daemon(d.name(), d.hostname)
+ r = True
return r
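
Taken together, the mgr-side flow now separates computing the target placement from acting on it. A condensed sketch of the new control flow (the create_func/config_func callbacks and self._remove_daemon stand in for the surrounding cephadm plumbing):

    ha = HostAssignment(
        spec=spec,
        get_hosts_func=self._get_hosts,
        get_daemons_func=self.cache.get_daemons_by_service,
    )
    hosts = ha.place()                         # the full target placement
    to_add = ha.add_daemon_hosts(hosts)        # placed, but no daemon yet
    to_remove = ha.remove_daemon_hosts(hosts)  # running a daemon, but no longer placed
    for host, network, name in to_add:
        ...                                    # create a daemon on <host>
    for d in to_remove:
        self._remove_daemon(d.name(), d.hostname)
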
import logging
import random
-from typing import List, Optional, Callable, Iterable, Tuple, TypeVar
+from typing import List, Optional, Callable, Iterable, Tuple, TypeVar, Set
import orchestrator
from ceph.deployment.service_spec import PlacementSpec, HostPlacementSpec, ServiceSpec
+from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
logger = logging.getLogger(__name__)
"""
Base Scheduler Interface
- * requires a placement_spec
+ * requires a ServiceSpec
    `place(host_pool)` needs to return a List[HostPlacementSpec]
"""
- def __init__(self, placement_spec):
- # type: (PlacementSpec) -> None
- self.placement_spec = placement_spec
+ def __init__(self, spec):
+ # type: (ServiceSpec) -> None
+ self.spec = spec
def place(self, host_pool, count=None):
# type: (List[T], Optional[int]) -> List[T]
1) Shuffle the provided host_pool
    2) Select from the list up to :count
"""
- def __init__(self, placement_spec):
- super(SimpleScheduler, self).__init__(placement_spec)
+ def __init__(self, spec):
+ super(SimpleScheduler, self).__init__(spec)
def place(self, host_pool, count=None):
# type: (List[T], Optional[int]) -> List[T]
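
The body of SimpleScheduler.place is elided in this hunk; the two documented steps amount to roughly the following (a sketch, not the verbatim implementation):

    host_pool = list(host_pool)
    if not host_pool:
        return []
    random.shuffle(host_pool)
    return host_pool[:count]  # slicing with count=None returns the whole list
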
):
assert spec and get_hosts_func and get_daemons_func
self.spec = spec # type: ServiceSpec
- self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
+ self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec)
self.get_hosts_func = get_hosts_func
self.filter_new_host = filter_new_host
self.service_name = spec.service_name()
def validate(self):
self.spec.validate()
+ if self.spec.placement.count == 0:
+ raise OrchestratorValidationError(
+ f'<count> can not be 0 for {self.spec.one_line_str()}')
+
if self.spec.placement.hosts:
explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func()))
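
With the new guard in validate(), an explicit count of 0 now fails fast instead of silently scheduling nothing. For example (hypothetical spec):

    spec = ServiceSpec('mgr', placement=PlacementSpec(count=0))
    HostAssignment(spec=spec, get_hosts_func=..., get_daemons_func=...).place()
    # raises OrchestratorValidationError('<count> can not be 0 for ...')
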
self.validate()
count = self.spec.placement.count
- assert count != 0
- chosen = self.get_candidates()
+ # get candidates based on [hosts, label, host_pattern]
+ candidates = self.get_candidates()
+
+        # If we don't have <count>, the list of candidates is definitive.
if count is None:
- logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
- return chosen
+ logger.debug('Provided hosts: %s' % candidates)
+ return candidates
+
+ # prefer hosts that already have services.
+ # this avoids re-assigning to _new_ hosts
+        # and constant re-distribution of daemons when new nodes are
+ # added to the cluster
+ hosts_with_daemons = self.hosts_with_daemons(candidates)
- # prefer hosts that already have services
- existing = self.hosts_with_daemons(chosen)
+        # The number of hosts that still need to be selected to fulfill <count>.
+ need = count - len(hosts_with_daemons)
- need = count - len(existing)
- others = difference_hostspecs(chosen, existing)
+        # hostspecs that do not have daemons on them but are still candidates.
+ others = difference_hostspecs(candidates, hosts_with_daemons)
+ # we don't need any additional hosts
if need < 0:
- return self.scheduler.place(existing, count)
+            # ask the scheduler to return a list of hosts, up to the value of <count>
+ return self.scheduler.place(hosts_with_daemons, count)
else:
+            # exclusive to 'mon' daemons: filter out hosts that don't have the public network assigned
if self.filter_new_host:
old = others
others = [h for h in others if self.filter_new_host(h.hostname)]
- logger.debug('filtered %s down to %s' % (old, chosen))
+                logger.debug('filtered %s down to %s' % (old, others))
+            # ask the scheduler to return a list of hosts, up to the value of <need>
others = self.scheduler.place(others, need)
logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
- existing, others))
- return list(merge_hostspecs(existing, others))
+ hosts_with_daemons, others))
+            # merge the hosts that already have the daemon with the newly
+            # scheduled hosts to get the final list of HostPlacementSpecs to deploy on.
+ return list(merge_hostspecs(hosts_with_daemons, others))
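
merge_hostspecs and difference_hostspecs are not part of this hunk; both key on the hostname. A minimal sketch of the assumed semantics:

    def merge_hostspecs(l: List[HostPlacementSpec],
                        r: List[HostPlacementSpec]) -> Iterable[HostPlacementSpec]:
        # union of l and r, de-duplicated by hostname, with l winning ties
        existing = {h.hostname for h in l}
        yield from l
        yield from (h for h in r if h.hostname not in existing)

    def difference_hostspecs(l: List[HostPlacementSpec],
                             r: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
        # all elements of l whose hostname does not appear in r
        r_names = {h.hostname for h in r}
        return [h for h in l if h.hostname not in r_names]
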
+
+    def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[HostPlacementSpec]:
+        # hosts in the target placement that do not yet run a daemon of this service
+ hosts_with_daemons = {d.hostname for d in self.daemons}
+ _add_daemon_hosts = set()
+ for host in host_pool:
+ if host.hostname not in hosts_with_daemons:
+ _add_daemon_hosts.add(host)
+ return _add_daemon_hosts
+
+    def remove_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[DaemonDescription]:
+        # existing daemons of this service whose host is no longer in the target placement
+ target_hosts = [h.hostname for h in host_pool]
+ _remove_daemon_hosts = set()
+ for d in self.daemons:
+ if d.hostname not in target_hosts:
+ _remove_daemon_hosts.add(d)
+ return _remove_daemon_hosts
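
For illustration, with a hypothetical host_pool of host1/host2 and existing daemons on host2 and host3, the two sets are disjoint by construction:

    # host_pool = [HostPlacementSpec('host1', '', ''), HostPlacementSpec('host2', '', '')]
    # self.daemons holds daemons on host2 and host3
    #
    # add_daemon_hosts(host_pool)    -> {HostPlacementSpec('host1', '', '')}  # placed, no daemon yet
    # remove_daemon_hosts(host_pool) -> {<the daemon on host3>}               # daemon, no longer placed
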
def get_candidates(self) -> List[HostPlacementSpec]:
if self.spec.placement.hosts:
HostPlacementSpec(x, '', '')
for x in self.spec.placement.filter_matching_hosts(self.get_hosts_func)
]
+        # If none of the above is set and there is also no <count>
if self.spec.placement.count is None:
raise OrchestratorValidationError("placement spec is empty: no hosts, no label, no pattern, no count")
        # backward compatibility: consider an empty placement to be the same as pattern '*'
for x in self.get_hosts_func()
]
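
The candidate sources mirror the PlacementSpec fields, checked in the order [hosts, label, host_pattern]; illustrative specs (hypothetical values):

    PlacementSpec(hosts=['node1', 'node2'])  # explicit hosts win
    PlacementSpec(label='mon')               # else: all hosts carrying the label
    PlacementSpec(host_pattern='node[1-3]')  # else: all hosts matching the pattern
    PlacementSpec(count=3)                   # else: every known host; <count> trims in place()
    PlacementSpec()                          # empty: backward compat, same as pattern '*'
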
- def hosts_with_daemons(self, chosen: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
+ def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
"""
        Prefer hosts with daemons. Otherwise we'd constantly reschedule daemons
        onto different hosts. This is about keeping daemons where they are,
        not about co-locating.
"""
- hosts = {d.hostname for d in self.daemons}
+ hosts_with_daemons = {d.hostname for d in self.daemons}
        # calc existing daemons (the subset of candidates that already run one)
- existing = [hs for hs in chosen if hs.hostname in hosts]
+ existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]
logger.debug('Hosts with existing daemons: {}'.format(existing))
return existing
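
For example, with a hypothetical daemon already running on host2:

    # candidates = [HostPlacementSpec('host1', '', ''),
    #               HostPlacementSpec('host2', '', ''),
    #               HostPlacementSpec('host3', '', '')]
    # self.daemons holds one daemon of this service, on host2
    #
    # hosts_with_daemons(candidates) == [HostPlacementSpec('host2', '', '')]
    # so with count=1, place() keeps the daemon on host2 instead of reshuffling it
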