# Add the hosts to the inventory in the right group
- hosts = spec.hosts
+ hosts = spec.placement.hosts
if not hosts:
raise orchestrator.OrchestratorError("No hosts provided. "
"At least one destination host is needed to install the RGW "
return self._update_mons(spec)
def _update_mons(self, spec):
+ # type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster monitors.
"""
if spec.count < num_mons:
raise NotImplementedError("Removing monitors is not supported.")
- self.log.debug("Trying to update monitors on: {}".format(spec.placement.nodes))
+ self.log.debug("Trying to update monitors on: {}".format(spec.placement.hosts))
# check that all the hosts are registered
- [self._require_hosts(host.hostname) for host in spec.placement.nodes]
+ [self._require_hosts(host.hostname) for host in spec.placement.hosts]
# current support requires a network to be specified
- for host, network, _ in spec.placement.nodes:
+ for host, network, _ in spec.placement.hosts:
if not network:
raise RuntimeError("Host '{}' is missing a network spec".format(host))
def update_mons_with_daemons(daemons):
- for _, _, name in spec.placement.nodes:
+ for _, _, name in spec.placement.hosts:
if name and len([d for d in daemons if d.service_instance == name]):
raise RuntimeError('name %s alrady exists', name)
# explicit placement: enough hosts provided?
num_new_mons = spec.count - num_mons
- if len(spec.placement.nodes) < num_new_mons:
+ if len(spec.placement.hosts) < num_new_mons:
raise RuntimeError("Error: {} hosts provided, expected {}".format(
- len(spec.placement.nodes), num_new_mons))
+ len(spec.placement.hosts), num_new_mons))
self.log.info("creating {} monitors on hosts: '{}'".format(
- num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.nodes))))
+ num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
# TODO: we may want to chain the creation of the monitors so they join
# the quorum one at a time.
- return self._create_mon(spec.placement.nodes)
+ return self._create_mon(spec.placement.hosts)
return self._get_services('mon').then(update_mons_with_daemons)
@async_map_completion
return self._create_daemon('mgr', name, host, keyring)
def update_mgrs(self, spec):
+ # type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster managers.
"""
return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
def _update_mgrs(self, spec, daemons):
+ # type: (orchestrator.StatefulServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion
num_mgrs = len(daemons)
if spec.count == num_mgrs:
return orchestrator.Completion(value="The requested number of managers exist.")
- self.log.debug("Trying to update managers on: {}".format(spec.placement.nodes))
+ self.log.debug("Trying to update managers on: {}".format(spec.placement.hosts))
# check that all the hosts are registered
- [self._require_hosts(host.hostname) for host in spec.placement.nodes]
+ [self._require_hosts(host.hostname) for host in spec.placement.hosts]
if spec.count < num_mgrs:
num_to_remove = num_mgrs - spec.count
else:
# we assume explicit placement by which there are the same number of
# hosts specified as the size of increase in number of daemons.
- num_new_mgrs = spec.placement.count - num_mgrs
- if len(spec.placement.nodes) < num_new_mgrs:
+ num_new_mgrs = spec.count - num_mgrs
+ if len(spec.placement.hosts) < num_new_mgrs:
raise RuntimeError(
"Error: {} hosts provided, expected {}".format(
- len(spec.placement.nodes), num_new_mgrs))
+ len(spec.placement.hosts), num_new_mgrs))
- for host_spec in spec.placement.nodes:
+ for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
raise RuntimeError('name %s alrady exists', host_spec.name)
- for host_spec in spec.placement.nodes:
+ for host_spec in spec.placement.hosts:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
raise RuntimeError('name %s alrady exists', host_spec.name)
self.log.info("creating {} managers on hosts: '{}'".format(
- num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.nodes])))
+ num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.hosts])))
args = []
- for host_spec in spec.placement.nodes:
+ for host_spec in spec.placement.hosts:
name = host_spec.name or self.get_unique_name(daemons)
host = host_spec.hostname
args.append((host, name))
return self._create_mgr(args)
def add_mds(self, spec):
- if not spec.placement.nodes or len(spec.placement.nodes) < spec.placement.count:
+ if not spec.placement.hosts or len(spec.placement.hosts) < spec.placement.count:
raise RuntimeError("must specify at least %d hosts" % spec.placement.count)
return self._get_services('mds').then(lambda ds: self._add_mds(ds, spec))
def _add_mds(self, daemons, spec):
args = []
num_added = 0
- for host, _, name in spec.placement.nodes:
+ for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
mds_id = self.get_unique_name(daemons, spec.name, name)
return self._get_services('mds').then(_remove_mds)
def add_rgw(self, spec):
- if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
+ if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mon_command({
def _add_rgw(daemons):
args = []
num_added = 0
- for host, _, name in spec.placement.nodes:
+ for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
rgw_id = self.get_unique_name(daemons, spec.name, name)
return self._update_service('rgw', self.add_rgw, spec)
def add_rbd_mirror(self, spec):
- if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
+ if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
- self.log.debug('nodes %s' % spec.placement.nodes)
+ self.log.debug('nodes %s' % spec.placement.hosts)
def _add_rbd_mirror(daemons):
args = []
num_added = 0
- for host, _, name in spec.placement.nodes:
+ for host, _, name in spec.placement.hosts:
if num_added >= spec.count:
break
daemon_id = self.get_unique_name(daemons, None, name)
service_type=None, # type: Optional[str]
):
assert spec and get_hosts_func and service_type
- self.spec = spec
+ self.spec = spec # type: orchestrator.StatefulServiceSpec
self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
self.get_hosts_func = get_hosts_func
self.service_type = service_type
logger.info("Found labels. Assinging nodes that match the label")
candidates = [HostSpec(x[0], '', '') for x in self.get_hosts_func()] # TODO: query for labels
logger.info('Assigning nodes to spec: {}'.format(candidates))
- self.spec.placement.set_nodes(candidates)
+ self.spec.placement.set_hosts(candidates)
def assign_nodes(self):
# type: () -> None
"""
# If no imperative or declarative host assignments, use the scheduler to pick from the
# host pool (assuming `count` is set)
- if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
+ if not self.spec.placement.label and not self.spec.placement.hosts and self.spec.placement.count:
logger.info("Found num spec. Looking for labeled nodes.")
# TODO: actually query for labels (self.service_type)
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()],
format(self.spec.placement.count))
else:
logger.info('Assigning nodes to spec: {}'.format(candidates))
- self.spec.placement.set_nodes(candidates)
+ self.spec.placement.set_hosts(candidates)
return None
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
format(self.spec.placement.count, len(candidates)))
logger.info('Assigning nodes to spec: {}'.format(candidates))
- self.spec.placement.set_nodes(candidates)
+ self.spec.placement.set_hosts(candidates)
return None
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mon_update(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- ps = PlacementSpec(nodes=['test:0.0.0.0=a'], count=1)
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mons(StatefulServiceSpec(placement=ps))
assert self._wait(cephadm_module, c) == ["(Re)deployed mon.a on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mgr_update(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- ps = PlacementSpec(nodes=['test:0.0.0.0=a'], count=1)
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mgrs(StatefulServiceSpec(placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed mgr." in out
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_mds(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- ps = PlacementSpec(nodes=['test'], count=1)
+ ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mds(StatelessServiceSpec('name', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed mds.name." in out
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_rgw(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- ps = PlacementSpec(nodes=['test'], count=1)
+ ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed rgw.realm.zone." in out
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_rbd_mirror(self, _send_command, _get_connection, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- ps = PlacementSpec(nodes=['test'], count=1)
+ ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(StatelessServiceSpec(name='name', placement=ps))
[out] = self._wait(cephadm_module, c)
assert "(Re)deployed rbd-mirror." in out
"""
For APIs that need to specify a node subset
"""
- def __init__(self, label=None, nodes=None, count=None):
+ def __init__(self, label=None, hosts=None, count=None):
# type: (Optional[str], Optional[List], Optional[int]) -> None
self.label = label
- if nodes:
- if all([isinstance(node, HostSpec) for node in nodes]):
- self.nodes = nodes
+ if hosts:
+ if all([isinstance(host, HostSpec) for host in hosts]):
+ self.hosts = hosts # type: List[HostSpec]
else:
- self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
+ self.hosts = [parse_host_specs(x, require_network=False) for x in hosts if x]
else:
- self.nodes = []
+ self.hosts = []
+
self.count = count # type: Optional[int]
- def set_nodes(self, nodes):
- # To backpopulate the .nodes attribute when using labels or count
+ def set_hosts(self, hosts):
+ # To backpopulate the .hosts attribute when using labels or count
# in the orchestrator backend.
- self.nodes = nodes
+ self.hosts = hosts
@classmethod
def from_dict(cls, data):
return _cls
def validate(self):
- if self.nodes and self.label:
+ if self.hosts and self.label:
# TODO: a less generic Exception
raise Exception('Node and label are mutually exclusive')
if self.count is not None and self.count <= 0:
"""
# TODO: create base class for Stateless/Stateful service specs and propertly inherit
def __init__(self, name=None, placement=None):
- self.placement = PlacementSpec() if placement is None else placement
+ # type: (Optional[str], Optional[PlacementSpec]) -> None
+ self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
self.name = name
- self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
+
+ # for backwards-compatibility
+ if self.placement is not None and self.placement.count is not None:
+ self.count = self.placement.count
+ else:
+ self.count = 1
class StatelessServiceSpec(object):
# start the services.
def __init__(self, name, placement=None):
- self.placement = PlacementSpec() if placement is None else placement
+ self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
#: Give this set of statelss services a name: typically it would
#: be the name of a CephFS filesystem, RGW zone, etc. Must be unique
#: within one ceph cluster.
- self.name = name
+ self.name = name # type: str
#: Count of service instances
self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
#: List of hosts where RGWs should run. Not for Rook.
if hosts:
- self.placement.hosts = hosts
+ self.placement = PlacementSpec(hosts=hosts)
#: is multisite
self.rgw_multisite = rgw_multisite
def _rbd_mirror_add(self, num=None, hosts=None):
spec = orchestrator.StatelessServiceSpec(
None,
- placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
+ placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_rbd_mirror(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
def _rbd_mirror_update(self, num, label=None, hosts=[]):
spec = orchestrator.StatelessServiceSpec(
None,
- placement=orchestrator.PlacementSpec(nodes=hosts, count=num, label=label))
+ placement=orchestrator.PlacementSpec(hosts=hosts, count=num, label=label))
completion = self.update_rbd_mirror(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
def _mds_add(self, fs_name, num=None, hosts=None):
spec = orchestrator.StatelessServiceSpec(
fs_name,
- placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
+ placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_mds(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
"name=hosts,type=CephString,n=N,req=false "
'Update the number of MDS instances for the given fs_name')
def _mds_update(self, fs_name, num=None, label=None, hosts=[]):
- placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
+ placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
spec = orchestrator.StatelessServiceSpec(
rgw_spec = orchestrator.RGWSpec(
rgw_realm=realm_name,
rgw_zone=zone_name,
- placement=orchestrator.PlacementSpec(nodes=hosts, count=num))
+ placement=orchestrator.PlacementSpec(hosts=hosts, count=num))
completion = self.add_rgw(rgw_spec)
self._orchestrator_wait([completion])
spec = orchestrator.RGWSpec(
rgw_realm=realm_name,
rgw_zone=zone_name,
- placement=orchestrator.PlacementSpec(nodes=hosts, label=label, count=num))
+ placement=orchestrator.PlacementSpec(hosts=hosts, label=label, count=num))
completion = self.update_rgw(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
'Update the number of manager instances')
def _update_mgrs(self, num=None, hosts=[], label=None):
- placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
+ placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
spec = orchestrator.StatefulServiceSpec(placement=placement)
'Update the number of monitor instances')
def _update_mons(self, num=None, hosts=[], label=None):
- placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
+ placement = orchestrator.PlacementSpec(label=label, count=num, hosts=hosts)
if not hosts and not label:
# Improve Error message. Point to parse_host_spec examples
raise orchestrator.OrchestratorValidationError("Mons need a host spec. (host, network, name(opt))")