From 58148c85c7a6d27aca0f487560e1b0368e6b2d87 Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Tue, 3 Mar 2020 12:14:07 -0600
Subject: [PATCH] mgr/cephadm: make _apply_service move services

This transitions us to a synchronous mode of operation.

Fixes: https://tracker.ceph.com/issues/44167
Signed-off-by: Sage Weil
---
 src/pybind/mgr/cephadm/module.py             | 75 +++++++++++---------
 src/pybind/mgr/cephadm/tests/test_cephadm.py | 15 ++--
 2 files changed, 49 insertions(+), 41 deletions(-)

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index db7f4f176aa71..ed9b37968316e 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -1052,18 +1052,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             self._remove_osds_bg()
 
-            service_completions = self._apply_all_services()
-            for service_completion in service_completions:
-                if service_completion:
-                    while not service_completion.has_result:
-                        self.process([service_completion])
-                        self.log.debug(f'Still processing {service_completion}')
-                        if service_completion.needs_result:
-                            time.sleep(1)
-                        else:
-                            break
-                    if service_completion.exception is not None:
-                        self.log.error(str(service_completion.exception))
+            if self._apply_all_services():
+                continue  # did something, refresh
 
             self._refresh_configs()
 
@@ -2159,32 +2149,51 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             spec=spec,
             get_hosts_func=self._get_hosts,
             get_daemons_func=self.cache.get_daemons_by_service).place()
-        count = len(hosts)
-        if len(daemons) > count:
-            # remove some
-            to_remove = len(daemons) - count
-            args = []
-            for d in daemons[0:to_remove]:
-                args.append(
-                    ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
+
+        r = False
+
+        # add any?
+        did_config = False
+        hosts_with_daemons = {d.hostname for d in daemons}
+        self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
+        for host, network, name in hosts:
+            if host not in hosts_with_daemons:
+                if not did_config and config_func:
+                    config_func(spec)
+                    did_config = True
+                daemon_id = self.get_unique_name(daemon_type, host, daemons,
+                                                 spec.service_id, name)
+                self.log.debug('Placing %s.%s on host %s' % (
+                    daemon_type, daemon_id, host))
+                if daemon_type == 'mon':
+                    create_func(daemon_id, host, network)  # type: ignore
+                else:
+                    create_func(daemon_id, host)  # type: ignore
+
+                # add to daemon list so next name(s) will also be unique
+                sd = orchestrator.DaemonDescription(
+                    hostname=host,
+                    daemon_type=daemon_type,
+                    daemon_id=daemon_id,
                 )
-            return self._remove_daemon(args)
-        elif len(daemons) < count:
-            # add some
-            count -= len(daemons)
-            hosts_with_daemons = {d.hostname for d in daemons}
-            hosts_without_daemons = {p for p in spec.placement.hosts
-                                     if p.hostname not in hosts_with_daemons}
-            return self._create_daemons(daemon_type, spec, daemons,
-                                        hosts_without_daemons, count,
-                                        create_func, config_func)
-        return trivial_result([])
+                daemons.append(sd)
+                r = True
+
+        # remove any?
+        target_hosts = [h.hostname for h in hosts]
+        for d in daemons:
+            if d.hostname not in target_hosts:
+                self._remove_daemon(d.name(), d.hostname)
+                r = True
+
+        return r
 
     def _apply_all_services(self):
-        r : List[orchestrator.Completion] = []
+        r = False
         for sn, spec in self.spec_store.specs.items():
             try:
-                r.extend(self._apply_service(spec))
+                if self._apply_service(spec):
+                    r = True
             except Exception as e:
                 self.log.warning('Failed to apply %s spec %s: %s' % (
                     spec.service_name(), spec, e))
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index c1a43e5d030b4..3d7378820c820 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -154,9 +154,8 @@ class TestCephadm(object):
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
-            [out] = wait(cephadm_module, c)
-            match_glob(out, "Deployed mgr.* on host 'test'")
+            r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+            assert r
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@@ -237,9 +236,9 @@ class TestCephadm(object):
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
-                [out] = wait(cephadm_module, c)
-                match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
+                r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+                assert r
+
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@@ -267,8 +266,8 @@ class TestCephadm(object):
 
             with pytest.raises(OrchestratorError):
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
-                c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
-                [out] = wait(cephadm_module, c)
+                r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+                assert r
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
-- 
2.39.5
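
Review note, not part of the patch: the change makes _apply_service reconcile placement synchronously (create daemons on target hosts that lack one, remove daemons from hosts outside the target set) and return a bool, instead of handing Completions back to the serve loop. Below is a minimal, self-contained sketch of that reconcile pattern, assuming nothing beyond the standard library; reconcile, create_daemon, and remove_daemon are hypothetical stand-ins for illustration, not cephadm's actual API.

    # Illustrative sketch only: the names here are hypothetical, not cephadm's API.
    from typing import Callable, Dict, List


    def reconcile(target_hosts: List[str],
                  daemons_by_host: Dict[str, str],
                  create_daemon: Callable[[str], None],
                  remove_daemon: Callable[[str, str], None]) -> bool:
        """Converge running daemons onto target_hosts; return True if anything changed."""
        changed = False

        # add any? -- place a daemon on every target host that lacks one
        for host in target_hosts:
            if host not in daemons_by_host:
                create_daemon(host)
                changed = True

        # remove any? -- drop daemons from hosts no longer in the target set
        for host, name in list(daemons_by_host.items()):
            if host not in target_hosts:
                remove_daemon(name, host)
                changed = True

        return changed


    # Example: host1 already has a daemon, host2 needs one, host3 should be drained.
    changed = reconcile(
        ['host1', 'host2'],
        {'host1': 'mgr.host1.abc', 'host3': 'mgr.host3.xyz'},
        create_daemon=lambda host: print('create daemon on', host),
        remove_daemon=lambda name, host: print('remove', name, 'from', host))
    assert changed

Returning a plain bool lets the caller (here, the serve loop via _apply_all_services) simply loop again when something changed, which is the synchronous behavior the commit message describes.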