"Please try again after applying an OSD service that matches "
"the service name to which you want to attach OSDs.")
+ # Verify that service_type is of osd type
+ spec = self.spec_store[service_name].spec
+ if spec.service_type != 'osd':
+ raise OrchestratorError(f"Service '{service_name}' is not an OSD service (type: {spec.service_type}). "
+ "OSDs can only be assigned to OSD service specs.")
+
daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
update_osd = defaultdict(list)
for daemon in daemons:
# on nvmeof.foo.foo's daemons
nvmeof_bar_blocking_hosts = NvmeofService(cephadm_module).get_blocking_daemon_hosts('nvmeof.bar.bar')
assert set([h.hostname for h in nvmeof_bar_blocking_hosts]) == set(['host1', 'host2'])
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.inventory.HostCache.get_daemons_by_type")
+ def test_set_osd_spec_non_osd_service(self, _get_daemons_by_type, cephadm_module):
+ with with_host(cephadm_module, 'test'):
+ osd_daemon = DaemonDescription(daemon_type='osd', daemon_id='1', hostname='test')
+ _get_daemons_by_type.return_value = [osd_daemon]
+
+ mgr_service = ServiceSpec('mgr', placement=PlacementSpec(hosts=['test']))
+
+ with with_service(cephadm_module, mgr_service):
+ with pytest.raises(OrchestratorError) as e:
+ cephadm_module.set_osd_spec('mgr', ['1'])
+
+ assert "Service 'mgr' is not an OSD service (type: mgr). OSDs can only be assigned to OSD service specs." in str(e.value)
+
+ # Move osd.1 to osd.foo service
+ spec = DriveGroupSpec(
+ service_id='foo',
+ placement=PlacementSpec(
+ host_pattern='*',
+ ),
+ data_devices=DeviceSelection(
+ all=True
+ )
+ )
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
+
+ cephadm_module.set_osd_spec('osd.foo', ['1'])