self._remove_osds_bg()
- service_completions = self._apply_all_services()
- for service_completion in service_completions:
- if service_completion:
- while not service_completion.has_result:
- self.process([service_completion])
- self.log.debug(f'Still processing {service_completion}')
- if service_completion.needs_result:
- time.sleep(1)
- else:
- break
- if service_completion.exception is not None:
- self.log.error(str(service_completion.exception))
+ if self._apply_all_services():
+ continue # did something, refresh
self._refresh_configs()
spec=spec,
get_hosts_func=self._get_hosts,
get_daemons_func=self.cache.get_daemons_by_service).place()
- count = len(hosts)
- if len(daemons) > count:
- # remove some
- to_remove = len(daemons) - count
- args = []
- for d in daemons[0:to_remove]:
- args.append(
- ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
+
+ r = False
+
+ # add any?
+ did_config = False
+ hosts_with_daemons = {d.hostname for d in daemons}
+ self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
+ for host, network, name in hosts:
+ if host not in hosts_with_daemons:
+ if not did_config and config_func:
+ config_func(spec)
+ did_config = True
+ daemon_id = self.get_unique_name(daemon_type, host, daemons,
+ spec.service_id, name)
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+ if daemon_type == 'mon':
+ create_func(daemon_id, host, network) # type: ignore
+ else:
+ create_func(daemon_id, host) # type: ignore
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
)
- return self._remove_daemon(args)
- elif len(daemons) < count:
- # add some
- count -= len(daemons)
- hosts_with_daemons = {d.hostname for d in daemons}
- hosts_without_daemons = {p for p in spec.placement.hosts
- if p.hostname not in hosts_with_daemons}
- return self._create_daemons(daemon_type, spec, daemons,
- hosts_without_daemons, count,
- create_func, config_func)
- return trivial_result([])
+ daemons.append(sd)
+ r = True
+
+ # remove any?
+ target_hosts = [h.hostname for h in hosts]
+ for d in daemons:
+ if d.hostname not in target_hosts:
+ self._remove_daemon(d.name(), d.hostname)
+ r = True
+
+ return r
def _apply_all_services(self):
- r : List[orchestrator.Completion] = []
+ r = False
for sn, spec in self.spec_store.specs.items():
try:
- r.extend(self._apply_service(spec))
+ if self._apply_service(spec):
+ r = True
except Exception as e:
self.log.warning('Failed to apply %s spec %s: %s' % (
spec.service_name(), spec, e))
def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- c = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed mgr.* on host 'test'")
+ r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+ assert r
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
+ r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+ assert r
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
with pytest.raises(OrchestratorError):
ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
- c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
- [out] = wait(cephadm_module, c)
+ r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+ assert r
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(