mgr/cephadm: Add `_check_for_moved_osds`
author    Sebastian Wagner <sewagner@redhat.com>
          Wed, 8 Sep 2021 15:04:58 +0000 (17:04 +0200)
committer Sebastian Wagner <sewagner@redhat.com>
          Mon, 3 Jan 2022 13:59:49 +0000 (14:59 +0100)
Fixes: https://tracker.ceph.com/issues/49571
Signed-off-by: Sebastian Wagner <sewagner@redhat.com>
(cherry picked from commit 0ef6788fe6e4c3cfbebb401748eb9da29a56975c)

Conflicts:
src/pybind/mgr/cephadm/serve.py

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/orchestrator/_interface.py

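The "moved OSD" scenario this commit handles (exercised by the new test below) is an OSD whose data device ends up activated on a different host, leaving the cephadm host cache with two entries for the same OSD id: a stale error-state entry on the old host and a running one on the new host. A simplified sketch of that state, assuming plain status strings instead of the real DaemonDescription objects:

    # Simplified illustration of the cache state this commit cleans up; the
    # real HostCache stores DaemonDescription objects keyed by daemon name.
    daemons = {
        'host1': {'osd.1': 'error'},    # stale entry left behind by the move
        'host2': {'osd.1': 'running'},  # freshly activated daemon
    }
    # _check_for_moved_osds() removes the error-state duplicate so only the
    # running osd.1 on host2 remains.
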
diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
index 5c69e1fc09a477d39e1d4ccf42da2a400c857a23..1f77898e2511e59b07b5f781e89ac28a770a915c 100644 (file)
@@ -726,23 +726,24 @@ class HostCache():
     def get_facts(self, host: str) -> Dict[str, Any]:
         return self.facts.get(host, {})
 
+    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self.daemons.values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
-        r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
-                r.append(dd)
-        return r
+        return list(self._get_daemons())
 
     def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
         return list(self.daemons.get(host, {}).values())
 
-    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
         assert not daemon_name.startswith('ha-rgw.')
-        for _, dm in self.daemons.items():
-            for _, dd in dm.items():
-                if dd.name() == daemon_name:
-                    return dd
+        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
+        for dd in dds:
+            if dd.name() == daemon_name:
+                return dd
+
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
     def has_daemon(self, daemon_name: str) -> bool:
@@ -774,12 +775,7 @@ class HostCache():
         assert not service_name.startswith('keepalived.')
         assert not service_name.startswith('haproxy.')
 
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, dm in self.daemons.items():
-            for name, d in dm.items():
-                if d.service_name() == service_name:
-                    result.append(d)
-        return result
+        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
     def get_daemons_by_type(self, service_type):
         # type: (str) -> List[orchestrator.DaemonDescription]
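
The HostCache refactor above folds the repeated nested loops into a single _get_daemons() generator and gives get_daemon() an optional host argument, so a lookup can be limited to one host; that matters once two hosts can legitimately cache a daemon with the same name (the moved-OSD case, used by the new test via get_daemon(f'osd.{osd_id}', host=host)). A minimal stand-alone sketch of the same pattern; MiniCache and Desc are illustrative names, not the real classes:

    from typing import Dict, Iterator, Optional, Tuple

    Desc = Tuple[str, str]  # (daemon name, hostname) -- toy stand-in for DaemonDescription

    class MiniCache:
        """Toy model of HostCache.daemons: host -> {daemon name -> description}."""

        def __init__(self) -> None:
            self.daemons: Dict[str, Dict[str, Desc]] = {}

        def _get_daemons(self) -> Iterator[Desc]:
            # Lazily flatten the per-host dicts instead of building lists by hand.
            for dm in self.daemons.values():
                yield from dm.values()

        def get_daemon(self, name: str, host: Optional[str] = None) -> Desc:
            # With `host` the search is limited to that host's entries; without
            # it the whole cache is scanned, matching the old behaviour.
            dds = self.daemons.get(host, {}).values() if host else self._get_daemons()
            for dd in dds:
                if dd[0] == name:
                    return dd
            raise KeyError(f'Unable to find {name}')

    cache = MiniCache()
    cache.daemons = {
        'host1': {'osd.1': ('osd.1', 'host1')},
        'host2': {'osd.1': ('osd.1', 'host2')},
    }
    assert cache.get_daemon('osd.1', host='host2') == ('osd.1', 'host2')
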
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index 35092793a3339f6999d174cd7e3c883925a85edd..cc8bd1307846fafa7fc345f0770b9b51fe365e65 100644 (file)
@@ -4,7 +4,8 @@ import logging
 import uuid
 from collections import defaultdict
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, \
+    DefaultDict
 
 from cephadm import remotes
 
@@ -101,6 +102,8 @@ class CephadmServe:
 
                     self._purge_deleted_services()
 
+                    self._check_for_moved_osds()
+
                     if self.mgr.upgrade.continue_upgrade():
                         continue
 
@@ -539,9 +542,33 @@ class CephadmServe:
                         'stray host %s has %d stray daemons: %s' % (
                             host, len(missing_names), missing_names))
             if self.mgr.warn_on_stray_hosts and host_detail:
-                self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
+                self.mgr.set_health_warning(
+                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
             if self.mgr.warn_on_stray_daemons and daemon_detail:
-                self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+                self.mgr.set_health_warning(
+                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+
+    def _check_for_moved_osds(self) -> None:
+        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+        for dd in self.mgr.cache.get_daemons_by_type('osd'):
+            assert dd.daemon_id
+            all_osds[int(dd.daemon_id)].append(dd)
+        for dds in all_osds.values():
+            if len(dds) <= 1:
+                continue
+            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+            logger.info(msg)
+            if len(running) != 1:
+                continue
+            for e in error:
+                assert e.hostname
+                try:
+                    self._remove_daemon(e.name(), e.hostname)
+                except OrchestratorError as ex:
+                    self.mgr.events.from_orch_error(ex)
+                    logger.exception(f'failed to remove duplicated daemon {e}')
 
     def _apply_all_services(self) -> bool:
         r = False
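
The new _check_for_moved_osds() above is deliberately conservative: OSD daemon descriptions are grouped by numeric id, duplicates are logged, and they are only resolved when exactly one copy is in running state, in which case the error-state copies are removed via _remove_daemon(). The same decision rule in isolation, as a sketch over plain tuples (duplicates_to_remove is a hypothetical helper, not part of the commit):

    from collections import defaultdict
    from typing import DefaultDict, List, Tuple

    OsdEntry = Tuple[int, str, str]  # (osd id, hostname, status) -- simplified

    def duplicates_to_remove(entries: List[OsdEntry]) -> List[OsdEntry]:
        """Return the error-state duplicates that are safe to remove.

        Mirrors the rule in CephadmServe._check_for_moved_osds: only touch an
        OSD id that appears more than once AND has exactly one running copy.
        """
        by_id: DefaultDict[int, List[OsdEntry]] = defaultdict(list)
        for e in entries:
            by_id[e[0]].append(e)

        to_remove: List[OsdEntry] = []
        for dds in by_id.values():
            if len(dds) <= 1:
                continue  # not a duplicate
            running = [d for d in dds if d[2] == 'running']
            if len(running) != 1:
                continue  # 0 or 2+ running copies: too ambiguous, leave alone
            to_remove.extend(d for d in dds if d[2] == 'error')
        return to_remove

    cached = [(1, 'host1', 'error'), (1, 'host2', 'running'), (2, 'host3', 'running')]
    assert duplicates_to_remove(cached) == [(1, 'host1', 'error')]
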
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 1230a1f48a7bd99dc51c1833c7827b1c8e9d047d..6acd3938956dcbc7d4075ec6a9c82cbe92586651 100644 (file)
@@ -20,7 +20,7 @@ from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.inventory import Devices, Device
 from ceph.utils import datetime_to_str, datetime_now
 from orchestrator import DaemonDescription, InventoryHost, \
-    HostSpec, OrchestratorError
+    HostSpec, OrchestratorError, DaemonDescriptionStatus
 from tests import mock
 from .fixtures import wait, _run_cephadm, match_glob, with_host, \
     with_cephadm_module, with_service, _deploy_cephadm_binary
@@ -68,6 +68,44 @@ def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: st
     assert False, 'Daemon not found'
 
 
+@contextmanager
+def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+    cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+        'osds': [
+            {
+                'osd': 1,
+                'up_from': 0,
+                'uuid': 'uuid'
+            }
+        ]
+    })
+
+    ceph_volume_lvm_list = ceph_volume_lvm_list or {
+        str(osd_id): [{
+            'tags': {
+                'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+                'ceph.osd_fsid': 'uuid'
+            },
+            'type': 'data'
+        }]
+    }
+    _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
+    _run_cephadm.reset_mock()
+    assert cephadm_module._osd_activate(
+        [host]).stdout == f"Created osd(s) 1 on host '{host}'"
+    assert _run_cephadm.mock_calls == [
+        mock.call(host, 'osd', 'ceph-volume',
+                  ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''),
+        mock.call(host, f'osd.{osd_id}', 'deploy',
+                  ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY,
+                   '--config-json', '-', '--osd-fsid', 'uuid'],
+                  stdin=mock.ANY, image=''),
+    ]
+    dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host)
+    assert dd.name() == f'osd.{osd_id}'
+    yield dd
+
+
 class TestCephadm(object):
 
     def test_get_unique_name(self, cephadm_module):
@@ -863,6 +901,22 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
 
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'host1'):
+            with with_host(cephadm_module, 'host2'):
+                with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1:  # type: DaemonDescription
+                    with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2:  # type: DaemonDescription
+                        dd1.status = DaemonDescriptionStatus.running
+                        dd2.status = DaemonDescriptionStatus.error
+                        cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
+                        cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+
+                        CephadmServe(cephadm_module)._check_for_moved_osds()
+
+                        assert len(cephadm_module.cache.get_daemons()) == 1
+
     @pytest.mark.parametrize(
         "spec",
         [
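
The with_osd_daemon fixture above drives everything through the mocked _run_cephadm: it feeds canned `ceph-volume lvm list` JSON back into _osd_activate() and then compares mock_calls against the exact remote calls it expects, with mock.ANY standing in for volatile arguments. A generic illustration of that unittest.mock pattern; deploy and run_remote are hypothetical names, not the cephadm API:

    import json
    from unittest import mock

    def deploy(run_remote, host: str, osd_id: int) -> str:
        """Hypothetical deploy step: list LVs on the host, then report success."""
        out, _err, _code = run_remote(host, ['lvm', 'list', '--format', 'json'])
        lvs = json.loads(out)
        assert str(osd_id) in lvs
        return f"Created osd(s) {osd_id} on host '{host}'"

    run_remote = mock.MagicMock()
    run_remote.return_value = (json.dumps({'1': [{'type': 'data'}]}), '', 0)
    run_remote.reset_mock()  # forget setup-time calls; return_value is kept

    assert deploy(run_remote, 'host1', 1) == "Created osd(s) 1 on host 'host1'"

    # Assert the exact remote invocations, as the fixture does with
    # _run_cephadm.mock_calls.
    assert run_remote.mock_calls == [
        mock.call('host1', ['lvm', 'list', '--format', 'json']),
    ]
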
diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py
index e43f843ba13c1913d8714fb98c3b1d5df487c56f..f6031edd1e96123d35b2f39148e90b6ce90ce5b8 100644 (file)
@@ -970,6 +970,9 @@ class DaemonDescription(object):
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
     def to_json(self) -> dict:
         out: Dict[str, Any] = OrderedDict()
         out['daemon_type'] = self.daemon_type
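
The __str__ added above exists so that the duplicate-OSD log message, which joins str(dd) for every duplicate it finds, reads as a human-friendly summary instead of the terse __repr__. A stripped-down sketch of the distinction; MiniDaemonDescription is an illustrative stand-in carrying only the fields __str__ touches:

    class MiniDaemonDescription:
        """Stripped-down stand-in with just the fields __str__ needs."""

        def __init__(self, daemon_type: str, daemon_id: str,
                     hostname: str, status_desc: str) -> None:
            self.daemon_type = daemon_type
            self.daemon_id = daemon_id
            self.hostname = hostname
            self.status_desc = status_desc

        def name(self) -> str:
            return f'{self.daemon_type}.{self.daemon_id}'

        def __repr__(self) -> str:
            return f'<DaemonDescription>({self.daemon_type}.{self.daemon_id})'

        def __str__(self) -> str:
            return f'{self.name()} in status {self.status_desc} on {self.hostname}'

    dd = MiniDaemonDescription('osd', '1', 'host1', 'error')
    assert repr(dd) == '<DaemonDescription>(osd.1)'
    assert str(dd) == 'osd.1 in status error on host1'
    # The log line in _check_for_moved_osds then reads, e.g.:
    #   Found duplicate OSDs: osd.1 in status error on host1, osd.1 in status running on host2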