@handle_orch_error
def create_osds(self, drive_group: DriveGroupSpec) -> str:
+ hosts: List[HostSpec] = self.inventory.all_specs()
+ filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
+ if not filtered_hosts:
+ return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
return self.osd_service.create_from_spec(drive_group)
def _preview_osdspecs(self,
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
+ bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_hsot'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(bad_dg)
+ out = wait(cephadm_module, c)
+ assert "Invalid 'host:device' spec: host not found in cluster" in out
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_create_noncollocated_osd(self, cephadm_module):
method=method,
)
except (TypeError, KeyError, ValueError) as e:
- msg = f"Invalid host:device spec: '{svc_arg}': {e}" + usage
+ msg = f"Invalid 'host:device' spec: '{svc_arg}': {e}" + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
completion = self.create_osds(drive_group)