@handle_orch_error
def create_osds(self, drive_group: DriveGroupSpec) -> str:
+ hosts: List[HostSpec] = self.inventory.all_specs()
+ filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
+ if not filtered_hosts:
+ return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
return self.osd_service.create_from_spec(drive_group)
def _preview_osdspecs(self,
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
+ bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_hsot'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(bad_dg)
+ out = wait(cephadm_module, c)
+ assert "Invalid 'host:device' spec: host not found in cluster" in out
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_create_noncollocated_osd(self, cephadm_module):
host_name, block_device = svc_arg.split(":")
block_devices = block_device.split(',')
devs = DeviceSelection(paths=block_devices)
- drive_group = DriveGroupSpec(placement=PlacementSpec(
- host_pattern=host_name), data_devices=devs)
- except (TypeError, KeyError, ValueError):
- msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
+ drive_group = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern=host_name),
+ data_devices=devs,
+ )
+ except (TypeError, KeyError, ValueError) as e:
+ msg = f"Invalid 'host:device' spec: '{svc_arg}': {e}" + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
completion = self.create_osds(drive_group)