]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Fix race between host_add and _apply_all_specs
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 14 Sep 2020 10:17:43 +0000 (12:17 +0200)
committerNathan Cutler <ncutler@suse.com>
Tue, 6 Oct 2020 09:40:53 +0000 (11:40 +0200)
This mitigates a potential race, where a new host was added *after*
``_refresh_host_daemons()`` was called, but *before*
``_apply_all_specs()`` was called. Thus we end up with hosts
where daemons might be running, but we have not yet detected them.

Imagine:

1. the initial MGR was deployed manually
2. `serve()` is called
3. `_refresh_hosts_and_daemons()` is called
4. `add_host()` is called with the initial host
5. `apply_all_specs()` now deploys an additional MGR on the initial host

Fixes: https://tracker.ceph.com/issues/45093
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit 72e3990382d65205de7871cc86b5063ed16732e5)

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_migration.py

index 1131df23302c3e2bab2bac69896249fe8d44bd31..5040f17d9caa4d00613fa310f08df20733da8869 100644 (file)
@@ -436,6 +436,16 @@ class HostCache():
             return True
         return False
 
+    def host_had_daemon_refresh(self, host: str) -> bool:
+        """
+        Check if the host has had a daemon refresh at least once.
+        """
+        if host in self.last_daemon_update:
+            return True
+        if host not in self.daemons:
+            return False
+        return bool(self.daemons[host])
+
     def host_needs_device_refresh(self, host):
         # type: (str) -> bool
         if host in self.mgr.offline_hosts:
@@ -514,7 +524,7 @@ class HostCache():
         We're not checking for `host_needs_daemon_refresh`, as this might never be
         False for all hosts.
         """
-        return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
+        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                    for h in self.get_hosts())
 
     def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
index 655d75674c858c34ad27cd5d7f30c7590cf2d5ce..3a47ce3ad4e26573ead9f4aa6020cd878efb4794 100644 (file)
@@ -1239,6 +1239,20 @@ To check that the host is reachable:
                         code, '\n'.join(err)))
             return out, err, code
 
+    def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
+        """
+        Returns all hosts that went through _refresh_host_daemons().
+
+        This mitigates a potential race, where a new host was added *after*
+        ``_refresh_host_daemons()`` was called, but *before*
+        ``_apply_all_specs()`` was called. Thus we end up with hosts
+        where daemons might be running, but we have not yet detected them.
+        """
+        return [
+            h for h in self.inventory.all_specs()
+            if self.cache.host_had_daemon_refresh(h.hostname)
+        ]
+
     def _add_host(self, spec):
         # type: (HostSpec) -> str
         """
@@ -2100,6 +2114,8 @@ To check that the host is reachable:
         Schedule a service.  Deploy new daemons or remove old ones, depending
         on the target label and count specified in the placement.
         """
+        self.migration.verify_no_migration()
+
         daemon_type = spec.service_type
         service_name = spec.service_name()
         if spec.unmanaged:
@@ -2140,7 +2156,7 @@ To check that the host is reachable:
 
         ha = HostAssignment(
             spec=spec,
-            hosts=self.inventory.all_specs(),
+            hosts=self._hosts_with_daemon_inventory(),
             get_daemons_func=self.cache.get_daemons_by_service,
             filter_new_host=matches_network if daemon_type == 'mon' else None,
         )
@@ -2384,8 +2400,6 @@ To check that the host is reachable:
         return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
 
     def _apply(self, spec: GenericSpec) -> str:
-        self.migration.verify_no_migration()
-
         if spec.service_type == 'host':
             return self._add_host(cast(HostSpec, spec))
 
@@ -2404,7 +2418,7 @@ To check that the host is reachable:
 
         ha = HostAssignment(
             spec=spec,
-            hosts=self.inventory.all_specs(),
+            hosts=self._hosts_with_daemon_inventory(),
             get_daemons_func=self.cache.get_daemons_by_service,
         )
         ha.validate()
@@ -2459,7 +2473,7 @@ To check that the host is reachable:
 
         HostAssignment(
             spec=spec,
-            hosts=self.inventory.all_specs(),
+            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
             get_daemons_func=self.cache.get_daemons_by_service,
         ).validate()
 
index 8e6adc9678d2baffe655e836e5f50e787ffa82ab..460e4a24b75b472fceb464197f8a37137bc35b60 100644 (file)
@@ -114,9 +114,11 @@ def wait(m, c):
 
 
 @contextmanager
-def with_host(m: CephadmOrchestrator, name):
+def with_host(m: CephadmOrchestrator, name, refresh_hosts=True):
     # type: (CephadmOrchestrator, str) -> None
     wait(m, m.add_host(HostSpec(hostname=name)))
+    if refresh_hosts:
+        m._refresh_hosts_and_daemons()
     yield
     wait(m, m.remove_host(name))
 
index defc5912d75ba227448bee65e9eeda47084aa826..4464fa0ba5efe0112aa4fb51138450c695f349a4 100644 (file)
@@ -769,7 +769,7 @@ class TestCephadm(object):
             return '{}', None, 0
         with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
             with mock.patch("remoto.process.check", _check):
-                with with_host(cephadm_module, 'test'):
+                with with_host(cephadm_module, 'test', refresh_hosts=False):
                     code, out, err = cephadm_module.check_host('test')
                     # First should succeed.
                     assert err is None
@@ -878,7 +878,7 @@ class TestCephadm(object):
                                  True
                              ])
     def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
-        with with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test', refresh_hosts=False):
             cephadm_module.set_container_image('global', 'image')
             if use_repo_digest:
                 cephadm_module.use_repo_digest = True
index b8951c9e29b6a510f87892bceace97d5a5332d2c..08e0abd5a61e78157d61861d16e9238bfd677b91 100644 (file)
@@ -1,18 +1,21 @@
 import json
 from datetime import datetime
 
+import pytest
+
 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
 from cephadm import CephadmOrchestrator
 from cephadm.inventory import SPEC_STORE_PREFIX, DATEFMT
 from cephadm.tests.fixtures import _run_cephadm, cephadm_module, wait, with_host
+from orchestrator import OrchestratorError
 from tests import mock
 
 
 @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
 @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
 def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
-    with with_host(cephadm_module, 'host1'):
-        with with_host(cephadm_module, 'host2'):
+    with with_host(cephadm_module, 'host1', refresh_hosts=False):
+        with with_host(cephadm_module, 'host2', refresh_hosts=False):
 
             # emulate the old scheduler:
             c = cephadm_module.apply_rgw(
@@ -20,7 +23,19 @@ def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
             )
             assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
 
+            # with pytest.raises(OrchestratorError, match="cephadm migration still ongoing. Please wait, until the migration is complete."):
             cephadm_module._apply_all_services()
+
+            cephadm_module.migration_current = 0
+            cephadm_module.migration.migrate()
+            # assert we need all daemons.
+            assert cephadm_module.migration_current == 0
+
+            cephadm_module._refresh_hosts_and_daemons()
+            cephadm_module.migration.migrate()
+
+            cephadm_module._apply_all_services()
+
             out = {o.hostname for o in wait(cephadm_module, cephadm_module.list_daemons())}
             assert out == {'host1', 'host2'}
 
@@ -29,17 +44,12 @@ def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
             )
             assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
 
-            cephadm_module.migration_current = 0
-            cephadm_module.migration.migrate()
-
-            # assert we need all daemons.
-            assert cephadm_module.migration_current == 0
-
             # Sorry, for this hack, but I need to make sure, Migration thinks,
             # we have updated all daemons already.
             cephadm_module.cache.last_daemon_update['host1'] = datetime.now()
             cephadm_module.cache.last_daemon_update['host2'] = datetime.now()
 
+            cephadm_module.migration_current = 0
             cephadm_module.migration.migrate()
             assert cephadm_module.migration_current == 2