result.append(d)
return result
+ def get_daemons_by_service(self, service_name):
+ # type: (str) -> List[orchestrator.DaemonDescription]
+ result = [] # type: List[orchestrator.DaemonDescription]
+ for host, dm in self.daemons.items():
+ for name, d in dm.items():
+ if name.startswith(service_name + '.'):
+ result.append(d)
+ return result
+
def get_daemon_names(self):
# type: () -> List[str]
r = []
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _update_service(self, daemon_type, add_func, spec):
- daemons = self.cache.get_daemons_by_type(daemon_type)
- if len(daemons) > spec.count:
+ def _apply_service(self, daemon_type, spec, create_func, config_func=None):
+ service_name = daemon_type
+ if spec.name:
+ service_name += '.' + spec.name
+ daemons = self.cache.get_daemons_by_service(service_name)
+ spec = HostAssignment(
+ spec=spec,
+ get_hosts_func=self._get_hosts,
+ service_type=daemon_type).load()
+ if len(daemons) > spec.placement.count:
# remove some
- to_remove = len(daemons) - spec.count
+ to_remove = len(daemons) - spec.placement.count
args = []
for d in daemons[0:to_remove]:
args.append(
('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
)
return self._remove_daemon(args)
- elif len(daemons) < spec.count:
- return add_func(spec)
+ elif len(daemons) < spec.placement.count:
+ # add some
+ want = spec.placement.count - len(daemons)
+ hosts_with_daemons = {d.hostname for d in daemons}
+ hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons}
+ if want > len(hosts_without_daemons):
+ raise OrchestratorError('too few hosts: want %d, have %s' % (
+ want, hosts_without_daemons))
+ args = []
+ for host, _, name in hosts_without_daemons:
+ daemon_id = self.get_unique_name(daemon_type, host, daemons,
+ spec.name, name)
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+ args.append((daemon_id, host))
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ )
+ daemons.append(sd)
+ if config_func:
+ config_func(spec)
+ return create_func(args)
return trivial_result([])
def _add_new_daemon(self,
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
if not spec.placement.hosts or spec.placement.count is None or len(spec.placement.hosts) < spec.placement.count:
raise RuntimeError("must specify at least %s hosts" % spec.placement.count)
+ self._config_mds(spec)
+ return self._add_new_daemon('mds', spec, self._create_mds)
+
+ def apply_mds(self, spec):
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ return self._apply_service('mds', spec, self._create_mds,
+ self._config_mds)
+
+ def _config_mds(self, spec):
# ensure mds_join_fs is set for these daemons
assert spec.name
ret, out, err = self.mon_command({
'value': spec.name,
})
- return self._add_new_daemon('mds', spec, self._create_mds)
-
- def apply_mds(self, spec):
- # type: (orchestrator.ServiceSpec) -> AsyncCompletion
- spec =HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mds').load()
-
- return self._update_service('mds', self.add_mds, spec)
-
@async_map_completion
def _create_mds(self, mds_id, host):
# get mgr. key
def add_rgw(self, spec):
if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
+ self._config_rgw(spec)
+ return self._add_new_daemon('rgw', spec, self._create_rgw)
+
+ def _config_rgw(self, spec):
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mon_command({
'prefix': 'config set',
'value': spec.rgw_realm,
})
- return self._add_new_daemon('rgw', spec, self._create_rgw)
-
@async_map_completion
def _create_rgw(self, rgw_id, host):
ret, keyring, err = self.mon_command({
return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
- spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load()
- return self._update_service('rgw', self.add_rgw, spec)
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ return self._apply_service('rgw', spec, self._create_rgw,
+ self._config_rgw)
def add_rbd_mirror(self, spec):
if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
keyring=keyring)
def apply_rbd_mirror(self, spec):
- spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load()
- return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror)
def _generate_prometheus_config(self):
# scrape mgrs
return self._create_daemon('prometheus', daemon_id, host)
def apply_prometheus(self, spec):
- spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load()
- return self._update_service('prometheus', self.add_prometheus, spec)
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ return self._apply_service('prometheus', spec, self._create_prometheus)
def add_node_exporter(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._add_new_daemon('node-exporter', spec, self._create_node_exporter)
def apply_node_exporter(self, spec):
- spec = HostAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='node-exporter').load()
- return self._update_service('node-exporter', self.add_node_exporter, spec)
+ # type: (orchestrator.ServiceSpec) -> AsyncCompletion
+ return self._apply_service('node-exporter', spec,
+ self._create_node_exporter)
@async_map_completion
def _create_node_exporter(self, daemon_id, host):