From 91c1c6181671719e958ab787f8f3c96850167d82 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Fri, 15 May 2020 12:04:14 +0200
Subject: [PATCH] mgr/cephadm: Fix OSDs not being created

Fixes: https://tracker.ceph.com/issues/45560

This was caused by completions no longer getting executed
synchronously. Also:

* Avoid a busy loop when applying OSDs
* Add a call to `_apply_all_services` to the OSD test

This is a follow-up to f7bf4d0c597080f45bb8466bd6bab3b9cecb03c4

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/cephadm/module.py             | 69 +++++++++++---------
 src/pybind/mgr/cephadm/services/osd.py       |  8 ++-
 src/pybind/mgr/cephadm/tests/test_cephadm.py | 41 ++++++++++--
 3 files changed, 78 insertions(+), 40 deletions(-)
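
For reviewers, the shape of the `_apply_service` refactor below, reduced to
a minimal, runnable sketch. `Svc` and `Orch` are illustrative stand-ins,
not the cephadm classes; the point is that OSD specs are handled by a
single create call and must not report progress, because (per the TODO
added in this patch) a True return would make the serve loop re-apply the
same spec immediately, i.e. a busy loop:

    from typing import Callable, Dict

    class Svc:
        def __init__(self, name: str) -> None:
            self.name = name

        def create(self, spec: str) -> str:
            return f"created {self.name} from {spec}"

    class Orch:
        def __init__(self) -> None:
            self.mon = Svc('mon')
            self.osd = Svc('osd')

        def _create_fn(self, service_type: str) -> Callable[..., str]:
            d: Dict[str, Callable[..., str]] = {
                'mon': self.mon.create,
                'osd': self.osd.create,
            }
            try:
                return d[service_type]
            except KeyError as e:
                # fail loudly instead of silently skipping the spec
                raise ValueError(f'unknown service type {service_type}') from e

        def _apply_service(self, service_type: str, spec: str) -> bool:
            create_func = self._create_fn(service_type)
            if service_type == 'osd':
                # OSD creation handles a whole DriveGroup at once; returning
                # True ("progress was made") would trigger another pass
                # right away, hence the unconditional False.
                create_func(spec)
                return False
            create_func(spec)
            return True

    print(Orch()._apply_service('osd', 'drivegroup.foo'))  # False
    print(Orch()._apply_service('mon', 'count:3'))         # True

The boolean return value is the "did we change anything" signal the serve
loop uses to decide whether to run another pass, which is why the OSD
branch returns False even on success.
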
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 64f046a99ebcf..9db98b977b93d 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -1473,7 +1473,7 @@ you may want to run:
 
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
-        return self.osd_service.create_from_spec(drive_group)
+        return self.osd_service.create(drive_group)
 
     @trivial_completion
     def preview_osdspecs(self,
@@ -1639,7 +1639,37 @@ you may want to run:
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _apply_service(self, spec) -> bool:
+    def _create_fn(self, service_type: str) -> Callable[..., str]:
+        try:
+            d: Dict[str, Callable[..., str]] = {
+                'mon': self.mon_service.create,
+                'mgr': self.mgr_service.create,
+                'osd': self.osd_service.create,
+                'mds': self.mds_service.create,
+                'rgw': self.rgw_service.create,
+                'rbd-mirror': self.rbd_mirror_service.create,
+                'nfs': self.nfs_service.create,
+                'grafana': self.grafana_service.create,
+                'alertmanager': self.alertmanager_service.create,
+                'prometheus': self.prometheus_service.create,
+                'node-exporter': self.node_exporter_service.create,
+                'crash': self.crash_service.create,
+                'iscsi': self.iscsi_service.create,
+            }
+            return d[service_type]  # type: ignore
+        except KeyError as e:
+            self.log.exception(f'unknown service type {service_type}')
+            raise OrchestratorError(f'unknown service type {service_type}') from e
+
+    def _config_fn(self, service_type: str) -> Optional[Callable[[ServiceSpec], None]]:
+        return {
+            'mds': self.mds_service.config,
+            'rgw': self.rgw_service.config,
+            'nfs': self.nfs_service.config,
+            'iscsi': self.iscsi_service.config,
+        }.get(service_type)
+
+    def _apply_service(self, spec: ServiceSpec) -> bool:
         """
         Schedule a service. Deploy new daemons or remove old ones, depending
         on the target label and count specified in the placement.
@@ -1650,32 +1680,14 @@ you may want to run:
             self.log.debug('Skipping unmanaged service %s spec' % service_name)
             return False
         self.log.debug('Applying service %s spec' % service_name)
-        create_fns = {
-            'mon': self.mon_service.create,
-            'mgr': self.mgr_service.create,
-            'osd': self.create_osds, # osds work a bit different.
-            'mds': self.mds_service.create,
-            'rgw': self.rgw_service.create,
-            'rbd-mirror': self.rbd_mirror_service.create,
-            'nfs': self.nfs_service.create,
-            'grafana': self.grafana_service.create,
-            'alertmanager': self.alertmanager_service.create,
-            'prometheus': self.prometheus_service.create,
-            'node-exporter': self.node_exporter_service.create,
-            'crash': self.crash_service.create,
-            'iscsi': self.iscsi_service.create,
-        }
-        config_fns = {
-            'mds': self.mds_service.config,
-            'rgw': self.rgw_service.config,
-            'nfs': self.nfs_service.config,
-            'iscsi': self.iscsi_service.config,
-        }
-        create_func = create_fns.get(daemon_type, None)
-        if not create_func:
-            self.log.debug('unrecognized service type %s' % daemon_type)
+
+        create_func = self._create_fn(daemon_type)
+        config_func = self._config_fn(daemon_type)
+
+        if daemon_type == 'osd':
+            create_func(spec)
+            # TODO: returning True here would result in a busy loop
             return False
-        config_func = config_fns.get(daemon_type, None)
 
         daemons = self.cache.get_daemons_by_service(service_name)
 
@@ -1707,9 +1719,6 @@ you may want to run:
 
                 r = False
 
-            if daemon_type == 'osd':
-                return False if create_func(spec) else True  # type: ignore
-
             # sanity check
             if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
                 self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 7f90e6edb16b6..a48f8bcd031c7 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
 
 
 class OSDService(CephadmService):
-    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+    def create(self, drive_group: DriveGroupSpec) -> str:
         logger.debug(f"Processing DriveGroup {drive_group}")
         ret = []
         drive_group.osd_id_claims = self.find_destroyed_osds()
@@ -33,11 +33,13 @@ class OSDService(CephadmService):
             # env_vars = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
             # disable this until https://github.com/ceph/ceph/pull/34835 is merged
             env_vars: List[str] = []
-            ret_msg = self.create(host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars)
+            ret_msg = self.create_single_host(
+                host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars
+            )
             ret.append(ret_msg)
         return ", ".join(ret)
 
-    def create(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
+    def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
         out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
 
         if code == 1 and ', it is already prepared' in '\n'.join(err):
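
A note on the test change that follows: instead of the canned
`_run_cephadm('{}')` decorator helper, the test now patches `_run_cephadm`
with a bare `mock.patch`, so each stage can swap the mocked
`(stdout, stderr, exit_code)` tuple and assert on the recorded calls. A
minimal sketch of that pattern in isolation; `Runner` is a hypothetical
stand-in, not the orchestrator:

    from unittest import mock

    class Runner:
        def _run_cephadm(self, host, entity, command, args):
            raise RuntimeError('would shell out to the host')

        def list_osds(self, host):
            out, err, code = self._run_cephadm(
                host, 'osd', 'ceph-volume',
                ['--', 'lvm', 'list', '--format', 'json'])
            return out

    @mock.patch.object(Runner, '_run_cephadm')
    def test_list_osds(_run_cephadm):
        # the mocked command returns a (stdout, stderr, exit_code) tuple
        _run_cephadm.return_value = ('{}', '', 0)
        assert Runner().list_osds('test') == '{}'
        _run_cephadm.assert_called_with(
            'test', 'osd', 'ceph-volume',
            ['--', 'lvm', 'list', '--format', 'json'])

    test_list_osds()

`assert_any_call` matches any one of the recorded calls, while
`assert_called_with` matches only the most recent one; the test below
relies on that distinction, since the `ceph-volume lvm list` refresh is
the last call made.
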
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 8287fbe6b793e..069b81f4a35a3 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -251,16 +251,43 @@ class TestCephadm(object):
         with pytest.raises(OrchestratorError):
             out = cephadm_module.osd_service.find_destroyed_osds()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.module.SpecStore.save")
-    def test_apply_osd_save(self, _save_spec, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
         with self._with_host(cephadm_module, 'test'):
-            json_spec = {'service_type': 'osd', 'host_pattern': 'test', 'service_id': 'foo', 'data_devices': {'all': True}}
-            spec = ServiceSpec.from_json(json_spec)
-            assert isinstance(spec, DriveGroupSpec)
+
+            spec = DriveGroupSpec(
+                service_id='foo',
+                placement=PlacementSpec(
+                    host_pattern='*',
+                ),
+                data_devices=DeviceSelection(
+                    all=True
+                )
+            )
+
             c = cephadm_module.apply_drivegroups([spec])
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
-            _save_spec.assert_called_with(spec)
+
+            inventory = Devices([
+                Device(
+                    '/dev/sdb',
+                    available=True
+                ),
+            ])
+
+            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+            _run_cephadm.return_value = (['{}'], '', 0)
+
+            assert cephadm_module._apply_all_services() == False
+
+            _run_cephadm.assert_any_call(
+                'test', 'osd', 'ceph-volume',
+                ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
+                env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+            _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
-- 
2.39.5