import logging
import random
-from typing import List, Optional, Callable, TypeVar, Set, Tuple
+from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple
import orchestrator
-from ceph.deployment.service_spec import HostPlacementSpec, ServiceSpec
+from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
T = TypeVar('T')
+class DaemonPlacement(NamedTuple):
+ hostname: str
+ network: str = '' # for mons only
+ name: str = ''
+ ip: Optional[str] = None
+ ports: Optional[List[int]] = None
+
+ def __str__(self) -> str:
+ res = self.hostname
+ other = []
+ if self.network:
+ other.append(f'network={self.network}')
+ if self.name:
+ other.append(f'name={self.name}')
+ if self.ip:
+ other.append(f'ip={self.ip}')
+ if self.ports:
+ other.append(f'ports={",".join(map(str, self.ports))}')
+ if other:
+ res += '(' + ' '.join(other) + ')'
+ return res
+
+ def matches_daemon(self, dd: DaemonDescription) -> bool:
+ if self.hostname != dd.hostname:
+ return False
+ # fixme: how to match against network?
+ if self.name and self.name != dd.daemon_id:
+ return False
+ if self.ip and self.ip != dd.ip:
+ return False
+ if self.ports and self.ports != dd.ports:
+ return False
+ return True
+
+
class HostAssignment(object):
def __init__(self,
f'hosts for label {self.spec.placement.label}')
def place(self):
- # type: () -> Tuple[List[HostPlacementSpec], List[HostPlacementSpec], List[orchestrator.DaemonDescription]]
+ # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
"""
Generate a list of HostPlacementSpec taking into account:
count = self.spec.placement.count
# get candidate hosts based on [hosts, label, host_pattern]
- candidates = self.get_candidates() # type: List[HostPlacementSpec]
+ candidates = self.get_candidates() # type: List[DaemonPlacement]
# consider enough slots to fulfill target count-per-host or count
if count is None:
# daemon, and others (the rest)
existing_active: List[orchestrator.DaemonDescription] = []
existing_standby: List[orchestrator.DaemonDescription] = []
- existing_slots: List[HostPlacementSpec] = []
+ existing_slots: List[DaemonPlacement] = []
to_remove: List[orchestrator.DaemonDescription] = []
others = candidates.copy()
- for d in daemons:
- hs = d.get_host_placement()
+ for dd in daemons:
found = False
- for i in others:
- if i == hs:
- others.remove(i)
- if d.is_active:
- existing_active.append(d)
+ for p in others:
+ if p.matches_daemon(dd):
+ others.remove(p)
+ if dd.is_active:
+ existing_active.append(dd)
else:
- existing_standby.append(d)
- existing_slots.append(hs)
+ existing_standby.append(dd)
+ existing_slots.append(p)
found = True
break
if not found:
- to_remove.append(d)
+ to_remove.append(dd)
existing = existing_active + existing_standby
existing, to_add))
return existing_slots + to_add, to_add, to_remove
- def get_candidates(self) -> List[HostPlacementSpec]:
+ def get_candidates(self) -> List[DaemonPlacement]:
if self.spec.placement.hosts:
- hosts = self.spec.placement.hosts
+ ls = [
+ DaemonPlacement(hostname=h.hostname, network=h.network, name=h.name)
+ for h in self.spec.placement.hosts
+ ]
elif self.spec.placement.label:
- hosts = [
- HostPlacementSpec(x.hostname, '', '')
+ ls = [
+ DaemonPlacement(hostname=x.hostname)
for x in self.hosts_by_label(self.spec.placement.label)
]
elif self.spec.placement.host_pattern:
- hosts = [
- HostPlacementSpec(x, '', '')
+ ls = [
+ DaemonPlacement(hostname=x)
for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
]
elif (
self.spec.placement.count is not None
or self.spec.placement.count_per_host is not None
):
- hosts = [
- HostPlacementSpec(x.hostname, '', '')
+ ls = [
+ DaemonPlacement(hostname=x.hostname)
for x in self.hosts
]
else:
"placement spec is empty: no hosts, no label, no pattern, no count")
if self.filter_new_host:
- old = hosts.copy()
- hosts = [h for h in hosts if self.filter_new_host(h.hostname)]
- for h in list(set(old) - set(hosts)):
+ old = ls.copy()
+ ls = [h for h in ls if self.filter_new_host(h.hostname)]
+ for h in list(set(old) - set(ls)):
logger.info(
f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
- logger.debug('Filtered %s down to %s' % (old, hosts))
+ logger.debug('Filtered %s down to %s' % (old, ls))
# shuffle for pseudo random selection
# gen seed off of self.spec to make shuffling deterministic
seed = hash(self.spec.service_name())
- random.Random(seed).shuffle(hosts)
+ random.Random(seed).shuffle(ls)
- return hosts
+ return ls
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, \
- HA_RGWSpec, CustomContainerSpec
+from ceph.deployment.service_spec import ServiceSpec, HA_RGWSpec, CustomContainerSpec
from ceph.utils import str_to_datetime, datetime_now
import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
DaemonDescriptionStatus, daemon_type_to_service, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.schedule import HostAssignment
+from cephadm.schedule import HostAssignment, DaemonPlacement
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
from mgr_module import MonCommandFailed
if service_type == 'ha-rgw':
spec = self.update_ha_rgw_definitive_hosts(spec, all_slots, slots_to_add)
- for host, network, name in slots_to_add:
+ for slot in slots_to_add:
for daemon_type in service_to_daemon_types(service_type):
- daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
- prefix=spec.service_id,
- forcename=name)
+ daemon_id = self.mgr.get_unique_name(
+ daemon_type,
+ slot.hostname,
+ daemons,
+ prefix=spec.service_id,
+ forcename=slot.name)
if not did_config:
svc.config(spec, daemon_id)
did_config = True
daemon_spec = svc.make_daemon_spec(
- host, daemon_id, network, spec, daemon_type=daemon_type)
+ slot.hostname, daemon_id, slot.network, spec, daemon_type=daemon_type,
+ ports=slot.ports
+ )
self.log.debug('Placing %s.%s on host %s' % (
- daemon_type, daemon_id, host))
+ daemon_type, daemon_id, slot.hostname))
try:
daemon_spec = svc.prepare_create(daemon_spec)
except (RuntimeError, OrchestratorError) as e:
self.mgr.events.for_service(spec, 'ERROR',
f"Failed while placing {daemon_type}.{daemon_id}"
- f"on {host}: {e}")
+ f"on {slot.hostname}: {e}")
# only return "no change" if no one else has already succeeded.
# later successes will also change to True
if r is None:
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
- hostname=host,
+ hostname=slot.hostname,
daemon_type=daemon_type,
daemon_id=daemon_id,
)
def update_ha_rgw_definitive_hosts(
self,
spec: ServiceSpec,
- hosts: List[HostPlacementSpec],
- add_hosts: List[HostPlacementSpec]
+ hosts: List[DaemonPlacement],
+ add_hosts: List[DaemonPlacement]
) -> HA_RGWSpec:
spec = cast(HA_RGWSpec, spec)
- if not (set(hosts) == set(spec.definitive_host_list)):
- spec.definitive_host_list = hosts
+ hostnames = [p.hostname for p in hosts]
+ add_hostnames = [p.hostname for p in add_hosts]
+ if not (set(hostnames) == set(spec.definitive_host_list)):
+ spec.definitive_host_list = hostnames
ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
for daemon in ha_rgw_daemons:
- if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts:
+ if daemon.hostname in hostnames and daemon.hostname not in add_hostnames:
assert daemon.hostname is not None
self.mgr.cache.schedule_daemon_action(
daemon.hostname, daemon.name(), 'reconfig')
spec = cast(HA_RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
- all_hosts = []
- for h, network, name in spec.definitive_host_list:
- all_hosts.append(h)
+ all_hosts = spec.definitive_host_list
# set state. first host in placement is master all others backups
state = 'BACKUP'
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, ServiceSpecValidationError
from cephadm.module import HostAssignment
+from cephadm.schedule import DaemonPlacement
from orchestrator import DaemonDescription, OrchestratorValidationError, OrchestratorError
]
+@pytest.mark.parametrize(
+ 'dp,dd,result',
+ [
+ (
+ DaemonPlacement(hostname='host1'),
+ DaemonDescription('mgr', 'a', 'host1'),
+ True
+ ),
+ (
+ DaemonPlacement(hostname='host1', name='a'),
+ DaemonDescription('mgr', 'a', 'host1'),
+ True
+ ),
+ (
+ DaemonPlacement(hostname='host1', name='a'),
+ DaemonDescription('mgr', 'b', 'host1'),
+ False
+ ),
+ ])
+def test_daemon_placement_match(dp, dd, result):
+ assert dp.matches_daemon(dd) == result
+
+
@pytest.mark.parametrize("spec_section_key,spec_section",
[ # noqa: E128
('h', 'hosts'),
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec, HostPlacementSpec
+ ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec
from ceph.utils import datetime_to_str, str_to_datetime
return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
return daemon_type_to_service(self.daemon_type)
- def get_host_placement(self) -> HostPlacementSpec:
- return HostPlacementSpec(
- hostname=self.hostname or '',
- # FIXME: include the ip:port here?
- network='',
- name='',
- )
-
def __repr__(self) -> str:
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
ha_proxy_ssl_options: Optional[List[str]] = None,
haproxy_container_image: Optional[str] = None,
keepalived_container_image: Optional[str] = None,
- definitive_host_list: Optional[List[HostPlacementSpec]] = None
+ definitive_host_list: Optional[List[str]] = None
):
assert service_type == 'ha-rgw'
super(HA_RGWSpec, self).__init__('ha-rgw', service_id=service_id,
# placeholder variable. Need definitive list of hosts this service will
# be placed on in order to generate keepalived config. Will be populated
# when applying spec
- self.definitive_host_list = [] # type: List[HostPlacementSpec]
+ self.definitive_host_list = [] # type: List[str]
def validate(self) -> None:
super(HA_RGWSpec, self).validate()