From: Sebastian Wagner Date: Wed, 8 Sep 2021 15:04:58 +0000 (+0200) Subject: mgr/cephadm: Add `_check_for_moved_osds` X-Git-Tag: v16.2.8~273^2~28 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=20767bed8183a0d620f0a2a30f4b14990a989931;p=ceph.git mgr/cephadm: Add `_check_for_moved_osds` Fixes: https://tracker.ceph.com/issues/49571 Signed-off-by: Sebastian Wagner (cherry picked from commit 0ef6788fe6e4c3cfbebb401748eb9da29a56975c) Conflicts: src/pybind/mgr/cephadm/serve.py --- diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 5c69e1fc09a47..1f77898e2511e 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -726,23 +726,24 @@ class HostCache(): def get_facts(self, host: str) -> Dict[str, Any]: return self.facts.get(host, {}) + def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]: + for dm in self.daemons.values(): + yield from dm.values() + def get_daemons(self): # type: () -> List[orchestrator.DaemonDescription] - r = [] - for host, dm in self.daemons.items(): - for name, dd in dm.items(): - r.append(dd) - return r + return list(self._get_daemons()) def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]: return list(self.daemons.get(host, {}).values()) - def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription: + def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription: assert not daemon_name.startswith('ha-rgw.') - for _, dm in self.daemons.items(): - for _, dd in dm.items(): - if dd.name() == daemon_name: - return dd + dds = self.get_daemons_by_host(host) if host else self._get_daemons() + for dd in dds: + if dd.name() == daemon_name: + return dd + raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') def has_daemon(self, daemon_name: str) -> bool: @@ -774,12 +775,7 @@ class HostCache(): assert not service_name.startswith('keepalived.') assert not service_name.startswith('haproxy.') - result = [] # type: List[orchestrator.DaemonDescription] - for host, dm in self.daemons.items(): - for name, d in dm.items(): - if d.service_name() == service_name: - result.append(d) - return result + return list(dd for dd in self._get_daemons() if dd.service_name() == service_name) def get_daemons_by_type(self, service_type): # type: (str) -> List[orchestrator.DaemonDescription] diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 35092793a3339..cc8bd1307846f 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -4,7 +4,8 @@ import logging import uuid from collections import defaultdict from contextlib import contextmanager -from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator +from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, \ + DefaultDict from cephadm import remotes @@ -101,6 +102,8 @@ class CephadmServe: self._purge_deleted_services() + self._check_for_moved_osds() + if self.mgr.upgrade.continue_upgrade(): continue @@ -539,9 +542,33 @@ class CephadmServe: 'stray host %s has %d stray daemons: %s' % ( host, len(missing_names), missing_names)) if self.mgr.warn_on_stray_hosts and host_detail: - self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail) + self.mgr.set_health_warning( + 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail) if self.mgr.warn_on_stray_daemons and daemon_detail: - self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail) + self.mgr.set_health_warning( + 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail) + + def _check_for_moved_osds(self) -> None: + all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list) + for dd in self.mgr.cache.get_daemons_by_type('osd'): + assert dd.daemon_id + all_osds[int(dd.daemon_id)].append(dd) + for dds in all_osds.values(): + if len(dds) <= 1: + continue + running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running] + error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error] + msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}' + logger.info(msg) + if len(running) != 1: + continue + for e in error: + assert e.hostname + try: + self._remove_daemon(e.name(), e.hostname) + except OrchestratorError as ex: + self.mgr.events.from_orch_error(ex) + logger.exception(f'failed to remove duplicated daemon {e}') def _apply_all_services(self) -> bool: r = False diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 1230a1f48a7bd..6acd3938956dc 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -20,7 +20,7 @@ from ceph.deployment.drive_selection.selector import DriveSelection from ceph.deployment.inventory import Devices, Device from ceph.utils import datetime_to_str, datetime_now from orchestrator import DaemonDescription, InventoryHost, \ - HostSpec, OrchestratorError + HostSpec, OrchestratorError, DaemonDescriptionStatus from tests import mock from .fixtures import wait, _run_cephadm, match_glob, with_host, \ with_cephadm_module, with_service, _deploy_cephadm_binary @@ -68,6 +68,44 @@ def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: st assert False, 'Daemon not found' +@contextmanager +def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None): + cephadm_module.mock_store_set('_ceph_get', 'osd_map', { + 'osds': [ + { + 'osd': 1, + 'up_from': 0, + 'uuid': 'uuid' + } + ] + }) + + ceph_volume_lvm_list = ceph_volume_lvm_list or { + str(osd_id): [{ + 'tags': { + 'ceph.cluster_fsid': cephadm_module._cluster_fsid, + 'ceph.osd_fsid': 'uuid' + }, + 'type': 'data' + }] + } + _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0) + _run_cephadm.reset_mock() + assert cephadm_module._osd_activate( + [host]).stdout == f"Created osd(s) 1 on host '{host}'" + assert _run_cephadm.mock_calls == [ + mock.call(host, 'osd', 'ceph-volume', + ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''), + mock.call(host, f'osd.{osd_id}', 'deploy', + ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY, + '--config-json', '-', '--osd-fsid', 'uuid'], + stdin=mock.ANY, image=''), + ] + dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host) + assert dd.name() == f'osd.{osd_id}' + yield dd + + class TestCephadm(object): def test_get_unique_name(self, cephadm_module): @@ -863,6 +901,22 @@ class TestCephadm(object): out = wait(cephadm_module, c) assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"] + @mock.patch("cephadm.serve.CephadmServe._run_cephadm") + def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator): + _run_cephadm.return_value = ('{}', '', 0) + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription + with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription + dd1.status = DaemonDescriptionStatus.running + dd2.status = DaemonDescriptionStatus.error + cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1}) + cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2}) + + CephadmServe(cephadm_module)._check_for_moved_osds() + + assert len(cephadm_module.cache.get_daemons()) == 1 + @pytest.mark.parametrize( "spec", [ diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index e43f843ba13c1..f6031edd1e961 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -970,6 +970,9 @@ class DaemonDescription(object): return "({type}.{id})".format(type=self.daemon_type, id=self.daemon_id) + def __str__(self) -> str: + return f"{self.name()} in status {self.status_desc} on {self.hostname}" + def to_json(self) -> dict: out: Dict[str, Any] = OrderedDict() out['daemon_type'] = self.daemon_type