osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
- cmd = self.driveselection_to_ceph_volume(drive_selection,
- osd_id_claims_for_host)
- if not cmd:
+ cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
+ osd_id_claims_for_host)
+ if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
drive_group.service_id))
return None
start_ts = datetime_now()
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = await self.create_single_host(
- drive_group, host, cmd,
+ drive_group, host, cmds,
replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.update_osdspec_last_applied(
async def create_single_host(self,
drive_group: DriveGroupSpec,
- host: str, cmd: str, replace_osd_ids: List[str],
+ host: str, cmds: List[str], replace_osd_ids: List[str],
env_vars: Optional[List[str]] = None) -> str:
- out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
-
- if code == 1 and ', it is already prepared' in '\n'.join(err):
- # HACK: when we create against an existing LV, ceph-volume
- # returns an error and the above message. To make this
- # command idempotent, tolerate this "error" and continue.
- logger.debug('the device was already prepared; continuing')
- code = 0
- if code:
- raise RuntimeError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
+ for cmd in cmds:
+ out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+ if code == 1 and ', it is already prepared' in '\n'.join(err):
+ # HACK: when we create against an existing LV, ceph-volume
+ # returns an error and the above message. To make this
+ # command idempotent, tolerate this "error" and continue.
+ logger.debug('the device was already prepared; continuing')
+ code = 0
+ if code:
+ raise RuntimeError(
+ 'cephadm exited with an error code: %d, stderr:%s' % (
+ code, '\n'.join(err)))
return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
replace_osd_ids)
drive_selection = DriveSelection(drive_group, inventory_for_host,
existing_daemons=len(dd_for_spec_and_host))
logger.debug(f"Found drive selection {drive_selection}")
+ if drive_group.method and drive_group.method == 'raw':
+ # ceph-volume can currently only handle a 1:1 mapping
+ # of data/db/wal devices for raw mode osds. If db/wal devices
+ # are defined and the number does not match the number of data
+ # devices, we need to bail out
+ if drive_selection.data_devices() and drive_selection.db_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
+ if drive_selection.data_devices() and drive_selection.wal_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
host_ds_map.append((host, drive_selection))
return host_ds_map
@staticmethod
def driveselection_to_ceph_volume(drive_selection: DriveSelection,
osd_id_claims: Optional[List[str]] = None,
- preview: bool = False) -> Optional[str]:
+ preview: bool = False) -> List[str]:
logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
- cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
- osd_id_claims, preview=preview).run()
- logger.debug(f"Resulting ceph-volume cmd: {cmd}")
- return cmd
+ cmds: List[str] = translate.to_ceph_volume(drive_selection,
+ osd_id_claims, preview=preview).run()
+ logger.debug(f"Resulting ceph-volume cmds: {cmds}")
+ return cmds
def get_previews(self, host: str) -> List[Dict[str, Any]]:
# Find OSDSpecs that match host.
continue
# driveselection for host
- cmd = self.driveselection_to_ceph_volume(ds,
- osd_id_claims.filtered_by_host(host),
- preview=True)
- if not cmd:
+ cmds: List[str] = self.driveselection_to_ceph_volume(ds,
+ osd_id_claims.filtered_by_host(host),
+ preview=True)
+ if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
osdspec.service_name()))
continue
# get preview data from ceph-volume
- out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
- if out:
- try:
- concat_out: Dict[str, Any] = json.loads(' '.join(out))
- except ValueError:
- logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
- concat_out = {}
- notes = []
- if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
- found = len(concat_out)
- limit = osdspec.data_devices.limit
- notes.append(
- f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
- ret_all.append({'data': concat_out,
- 'osdspec': osdspec.service_id,
- 'host': host,
- 'notes': notes})
+ for cmd in cmds:
+ out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
+ if out:
+ try:
+ concat_out: Dict[str, Any] = json.loads(' '.join(out))
+ except ValueError:
+ logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
+ concat_out = {}
+ notes = []
+ if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
+ found = len(concat_out)
+ limit = osdspec.data_devices.limit
+ notes.append(
+ f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
+ ret_all.append({'data': concat_out,
+ 'osdspec': osdspec.service_id,
+ 'host': host,
+ 'notes': notes})
return ret_all
def resolve_hosts_for_osdspecs(self,
assert isinstance(f1[1], DriveSelection)
@pytest.mark.parametrize(
- "devices, preview, exp_command",
+ "devices, preview, exp_commands",
[
# no preview and only one disk, prepare is used due the hack that is in place.
- (['/dev/sda'], False, "lvm batch --no-auto /dev/sda --yes --no-systemd"),
+ (['/dev/sda'], False, ["lvm batch --no-auto /dev/sda --yes --no-systemd"]),
# no preview and multiple disks, uses batch
(['/dev/sda', '/dev/sdb'], False,
- "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"]),
# preview and only one disk needs to use batch again to generate the preview
- (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"),
+ (['/dev/sda'], True, ["lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"]),
# preview and multiple disks work the same
(['/dev/sda', '/dev/sdb'], True,
- "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+ ["CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"]),
]
)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
- def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
+ def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
- assert out in exp_command
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_commands",
+ [
+ # one data device, no preview
+ (['/dev/sda'], False, ["raw prepare --bluestore --data /dev/sda"]),
+ # multiple data devices, no preview
+ (['/dev/sda', '/dev/sdb'], False,
+ ["raw prepare --bluestore --data /dev/sda", "raw prepare --bluestore --data /dev/sdb"]),
+ # one data device, preview
+ (['/dev/sda'], True, ["raw prepare --bluestore --data /dev/sda --report --format json"]),
+ # multiple data devices, preview
+ (['/dev/sda', '/dev/sdb'], True,
+ ["raw prepare --bluestore --data /dev/sda --report --format json", "raw prepare --bluestore --data /dev/sdb --report --format json"]),
+ ]
+ )
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_raw_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_commands):
+ with with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(service_id='test.spec', method='raw', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ preview = preview
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
+ assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
self.osd_id_claims = osd_id_claims
def run(self):
- # type: () -> Optional[str]
+ # type: () -> List[str]
""" Generate ceph-volume commands based on the DriveGroup filters """
data_devices = [x.path for x in self.selection.data_devices()]
db_devices = [x.path for x in self.selection.db_devices()]
journal_devices = [x.path for x in self.selection.journal_devices()]
if not data_devices:
- return None
+ return []
- cmd = ""
+ cmds: List[str] = []
if self.spec.method == 'raw':
assert self.spec.objectstore == 'bluestore'
- cmd = "raw prepare --bluestore"
- cmd += " --data {}".format(" ".join(data_devices))
- if db_devices:
- cmd += " --block.db {}".format(" ".join(db_devices))
- if wal_devices:
- cmd += " --block.wal {}".format(" ".join(wal_devices))
+ # ceph-volume raw prepare only support 1:1 ratio of data to db/wal devices
+ if data_devices and db_devices:
+ if len(data_devices) != len(db_devices):
+ raise ValueError('Number of data devices must match number of '
+ 'db devices for raw mode osds')
+ if data_devices and wal_devices:
+ if len(data_devices) != len(wal_devices):
+ raise ValueError('Number of data devices must match number of '
+ 'wal devices for raw mode osds')
+ # for raw prepare each data device needs its own prepare command
+ dev_counter = 0
+ while dev_counter < len(data_devices):
+ cmd = "raw prepare --bluestore"
+ cmd += " --data {}".format(data_devices[dev_counter])
+ if db_devices:
+ cmd += " --block.db {}".format(db_devices[dev_counter])
+ if wal_devices:
+ cmd += " --block.wal {}".format(wal_devices[dev_counter])
+ cmds.append(cmd)
+ dev_counter += 1
elif self.spec.objectstore == 'filestore':
+ # for lvm batch we can just do all devices in one command
cmd = "lvm batch --no-auto"
cmd += " {}".format(" ".join(data_devices))
' '.join(journal_devices))
cmd += " --filestore"
+ cmds.append(cmd)
elif self.spec.objectstore == 'bluestore':
-
+ # for lvm batch we can just do all devices in one command
cmd = "lvm batch --no-auto {}".format(" ".join(data_devices))
if db_devices:
if self.spec.block_db_size:
cmd += " --block-db-size {}".format(self.spec.block_db_size)
+ cmds.append(cmd)
- if self.spec.encrypted:
- cmd += " --dmcrypt"
+ for i in range(len(cmds)):
+ if self.spec.encrypted:
+ cmds[i] += " --dmcrypt"
- if self.spec.osds_per_device:
- cmd += " --osds-per-device {}".format(self.spec.osds_per_device)
+ if self.spec.osds_per_device:
+ cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device)
- if self.spec.data_allocate_fraction:
- cmd += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction)
+ if self.spec.data_allocate_fraction:
+ cmds[i] += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction)
- if self.osd_id_claims:
- cmd += " --osd-ids {}".format(" ".join(self.osd_id_claims))
+ if self.osd_id_claims:
+ cmds[i] += " --osd-ids {}".format(" ".join(self.osd_id_claims))
- if self.spec.method != 'raw':
- cmd += " --yes"
- cmd += " --no-systemd"
+ if self.spec.method != 'raw':
+ cmds[i] += " --yes"
+ cmds[i] += " --no-systemd"
- if self.preview:
- cmd += " --report"
- cmd += " --format json"
+ if self.preview:
+ cmds[i] += " --report"
+ cmds[i] += " --format json"
- return cmd
+ return cmds
spec.validate()
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_1():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
- '--db-devices /dev/sdc /dev/sdd --yes --no-systemd')
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_2():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
- '--yes --no-systemd')
+ '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_3():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd '
'--wal-devices /dev/sde /dev/sdf --dmcrypt '
- '--yes --no-systemd')
+ '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_4():
_mk_device(size="10.0 GB", rotational=False)*2
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
'--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
'--block-wal-size 500M --block-db-size 500M --dmcrypt '
- '--osds-per-device 3 --yes --no-systemd')
+ '--osds-per-device 3 --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_5():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_6():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
'--journal-size 500M --journal-devices /dev/sda /dev/sdb '
- '--filestore --yes --no-systemd')
+ '--filestore --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_7():
spec.validate()
inventory = _mk_inventory(_mk_device(rotational=True)*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, ['0', '1']).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd'
+ cmds = translate.to_ceph_volume(sel, ['0', '1']).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_8():
_mk_device(rotational=False, size="349.0 GB", model='INTEL SSDPED1K375GA') # wal/db
)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd'
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
def test_ceph_volume_command_9():
spec.validate()
inventory = _mk_inventory(_mk_device()*2)
sel = drive_selection.DriveSelection(spec, inventory)
- cmd = translate.to_ceph_volume(sel, []).run()
- assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd'
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+def test_raw_ceph_volume_command_0():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False),
+ method='raw',
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True) + # data
+ _mk_device(rotational=True) + # data
+ _mk_device(rotational=False) + # db
+ _mk_device(rotational=False) # db
+ )
+ exp_cmds = ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc', 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd']
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd in exp_cmds for cmd in cmds), f'Expected {exp_cmds} to match {cmds}'
+
+def test_raw_ceph_volume_command_1():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False),
+ method='raw',
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True) + # data
+ _mk_device(rotational=True) + # data
+ _mk_device(rotational=False) # db
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ with pytest.raises(ValueError):
+ cmds = translate.to_ceph_volume(sel, []).run()