def get_facts(self, host: str) -> Dict[str, Any]:
return self.facts.get(host, {})
+ def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+ for dm in self.daemons.values():
+ yield from dm.values()
+
def get_daemons(self):
# type: () -> List[orchestrator.DaemonDescription]
- r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- r.append(dd)
- return r
+ return list(self._get_daemons())
def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
return list(self.daemons.get(host, {}).values())
- def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+ def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
assert not daemon_name.startswith('ha-rgw.')
- for _, dm in self.daemons.items():
- for _, dd in dm.items():
- if dd.name() == daemon_name:
- return dd
+ dds = self.get_daemons_by_host(host) if host else self._get_daemons()
+ for dd in dds:
+ if dd.name() == daemon_name:
+ return dd
+
raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
def has_daemon(self, daemon_name: str) -> bool:
assert not service_name.startswith('keepalived.')
assert not service_name.startswith('haproxy.')
- result = [] # type: List[orchestrator.DaemonDescription]
- for host, dm in self.daemons.items():
- for name, d in dm.items():
- if d.service_name() == service_name:
- result.append(d)
- return result
+ return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
def get_daemons_by_type(self, service_type):
# type: (str) -> List[orchestrator.DaemonDescription]
import uuid
from collections import defaultdict
from contextlib import contextmanager
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, \
+ DefaultDict
from cephadm import remotes
self._purge_deleted_services()
+ self._check_for_moved_osds()
+
if self.mgr.upgrade.continue_upgrade():
continue
'stray host %s has %d stray daemons: %s' % (
host, len(missing_names), missing_names))
if self.mgr.warn_on_stray_hosts and host_detail:
- self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
if self.mgr.warn_on_stray_daemons and daemon_detail:
- self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+ self.mgr.set_health_warning(
+ 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+
+ def _check_for_moved_osds(self) -> None:
+ all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+ for dd in self.mgr.cache.get_daemons_by_type('osd'):
+ assert dd.daemon_id
+ all_osds[int(dd.daemon_id)].append(dd)
+ for dds in all_osds.values():
+ if len(dds) <= 1:
+ continue
+ running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+ error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+ msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+ logger.info(msg)
+ if len(running) != 1:
+ continue
+ for e in error:
+ assert e.hostname
+ try:
+ self._remove_daemon(e.name(), e.hostname)
+ except OrchestratorError as ex:
+ self.mgr.events.from_orch_error(ex)
+ logger.exception(f'failed to remove duplicated daemon {e}')
def _apply_all_services(self) -> bool:
r = False
from ceph.deployment.inventory import Devices, Device
from ceph.utils import datetime_to_str, datetime_now
from orchestrator import DaemonDescription, InventoryHost, \
- HostSpec, OrchestratorError
+ HostSpec, OrchestratorError, DaemonDescriptionStatus
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module, with_service, _deploy_cephadm_binary
assert False, 'Daemon not found'
+@contextmanager
+def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {
+ 'osd': 1,
+ 'up_from': 0,
+ 'uuid': 'uuid'
+ }
+ ]
+ })
+
+ ceph_volume_lvm_list = ceph_volume_lvm_list or {
+ str(osd_id): [{
+ 'tags': {
+ 'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+ 'ceph.osd_fsid': 'uuid'
+ },
+ 'type': 'data'
+ }]
+ }
+ _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
+ _run_cephadm.reset_mock()
+ assert cephadm_module._osd_activate(
+ [host]).stdout == f"Created osd(s) 1 on host '{host}'"
+ assert _run_cephadm.mock_calls == [
+ mock.call(host, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'list', '--format', 'json'], no_fsid=False, image=''),
+ mock.call(host, f'osd.{osd_id}', 'deploy',
+ ['--name', f'osd.{osd_id}', '--meta-json', mock.ANY,
+ '--config-json', '-', '--osd-fsid', 'uuid'],
+ stdin=mock.ANY, image=''),
+ ]
+ dd = cephadm_module.cache.get_daemon(f'osd.{osd_id}', host=host)
+ assert dd.name() == f'osd.{osd_id}'
+ yield dd
+
+
class TestCephadm(object):
def test_get_unique_name(self, cephadm_module):
out = wait(cephadm_module, c)
assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_remove_duplicate_osds(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host1', 1) as dd1: # type: DaemonDescription
+ with with_osd_daemon(cephadm_module, _run_cephadm, 'host2', 1) as dd2: # type: DaemonDescription
+ dd1.status = DaemonDescriptionStatus.running
+ dd2.status = DaemonDescriptionStatus.error
+ cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
+ cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+
+ CephadmServe(cephadm_module)._check_for_moved_osds()
+
+ assert len(cephadm_module.cache.get_daemons()) == 1
+
@pytest.mark.parametrize(
"spec",
[
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
+ def __str__(self) -> str:
+ return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
def to_json(self) -> dict:
out: Dict[str, Any] = OrderedDict()
out['daemon_type'] = self.daemon_type