@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
- return self.osd_service.create_from_spec(drive_group)
+ return self.osd_service.create(drive_group)
@trivial_completion
def preview_osdspecs(self,
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _apply_service(self, spec) -> bool:
+ def _create_fn(self, service_type: str) -> Callable[..., str]:
+ try:
+ d: Dict[str, Callable[..., str]] = {
+ 'mon': self.mon_service.create,
+ 'mgr': self.mgr_service.create,
+ 'osd': self.osd_service.create,
+ 'mds': self.mds_service.create,
+ 'rgw': self.rgw_service.create,
+ 'rbd-mirror': self.rbd_mirror_service.create,
+ 'nfs': self.nfs_service.create,
+ 'grafana': self.grafana_service.create,
+ 'alertmanager': self.alertmanager_service.create,
+ 'prometheus': self.prometheus_service.create,
+ 'node-exporter': self.node_exporter_service.create,
+ 'crash': self.crash_service.create,
+ 'iscsi': self.iscsi_service.create,
+ }
+ return d[service_type]
+ except KeyError as e:
+ self.log.exception(f'unknown service type {service_type}')
+ raise OrchestratorError(f'unknown service type {service_type}') from e
+
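For reference, a minimal, self-contained sketch of the dispatch pattern introduced by `_create_fn` above, using hypothetical names (`SERVICE_REGISTRY`, `UnknownServiceError`) rather than the cephadm API:

    from typing import Callable, Dict


    class UnknownServiceError(Exception):
        """Hypothetical stand-in for OrchestratorError."""


    # Map a service type string to the callable that deploys it.
    SERVICE_REGISTRY: Dict[str, Callable[[], str]] = {
        'mon': lambda: 'deployed mon',
        'mgr': lambda: 'deployed mgr',
    }


    def create_fn(service_type: str) -> Callable[[], str]:
        try:
            return SERVICE_REGISTRY[service_type]
        except KeyError as e:
            # Translate the lookup failure into a domain-specific error,
            # chaining the original exception for debuggability.
            raise UnknownServiceError(f'unknown service type {service_type}') from e


    print(create_fn('mon')())   # -> deployed mon
    # create_fn('bogus')        # raises UnknownServiceError
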
+ def _config_fn(self, service_type: str) -> Optional[Callable[[ServiceSpec], None]]:
+ return {
+ 'mds': self.mds_service.config,
+ 'rgw': self.rgw_service.config,
+ 'nfs': self.nfs_service.config,
+ 'iscsi': self.iscsi_service.config,
+ }.get(service_type)
+
+ def _apply_service(self, spec: ServiceSpec) -> bool:
"""
Schedule a service. Deploy new daemons or remove old ones, depending
on the target label and count specified in the placement.
self.log.debug('Skipping unmanaged service %s spec' % service_name)
return False
self.log.debug('Applying service %s spec' % service_name)
- create_fns = {
- 'mon': self.mon_service.create,
- 'mgr': self.mgr_service.create,
- 'osd': self.create_osds, # osds work a bit different.
- 'mds': self.mds_service.create,
- 'rgw': self.rgw_service.create,
- 'rbd-mirror': self.rbd_mirror_service.create,
- 'nfs': self.nfs_service.create,
- 'grafana': self.grafana_service.create,
- 'alertmanager': self.alertmanager_service.create,
- 'prometheus': self.prometheus_service.create,
- 'node-exporter': self.node_exporter_service.create,
- 'crash': self.crash_service.create,
- 'iscsi': self.iscsi_service.create,
- }
- config_fns = {
- 'mds': self.mds_service.config,
- 'rgw': self.rgw_service.config,
- 'nfs': self.nfs_service.config,
- 'iscsi': self.iscsi_service.config,
- }
- create_func = create_fns.get(daemon_type, None)
- if not create_func:
- self.log.debug('unrecognized service type %s' % daemon_type)
+
+ create_func = self._create_fn(daemon_type)
+ config_func = self._config_fn(daemon_type)
+
+ if daemon_type == 'osd':
+ create_func(spec)
+ # TODO: returning True here would result in a busy loop
return False
- config_func = config_fns.get(daemon_type, None)
daemons = self.cache.get_daemons_by_service(service_name)
r = False
- if daemon_type == 'osd':
- return False if create_func(spec) else True # type: ignore
-
# sanity check
if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
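The `_apply_service` docstring above describes count-driven reconciliation; the following is a simplified, hypothetical illustration of that idea (a `reconcile` helper invented for this sketch, not the cephadm scheduler):

    from typing import List, Tuple


    def reconcile(running: List[str], candidates: List[str], count: int) -> Tuple[List[str], List[str]]:
        """Return (hosts_to_add, hosts_to_remove) needed to reach `count` daemons."""
        to_add: List[str] = []
        to_remove: List[str] = []
        if len(running) < count:
            # Deploy on candidate hosts that do not run the daemon yet.
            free = [h for h in candidates if h not in running]
            to_add = free[:count - len(running)]
        elif len(running) > count:
            # Remove surplus daemons; victim selection is policy-specific.
            to_remove = running[count:]
        return to_add, to_remove


    # Example: scale from 2 mons to 3 across 4 candidate hosts.
    print(reconcile(['host1', 'host2'], ['host1', 'host2', 'host3', 'host4'], 3))
    # -> (['host3'], [])
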
class OSDService(CephadmService):
- def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+ def create(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
ret = []
drive_group.osd_id_claims = self.find_destroyed_osds()
# env_vars = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
# disable this until https://github.com/ceph/ceph/pull/34835 is merged
env_vars: List[str] = []
- ret_msg = self.create(host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars)
+ ret_msg = self.create_single_host(
+ host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars
+ )
ret.append(ret_msg)
return ", ".join(ret)
- def create(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
+ def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
if code == 1 and ', it is already prepared' in '\n'.join(err):
with pytest.raises(OrchestratorError):
out = cephadm_module.osd_service.find_destroyed_osds()
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.module.SpecStore.save")
- def test_apply_osd_save(self, _save_spec, cephadm_module):
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
with self._with_host(cephadm_module, 'test'):
- json_spec = {'service_type': 'osd', 'host_pattern': 'test', 'service_id': 'foo', 'data_devices': {'all': True}}
- spec = ServiceSpec.from_json(json_spec)
- assert isinstance(spec, DriveGroupSpec)
+
+ spec = DriveGroupSpec(
+ service_id='foo',
+ placement=PlacementSpec(
+ host_pattern='*',
+ ),
+ data_devices=DeviceSelection(
+ all=True
+ )
+ )
+
c = cephadm_module.apply_drivegroups([spec])
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
- _save_spec.assert_called_with(spec)
+
+ inventory = Devices([
+ Device(
+ '/dev/sdb',
+ available=True
+ ),
+ ])
+
+ cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+ _run_cephadm.return_value = (['{}'], '', 0)
+
+ assert cephadm_module._apply_all_services() == False
+
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
+ env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+
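The rewritten test patches `_run_cephadm` by hand and sets `return_value` per call rather than baking the output into the decorator. A generic sketch of that mocking pattern, with a hypothetical `Runner` class standing in for the cephadm test fixtures:

    from unittest import mock


    class Runner:
        """Hypothetical stand-in for the object that shells out via cephadm."""
        def run(self, host: str, cmd: str) -> tuple:
            raise NotImplementedError  # the real thing would execute remotely


    def deploy(runner: Runner) -> str:
        out, err, code = runner.run('test', 'ceph-volume lvm prepare')
        return out


    @mock.patch.object(Runner, 'run')
    def test_deploy(run_mock):
        # Drive the mock's output per call instead of hard-coding it in the decorator.
        run_mock.return_value = ('{}', '', 0)
        assert deploy(Runner()) == '{}'
        run_mock.assert_called_with('test', 'ceph-volume lvm prepare')


    test_deploy()
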
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")