mgr/cephadm: Add `_check_for_moved_osds`
author     Sebastian Wagner <sewagner@redhat.com>
           Wed, 8 Sep 2021 15:04:58 +0000 (17:04 +0200)
committer  Sebastian Wagner <sewagner@redhat.com>
           Wed, 10 Nov 2021 12:47:46 +0000 (13:47 +0100)
Fixes: https://tracker.ceph.com/issues/49571
Signed-off-by: Sebastian Wagner <sewagner@redhat.com>
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/orchestrator/_interface.py

index f50a7aa2bb516f8aab45e3c5fdedcd4380f7f39a..4090d040fa03a06de004587fa3b8dcfeb59e7a38 100644 (file)
@@ -809,13 +809,13 @@ class HostCache():
     def get_facts(self, host: str) -> Dict[str, Any]:
         return self.facts.get(host, {})
 
+    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self.daemons.values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
-        r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
-                r.append(dd)
-        return r
+        return list(self._get_daemons())
 
     def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
         r = []
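
The new `_get_daemons` helper turns the nested host -> {daemon name -> description} mapping into a single lazy iterator, so `get_daemons`, `get_daemon` and `get_daemons_by_service` can share one traversal instead of each re-implementing the nested loops. A minimal standalone sketch of the idea (the `DaemonDescription` stub and the sample `daemons` dict below are illustrative stand-ins, not the real orchestrator classes):

    from typing import Dict, Iterator, List

    class DaemonDescription:           # simplified stand-in for orchestrator.DaemonDescription
        def __init__(self, name: str) -> None:
            self._name = name
        def name(self) -> str:
            return self._name

    # HostCache.daemons: hostname -> {daemon name -> DaemonDescription}
    daemons: Dict[str, Dict[str, DaemonDescription]] = {
        'host1': {'osd.1': DaemonDescription('osd.1')},
        'host2': {'mon.a': DaemonDescription('mon.a')},
    }

    def _get_daemons() -> Iterator[DaemonDescription]:
        # Flatten the nested mapping lazily; callers may stop early.
        for dm in daemons.values():
            yield from dm.values()

    def get_daemons() -> List[DaemonDescription]:
        return list(_get_daemons())

    assert [dd.name() for dd in get_daemons()] == ['osd.1', 'mon.a']
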
@@ -828,12 +828,13 @@ class HostCache():
     def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
         return list(self.daemons.get(host, {}).values())
 
-    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
         assert not daemon_name.startswith('ha-rgw.')
-        for _, dm in self.daemons.items():
-            for _, dd in dm.items():
-                if dd.name() == daemon_name:
-                    return dd
+        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
+        for dd in dds:
+            if dd.name() == daemon_name:
+                return dd
+
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
     def has_daemon(self, daemon_name: str) -> bool:
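
With the optional `host` argument, `get_daemon` can restrict its search to a single host's cached daemons instead of scanning every host. A hedged usage sketch (`cache`, the host and daemon names are illustrative):

    # Fast path: the caller already knows which host the daemon lives on.
    dd = cache.get_daemon('osd.1', host='host1')   # looks only at host1's entries

    # Fallback: no host given, so the whole cache is walked via _get_daemons().
    dd = cache.get_daemon('osd.1')

    # In both cases an unknown daemon name raises orchestrator.OrchestratorError.
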
@@ -865,12 +866,7 @@ class HostCache():
         assert not service_name.startswith('keepalived.')
         assert not service_name.startswith('haproxy.')
 
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, dm in self.daemons.items():
-            for name, d in dm.items():
-                if d.service_name() == service_name:
-                    result.append(d)
-        return result
+        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
     def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
         assert service_type not in ['keepalived', 'haproxy']
index 77924e97e0df546baa6500fd0e4177a2a14a327e..274fa6bb5fbc00d2a4775196c0d6d46d04c23f34 100644 (file)
@@ -3,7 +3,8 @@ import json
 import logging
 import uuid
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
+    DefaultDict
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
@@ -91,6 +92,8 @@ class CephadmServe:
 
                     self._purge_deleted_services()
 
+                    self._check_for_moved_osds()
+
                     if self.mgr.agent_helpers._handle_use_agent_setting():
                         continue
 
@@ -513,6 +516,28 @@ class CephadmServe:
                 self.mgr.set_health_warning(
                     'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
 
+    def _check_for_moved_osds(self) -> None:
+        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+        for dd in self.mgr.cache.get_daemons_by_type('osd'):
+            assert dd.daemon_id
+            all_osds[int(dd.daemon_id)].append(dd)
+        for dds in all_osds.values():
+            if len(dds) <= 1:
+                continue
+            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+            logger.info(msg)
+            if len(running) != 1:
+                continue
+            for e in error:
+                assert e.hostname
+                try:
+                    self._remove_daemon(e.name(), e.hostname)
+                except OrchestratorError as ex:
+                    self.mgr.events.from_orch_error(ex)
+                    logger.exception(f'failed to remove duplicated daemon {e}')
+
     def _apply_all_services(self) -> bool:
         r = False
         specs = []  # type: List[ServiceSpec]
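
`_check_for_moved_osds` addresses the tracker issue above: when an OSD is moved to and re-activated on a new host, the cache can still hold a stale entry for the same OSD id on the old host. The check groups cached OSDs by numeric id and, when exactly one of the duplicates is running, removes the copies stuck in error state. A minimal sketch of that grouping logic in isolation (`OSD` and `Status` below are simplified stand-ins for DaemonDescription and DaemonDescriptionStatus, not the real classes):

    from collections import defaultdict
    from enum import Enum
    from typing import Dict, List, NamedTuple

    class Status(Enum):
        error = -1
        running = 1

    class OSD(NamedTuple):
        daemon_id: str
        hostname: str
        status: Status

    def find_duplicates(osds: List[OSD]) -> Dict[int, List[OSD]]:
        # Group by numeric OSD id; an id reported on more than one host is a duplicate.
        by_id: Dict[int, List[OSD]] = defaultdict(list)
        for dd in osds:
            by_id[int(dd.daemon_id)].append(dd)
        return {osd_id: dds for osd_id, dds in by_id.items() if len(dds) > 1}

    osds = [
        OSD('1', 'host1', Status.running),   # the OSD was re-activated here
        OSD('1', 'host2', Status.error),     # stale entry left behind after the move
    ]
    for osd_id, dds in find_duplicates(osds).items():
        running = [dd for dd in dds if dd.status is Status.running]
        error = [dd for dd in dds if dd.status is Status.error]
        # Only act when exactly one copy is actually running; the error copies
        # are then removed from their old hosts (self._remove_daemon in the commit).
        if len(running) == 1:
            for e in error:
                print(f'would remove stale osd.{osd_id} from {e.hostname}')
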
index d0033769b7eca9c5fcc60ba9a9da46c5cff1b57c..be8f3aad059d7a9650723572a0d63baffb7c832a 100644 (file)
@@ -20,7 +20,7 @@ from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.inventory import Devices, Device
 from ceph.utils import datetime_to_str, datetime_now
 from orchestrator import DaemonDescription, InventoryHost, \
-    HostSpec, OrchestratorError
+    HostSpec, OrchestratorError, DaemonDescriptionStatus
 from tests import mock
 from .fixtures import wait, _run_cephadm, match_glob, with_host, \
     with_cephadm_module, with_service, _deploy_cephadm_binary
@@ -68,6 +68,44 @@ def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: st
     assert False, 'Daemon not found'
 
 
+@contextmanager
+def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+    cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+        'osds': [
+            {
+                'osd': 1,
+                'up_from': 0,
+                'uuid': 'uuid'
+            }
+        ]
+    })
+
+    ceph_volume_lvm_list = ceph_volume_lvm_list or {
+        str(osd_id): [{
+            'tags': {
+                'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+                'ceph.osd_fsid': 'uuid'
+            },
+            'type': 'data'
+        }]
+    }
+    _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
+    _run_cephadm.reset_mock()
+    assert cephadm_module._osd_activate(
+        [host]).stdout == f"Created osd(s) 1 on host '{host}'"
+    assert _run_cephadm.mock_calls == [
+        mock.call(host, 'osd', 'ceph-volume',
+                  ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''),
+        mock.call(host, f'osd.{osd_id}', 'deploy',
+                  ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY,
+                   '--config-json', '-', '--osd-fsid', 'uuid'],
+                  stdin=mock.ANY, image=''),
+    ]
+    dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host)
+    assert dd.name() == f'osd.{osd_id}'
+    yield dd
+
+
 class TestCephadm(object):
 
     def test_get_unique_name(self, cephadm_module):
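
The new `with_osd_daemon` context manager mocks the `ceph-volume lvm list` output for a single OSD, activates it on the given host via `_osd_activate`, and yields the resulting DaemonDescription from the cache. A hedged usage sketch inside a test (the test name and host are illustrative; the patch target matches the existing tests):

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_osd_activate_example(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'host1'):
            with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd:
                # dd is the cached DaemonDescription for osd.1 on host1
                assert dd.hostname == 'host1'
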
@@ -866,6 +904,22 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
 
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'host1'):
+            with with_host(cephadm_module, 'host2'):
+                with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1:  # type: DaemonDescription
+                    with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2:  # type: DaemonDescription
+                        dd1.status = DaemonDescriptionStatus.running
+                        dd2.status = DaemonDescriptionStatus.error
+                        cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
+                        cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+
+                        CephadmServe(cephadm_module)._check_for_moved_osds()
+
+                        assert len(cephadm_module.cache.get_daemons()) == 1
+
     @pytest.mark.parametrize(
         "spec",
         [
index 9f86abe8db0a35ee4989322799cea24f363513bd..abc25a53fd834780dee606086e6fd8c9f4b4ed1e 100644 (file)
@@ -965,6 +965,9 @@ class DaemonDescription(object):
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
     def to_json(self) -> dict:
         out: Dict[str, Any] = OrderedDict()
         out['daemon_type'] = self.daemon_type
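
The new `__str__` is what produces the per-daemon text in the "Found duplicate OSDs: ..." log message above. A sketch of the resulting string (the keyword constructor arguments below are assumed from DaemonDescription's existing fields):

    # assumed constructor kwargs for illustration
    dd = DaemonDescription(daemon_type='osd', daemon_id='1', hostname='host2',
                           status=DaemonDescriptionStatus.error, status_desc='error')
    str(dd)     # -> "osd.1 in status error on host2"
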