# get candidate hosts based on [hosts, label, host_pattern]
candidates = self.get_candidates() # type: List[HostPlacementSpec]
+ if not candidates:
+ return []  # no candidates, nothing to place
# If we don't have <count>, the list of candidates is definitive.
if count is None:
    return candidates
- # prefer hosts that already have services.
- # this avoids re-assigning to _new_ hosts
- # and constant re-distribution of hosts when new nodes are
- # added to the cluster
- hosts_with_daemons = self.hosts_with_daemons(candidates)
-
- # The amount of hosts that need to be selected in order to fulfill count.
- need = count - len(hosts_with_daemons)
-
- # hostspecs that do not have daemons on them but are still candidates.
- others = difference_hostspecs(candidates, hosts_with_daemons)
-
- # we don't need any additional hosts
- if need < 0:
- final_candidates = self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
- else:
- # ask the scheduler to return a set of hosts with a up to the value of <count>
- others = self.scheduler.place(others, need)
- logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
- hosts_with_daemons, others))
- # if a host already has the anticipated daemon, merge it with the candidates
- # to get a list of HostPlacementSpec that can be deployed on.
- final_candidates = list(merge_hostspecs(hosts_with_daemons, others))
-
- return final_candidates
-
- def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
- active_hosts: List['HostPlacementSpec'] = []
- for daemon in self.daemons:
- if daemon.is_active:
- for h in hosts:
- if h.hostname == daemon.hostname:
- active_hosts.append(h)
- # remove duplicates before returning
- return list(dict.fromkeys(active_hosts))
-
- def prefer_hosts_with_active_daemons(self, hosts: List[HostPlacementSpec], count: int) -> List[HostPlacementSpec]:
- # try to prefer host with active daemon if possible
- active_hosts = self.get_hosts_with_active_daemon(hosts)
- if len(active_hosts) != 0 and count > 0:
- for host in active_hosts:
- hosts.remove(host)
- if len(active_hosts) >= count:
- return self.scheduler.place(active_hosts, count)
- else:
- return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
- self.scheduler.place(hosts, count - len(active_hosts))))
- # ask the scheduler to return a set of hosts with a up to the value of <count>
- return self.scheduler.place(hosts, count)
+ if self.allow_colo:
+ # consider enough slots to fit target count
+ per_host = 1 + ((count - 1) // len(candidates))
+ candidates = candidates * per_host
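+ # e.g. count=5 over 3 candidates -> per_host=2 -> 6 slots; any
+ # surplus slot simply goes unselected once <need> is computed below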
+
+ # consider active daemons first
+ daemons = [
+ d for d in self.daemons if d.is_active
+ ] + [
+ d for d in self.daemons if not d.is_active
+ ]
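+ # putting active daemons first means the truncation below
+ # (del existing[count:]) keeps their slots in preference to standby ones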
+
+ # sort candidates into existing/used slots that already have a
+ # daemon, and others (the rest)
+ existing_active: List[HostPlacementSpec] = []
+ existing_standby: List[HostPlacementSpec] = []
+ existing: List[HostPlacementSpec] = []
+ others = list(candidates)  # work on a copy; slots are consumed from it below
+ for d in daemons:
+ hs = d.get_host_placement()
+ for i in others:
+ if i == hs:
+ others.remove(i)
+ if d.is_active:
+ existing_active.append(hs)
+ else:
+ existing_standby.append(hs)
+ existing.append(hs)
+ break
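+ # note the break: each daemon consumes at most one slot, so with
+ # colo the remaining duplicate slots for a host stay available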
+ logger.debug('Hosts with existing active daemons: {}'.format(existing_active))
+ logger.debug('Hosts with existing standby daemons: {}'.format(existing_standby))
+
+ # The number of new slots that need to be selected in order to fulfill count
+ need = count - len(existing)
+
+ # we don't need any additional placements
+ if need <= 0:
+ del existing[count:]
+ return existing
+
+ # ask the scheduler to select additional slots
+ chosen = self.scheduler.place(others, need)
+ logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
+ existing, chosen))
+ return existing + chosen
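For reference, here is a minimal standalone sketch of the slot logic above. The `Slot` type, the free-standing `place()` function, and the "take the first <need> slots" stand-in for `self.scheduler.place()` are all hypothetical simplifications, not the cephadm implementation:

```python
from collections import namedtuple
from typing import List

Slot = namedtuple('Slot', ['hostname'])

def place(candidates: List[Slot], count: int, daemon_hosts: List[str],
          allow_colo: bool = False) -> List[Slot]:
    if allow_colo:
        # same ceiling division as above: enough slots to fit <count>
        per_host = 1 + ((count - 1) // len(candidates))
        candidates = candidates * per_host
    existing: List[Slot] = []
    others = list(candidates)
    for hostname in daemon_hosts:  # pass active daemons first
        for slot in others:
            if slot.hostname == hostname:
                others.remove(slot)  # each daemon consumes one slot
                existing.append(slot)
                break
    need = count - len(existing)
    if need <= 0:
        return existing[:count]
    return existing + others[:need]  # stand-in for self.scheduler.place(others, need)

hosts = [Slot('host1'), Slot('host2'), Slot('host3')]
# count=6 with colo wraps around the three hosts; the daemon on host1 keeps its slot
assert [s.hostname for s in place(hosts, 6, ['host1'], allow_colo=True)] == \
    ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
```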
def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
hosts_with_daemons = {d.hostname for d in self.daemons}
# gen seed off of self.spec to make shuffling deterministic
seed = hash(self.spec.service_name())
random.Random(seed).shuffle(hosts)
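# note: the builtin hash() of a str is salted per interpreter run
# (PYTHONHASHSEED), so this ordering is only stable within one mgr process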
- return hosts
- def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
- """
- Prefer hosts with daemons. Otherwise we'll constantly schedule daemons
- on different hosts all the time. This is about keeping daemons where
- they are. This isn't about co-locating.
- """
- hosts_with_daemons = {d.hostname for d in self.daemons}
-
- # calc existing daemons (that aren't already in chosen)
- existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]
-
- logger.debug('Hosts with existing daemons: {}'.format(existing))
- return existing
+ return hosts
def merge_hostspecs(
['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
),
# count that is bigger than the amount of hosts. Truncate to len(hosts)
- # RGWs should not be co-located to each other.
+ # mgr daemons should not be co-located with each other.
NodeAssignmentTest(
- 'rgw',
+ 'mgr',
PlacementSpec(count=4),
'host1 host2 host3'.split(),
[],
['host1', 'host2', 'host3']
),
+ # count that is bigger than the number of hosts; wrap around.
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=6),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
+ ),
# count + partial host list
NodeAssignmentTest(
'mgr',
],
['host3']
),
+ # count + partial host list (with colo)
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=3, hosts=['host3']),
+ 'host1 host2 host3'.split(),
+ [
+ DaemonDescription('mgr', 'a', 'host1'),
+ DaemonDescription('mgr', 'b', 'host2'),
+ ],
+ ['host3', 'host3', 'host3']
+ ),
# count 1 + partial host list
NodeAssignmentTest(
'mgr',
[],
['host1', 'host2', 'host3']
),
+ # label + count (truncate to host list)
+ NodeAssignmentTest(
+ 'mgr',
+ PlacementSpec(count=4, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host1', 'host2', 'host3']
+ ),
+ # label + count (with colo)
+ NodeAssignmentTest(
+ 'mds',
+ PlacementSpec(count=6, label='foo'),
+ 'host1 host2 host3'.split(),
+ [],
+ ['host1', 'host2', 'host3', 'host1', 'host2', 'host3']
+ ),
# label only + count_per_host
NodeAssignmentTest(
'mds',
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec
+ ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec, HostPlacementSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec
from ceph.utils import datetime_to_str, str_to_datetime
return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
return daemon_type_to_service(self.daemon_type)
+ def get_host_placement(self) -> HostPlacementSpec:
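+ """Return the HostPlacementSpec 'slot' this daemon occupies, for matching against placement candidates."""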
+ return HostPlacementSpec(
+ hostname=self.hostname or '',
+ # FIXME: include the ip:port here?
+ network='',
+ name='',
+ )
+
def __repr__(self) -> str:
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)