"""
return [self._apply(spec) for spec in specs]
+ @staticmethod
+ def device_selection_from_explicit_paths(sel: Optional[DeviceSelection]) -> Optional[DeviceSelection]:
+ """
+ Build a new DeviceSelection from explicit paths, preserving the per-path
+ crush_device_class. Used when persisting osd.default so all device roles
+ are stored consistently (no shared references with the incoming spec).
+
+ :param sel: incoming selection; may be None or carry no explicit paths.
+ :return: a fresh DeviceSelection built from plain path dicts, or None
+ when there are no explicit paths to copy.
+ """
+ # Nothing to copy: absent selections (and e.g. 'all: true' selections)
+ # have no explicit path list.
+ if sel is None or not sel.paths:
+ return None
+ path_specs: List[Dict[str, Any]] = []
+ for d in sel.paths:
+ spec: Dict[str, Any] = {"path": d.path}
+ # Only copy crush_device_class when set, keeping the stored spec minimal.
+ if d.crush_device_class:
+ spec["crush_device_class"] = d.crush_device_class
+ path_specs.append(spec)
+ # Passing dicts makes DeviceSelection build fresh objects, so the saved
+ # spec shares no state with the caller's spec.
+ return DeviceSelection(paths=path_specs)
+
def create_osd_default_spec(self, drive_group: DriveGroupSpec) -> None:
# Create the default osd and attach a valid spec to it.
host_pattern_obj = drive_group.placement.host_pattern
host = str(host_pattern_obj.pattern)
+ data_devices = self.device_selection_from_explicit_paths(drive_group.data_devices)
+ assert data_devices is not None
device_list = [d.path for d in drive_group.data_devices.paths] if drive_group.data_devices else []
- devices = [{"path": d} for d in device_list]
osd_default_spec = DriveGroupSpec(
service_id="default",
placement=PlacementSpec(host_pattern=host),
- data_devices=DeviceSelection(paths=devices),
+ data_devices=data_devices,
+ db_devices=self.device_selection_from_explicit_paths(drive_group.db_devices),
+ wal_devices=self.device_selection_from_explicit_paths(drive_group.wal_devices),
+ journal_devices=self.device_selection_from_explicit_paths(drive_group.journal_devices),
unmanaged=False,
method=drive_group.method,
objectstore=drive_group.objectstore,
self.spec_store.save(osd_default_spec)
self.apply([osd_default_spec])
+ @staticmethod
+ def is_path_lists_only(drive_group: DriveGroupSpec) -> bool:
+ """
+ Return True if data_devices is present and every non-null
+ data/db/wal/journal device selection consists only of non-empty
+ explicit path lists (no 'all').
+ """
+
+ def path_list_only(sel: Optional[DeviceSelection]) -> bool:
+ # Absent optional selections are acceptable; 'all: true' or an
+ # empty path list is not.
+ if sel is None:
+ return True
+ if sel.all:
+ return False
+ return bool(sel.paths)
+
+ # data_devices is mandatory: unlike the optional roles below, it must
+ # exist and be an explicit, non-empty path list.
+ dd = drive_group.data_devices
+ if dd is None or dd.all or not dd.paths:
+ return False
+ for name in ('db_devices', 'wal_devices', 'journal_devices'):
+ if not path_list_only(getattr(drive_group, name, None)):
+ return False
+ return True
+
+ @staticmethod
+ def validate_no_empty_device_paths(drive_group: DriveGroupSpec) -> str:
+ """
+ Reject empty device paths before any cache checks (avoids silent
+ ceph-volume failures later).
+
+ :return: an error message, or "" when every explicit path is non-empty.
+ """
+ for selection in (
+ drive_group.data_devices,
+ drive_group.db_devices,
+ drive_group.wal_devices,
+ drive_group.journal_devices,
+ ):
+ # Selections that are absent or have no explicit paths are skipped.
+ if not selection or not selection.paths:
+ continue
+ for device in selection.paths:
+ # Treat None and whitespace-only strings as empty paths.
+ if device.path is None or not str(device.path).strip():
+ return "Error: Device path is empty."
+ return ""
+
def validate_device(self, host_name: str, drive_group: DriveGroupSpec) -> str:
"""
Validates whether the specified device exists and is available for OSD creation.
if self.cache.is_host_unreachable(host_name):
return f"Host {host_name} is not reachable (it may be offline or in maintenance mode)."
+ explicit_paths_only = self.is_path_lists_only(drive_group)
+
+ path_fmt_err = self.validate_no_empty_device_paths(drive_group)
+ if path_fmt_err:
+ return path_fmt_err
+
host_cache = self.cache.devices.get(host_name, [])
if not host_cache:
+ if explicit_paths_only:
+ return ""
return (f"Error: No devices found for host {host_name}. "
"You can check known devices with 'ceph orch device ls'. "
"If no devices appear, wait for an automatic refresh.")
- available_devices = {
- dev.path: dev for dev in host_cache if dev.available
- }
- self.log.debug(f"Host {host_name} has {len(available_devices)} available devices.")
-
- for device in drive_group.data_devices.paths:
- matching_device = next((dev for dev in host_cache if dev.path == device.path), None)
- if not matching_device:
- return f"Error: Device {device.path} is not found on host {host_name}"
- if not matching_device.available:
- return (f"Error: Device {device.path} is present but unavailable for OSD creation. "
+ devices_by_path = {dev.path: dev for dev in host_cache}
+ available_count = sum(1 for dev in host_cache if dev.available)
+ self.log.debug(f"Host {host_name} has {available_count} available devices.")
+
+ for selection in (
+ drive_group.data_devices,
+ drive_group.db_devices,
+ drive_group.wal_devices,
+ drive_group.journal_devices,
+ ):
+ if not selection or not selection.paths:
+ continue
+
+ for device in selection.paths:
+ matching_device = devices_by_path.get(device.path)
+ if matching_device is None:
+ if not explicit_paths_only:
+ return f"Error: Device {device.path} is not found on host {host_name}"
+ continue
+
+ if not matching_device.available:
+ return (
+ f"Error: Device {device.path} is present but unavailable for OSD creation. "
f"Reason: {', '.join(matching_device.rejected_reasons) if matching_device.rejected_reasons else 'Unknown'}")
return ""
if not drive_group.service_id:
drive_group.service_id = "default"
- if drive_group.service_id not in self.spec_store.all_specs:
- self.log.info("osd.default does not exist. Creating it now.")
- self.create_osd_default_spec(drive_group)
- else:
- self.log.info("osd.default already exists.")
host_name = filtered_hosts[0]
if not skip_validation:
- self.log.warning("Skipping the validation of device paths for osd daemon add command. Please make sure that the osd path is valid")
err_msg = self.validate_device(host_name, drive_group)
if err_msg:
return err_msg
- return self.osd_service.create_from_spec(drive_group)
+ # Only save/apply osd.default after validation passes. Otherwise the serve loop
+ # would still apply the spec and create an OSD while the CLI returns an error.
+ # Spec store keys are full service names (example: "osd.default"), not service_id alone.
+ if drive_group.service_name() not in self.spec_store.all_specs:
+ self.log.info("osd.default does not exist. Creating it now.")
+ self.create_osd_default_spec(drive_group)
+ else:
+ self.log.info(
+ "Service osd.default is already registered; continuing with this "
+ "daemon add (one-shot ceph-volume apply, not a full spec re-apply).")
+
+ # 'ceph orch daemon add osd' must always run ceph-volume for this request.
+ # osdspec_needs_apply() only compares timestamps and would skip when inventory did not change.
+ return self.osd_service.create_from_spec(drive_group, force_apply=True)
def _preview_osdspecs(self,
osdspecs: Optional[List[DriveGroupSpec]] = None
class OSDService(CephService):
TYPE = 'osd'
- def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
+ def create_from_spec(self, drive_group: DriveGroupSpec, force_apply: bool = False) -> str:
+ """
+ :param force_apply: If True, do not check osdspec_needs_apply(). Used by
+ 'ceph orch daemon add osd' where the requested devices are not reflected
+ in inventory timestamps (and the check only compares timestamps, not spec content).
+ """
logger.debug(f"Processing DriveGroup {drive_group}")
osd_id_claims = OsdIdClaims(self.mgr)
if osd_id_claims.get():
async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
# skip this host if there has been no change in inventory
- if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+ if not force_apply and not self.mgr.cache.osdspec_needs_apply(host, drive_group):
self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
host, drive_group))
return None
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
_save_spec.assert_called_with(spec)
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_validate_device_path_only_spec_allows_paths_missing_from_cache(self, cephadm_module):
+ # A path-only spec may name devices (e.g. pre-created LVs) that the
+ # device cache does not know about; validation must still pass.
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices('test', [
+ Device('/dev/vdj', available=True),
+ ])
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['/dev/test/lv1']),
+ db_devices=DeviceSelection(paths=['/dev/vdj']),
+ )
+ assert cephadm_module.validate_device('test', dg) == ""
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_validate_device_path_only_unknown_path_allowed(self, cephadm_module):
+ # A path completely unknown to the cache is tolerated when the whole
+ # spec is explicit paths only.
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices('test', [
+ Device('/dev/sda', available=True),
+ ])
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['/dev/not-in-cache']),
+ )
+ assert cephadm_module.validate_device('test', dg) == ""
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_validate_device_rejects_path_in_cache_but_unavailable(self, cephadm_module):
+ # Even for a path-only spec, a device that IS in the cache but marked
+ # unavailable must still be rejected (with its rejection reason).
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices('test', [
+ Device('/dev/sdb', available=False, rejected_reasons=['Has a File System']),
+ ])
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['/dev/sdb']),
+ )
+ out = cephadm_module.validate_device('test', dg)
+ assert 'unavailable' in out
+
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_validate_device_rejects_empty_path(self, cephadm_module):
+ # Empty path strings must be rejected up front, before any cache lookup.
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.cache.update_host_devices('test', [
+ Device('/dev/sda', available=True),
+ ])
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']),
+ )
+ out = cephadm_module.validate_device('test', dg)
+ assert 'empty' in out.lower()
+
+ def test_create_osd_default_spec_includes_db_wal_journal(self, cephadm_module):
+ # The persisted osd.default spec must carry every device role from the
+ # incoming spec, not just data_devices.
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')):
+ with mock.patch("cephadm.module.CephadmOrchestrator.apply") as mock_apply:
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['/dev/lv1']),
+ db_devices=DeviceSelection(paths=['/dev/vdk']),
+ wal_devices=DeviceSelection(paths=['/dev/vdw']),
+ journal_devices=DeviceSelection(paths=['/dev/sdz']),
+ service_id='default',
+ )
+ cephadm_module.create_osd_default_spec(dg)
+ saved = cephadm_module.spec_store.all_specs['osd.default']
+ assert [d.path for d in saved.data_devices.paths] == ['/dev/lv1']
+ assert [d.path for d in saved.db_devices.paths] == ['/dev/vdk']
+ assert [d.path for d in saved.wal_devices.paths] == ['/dev/vdw']
+ assert [d.path for d in saved.journal_devices.paths] == ['/dev/sdz']
+ mock_apply.assert_called_once()
+
+ def test_create_osd_default_spec_preserves_data_crush_device_class(self, cephadm_module):
+ # Per-path crush_device_class must survive the copy into osd.default.
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')):
+ with mock.patch("cephadm.module.CephadmOrchestrator.apply") as mock_apply:
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=[
+ {'path': '/dev/sdb', 'crush_device_class': 'ssd'},
+ ]),
+ service_id='default',
+ )
+ cephadm_module.create_osd_default_spec(dg)
+ saved = cephadm_module.spec_store.all_specs['osd.default']
+ assert saved.data_devices.paths[0].path == '/dev/sdb'
+ assert saved.data_devices.paths[0].crush_device_class == 'ssd'
+ mock_apply.assert_called_once()
+
+ def test_create_osds_skips_default_spec_when_osd_default_exists(self, cephadm_module):
+ # When a spec is already registered under the full service name
+ # 'osd.default', create_osds must not overwrite it with a new default.
+ with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')):
+ with mock.patch.object(CephadmOrchestrator, 'validate_device', return_value=''):
+ with mock.patch.object(
+ CephadmOrchestrator, 'create_osd_default_spec') as mock_create_default:
+ with with_host(cephadm_module, 'test'):
+ # Seed the spec store directly with a pre-existing osd.default.
+ cephadm_module.spec_store._specs['osd.default'] = DriveGroupSpec(
+ service_id='default',
+ placement=PlacementSpec(host_pattern='*'),
+ data_devices=DeviceSelection(all=True),
+ )
+ dg = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['/dev/foo']),
+ db_devices=DeviceSelection(paths=['/dev/bar']),
+ )
+ with mock.patch.object(
+ cephadm_module.osd_service, 'create_from_spec',
+ return_value='ok'):
+ out = wait(cephadm_module, cephadm_module.create_osds(dg))
+ mock_create_default.assert_not_called()
+ assert out == 'ok'
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
with with_host(cephadm_module, 'test'):
data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
- assert "Error: No devices found for host test." in out
+ assert "Error: Device path is empty." in out
bad_dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='invalid_host'),
data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(bad_dg)
data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
- assert "Error: No devices found for host test." in out
+ assert "Error: Device path is empty." in out
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch('cephadm.services.osd.OSDService._run_ceph_volume_command')