]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: make _apply_service move services
authorSage Weil <sage@redhat.com>
Tue, 3 Mar 2020 18:14:07 +0000 (12:14 -0600)
committerSage Weil <sage@redhat.com>
Wed, 4 Mar 2020 18:57:50 +0000 (12:57 -0600)
This transitions us to a synchronous mode of operation.

Fixes: https://tracker.ceph.com/issues/44167
Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index db7f4f176aa71a804462a24de77fe44d1811a0b4..ed9b37968316ea77ec99e8f2d224d1a3841f79ad 100644 (file)
@@ -1052,18 +1052,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             self._remove_osds_bg()
 
-            service_completions = self._apply_all_services()
-            for service_completion in service_completions:
-                if service_completion:
-                    while not service_completion.has_result:
-                        self.process([service_completion])
-                        self.log.debug(f'Still processing {service_completion}')
-                        if service_completion.needs_result:
-                            time.sleep(1)
-                        else:
-                            break
-                    if service_completion.exception is not None:
-                        self.log.error(str(service_completion.exception))
+            if self._apply_all_services():
+                continue  # did something, refresh
 
             self._refresh_configs()
 
@@ -2159,32 +2149,51 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             spec=spec,
             get_hosts_func=self._get_hosts,
             get_daemons_func=self.cache.get_daemons_by_service).place()
-        count = len(hosts)
-        if len(daemons) > count:
-            # remove some
-            to_remove = len(daemons) - count
-            args = []
-            for d in daemons[0:to_remove]:
-                args.append(
-                    ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
+
+        r = False
+
+        # add any?
+        did_config = False
+        hosts_with_daemons = {d.hostname for d in daemons}
+        self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
+        for host, network, name in hosts:
+            if host not in hosts_with_daemons:
+                if not did_config and config_func:
+                    config_func(spec)
+                    did_config = True
+                daemon_id = self.get_unique_name(daemon_type, host, daemons,
+                                                 spec.service_id, name)
+                self.log.debug('Placing %s.%s on host %s' % (
+                    daemon_type, daemon_id, host))
+                if daemon_type == 'mon':
+                    create_func(daemon_id, host, network)  # type: ignore
+                else:
+                    create_func(daemon_id, host)           # type: ignore
+
+                # add to daemon list so next name(s) will also be unique
+                sd = orchestrator.DaemonDescription(
+                    hostname=host,
+                    daemon_type=daemon_type,
+                    daemon_id=daemon_id,
                 )
-            return self._remove_daemon(args)
-        elif len(daemons) < count:
-            # add some
-            count -= len(daemons)
-            hosts_with_daemons = {d.hostname for d in daemons}
-            hosts_without_daemons = {p for p in spec.placement.hosts
-                                     if p.hostname not in hosts_with_daemons}
-            return self._create_daemons(daemon_type, spec, daemons,
-                                        hosts_without_daemons, count,
-                                        create_func, config_func)
-        return trivial_result([])
+                daemons.append(sd)
+                r = True
+
+        # remove any?
+        target_hosts = [h.hostname for h in hosts]
+        for d in daemons:
+            if d.hostname not in target_hosts:
+                self._remove_daemon(d.name(), d.hostname)
+                r = True
+
+        return r
 
     def _apply_all_services(self):
-        r : List[orchestrator.Completion] = []
+        r = False
         for sn, spec in self.spec_store.specs.items():
             try:
-                r.extend(self._apply_service(spec))
+                if self._apply_service(spec):
+                    r = True
             except Exception as e:
                 self.log.warning('Failed to apply %s spec %s: %s' % (
                     spec.service_name(), spec, e))
index c1a43e5d030b42502ac8e48f3d5a26724ed560ba..3d7378820c820663102a94a9ed55e779dda54cdf 100644 (file)
@@ -154,9 +154,8 @@ class TestCephadm(object):
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
-            [out] = wait(cephadm_module, c)
-            match_glob(out, "Deployed mgr.* on host 'test'")
+            r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+            assert r
 
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@@ -237,9 +236,9 @@ class TestCephadm(object):
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
-                [out] = wait(cephadm_module, c)
-                match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
+                r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+                assert r
+
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@@ -267,8 +266,8 @@ class TestCephadm(object):
 
                 with pytest.raises(OrchestratorError):
                     ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
-                    c = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
-                    [out] = wait(cephadm_module, c)
+                    r = cephadm_module._apply_service(RGWSpec('realm', 'zone1', placement=ps))
+                    assert r
 
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(