from ceph.deployment import inventory, translate
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection import selector
+from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.service_spec import \
HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
del self.spec_created[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
- def find(self, service_name=None):
- # type: (Optional[str]) -> List[ServiceSpec]
+ def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
specs = []
for sn, spec in self.specs.items():
if not service_name or \
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
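+ """ Create OSD daemons on every host matched by the drive group and return a summary of the per-host results """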
- self.log.debug("Processing DriveGroup {}".format(drive_group))
+ self.log.debug(f"Processing DriveGroup {drive_group}")
+ ret = []
+ for host, drive_selection in self.prepare_drivegroup(drive_group):
+ self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
+ cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection)
+ if not cmd:
+ self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+ continue
+ ret_msg = self._create_osd(host, cmd)
+ ret.append(ret_msg)
+ return ", ".join(ret)
+
+ def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
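+ """ Match the drive group's placement pattern against the known hosts and return a (hostname, DriveSelection) tuple for every matching host """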
# 1) use fn_filter to determine matching_hosts
matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
# 2) Map the inventory to the InventoryHost object
+ host_ds_map = []
def _find_inv_for_host(hostname: str, inventory_dict: dict):
# This is stupid and needs to be loaded with the host
return _inventory
raise OrchestratorError("No inventory found for host: {}".format(hostname))
- ret = []
- # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
+ # 3) iterate over matching_host and call DriveSelection
self.log.debug(f"Checking matching hosts -> {matching_hosts}")
for host in matching_hosts:
inventory_for_host = _find_inv_for_host(host, self.cache.devices)
self.log.debug(f"Found inventory for host {inventory_for_host}")
- drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
+ drive_selection = DriveSelection(drive_group, inventory_for_host)
self.log.debug(f"Found drive selection {drive_selection}")
- cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
- self.log.debug(f"translated to cmd {cmd}")
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
- continue
- self.log.info('Applying %s on host %s...' % (
- drive_group.service_name(), host))
- ret_msg = self._create_osd(host, cmd)
- ret.append(ret_msg)
- return ", ".join(ret)
-
- def _create_osd(self, host, cmd):
+ host_ds_map.append((host, drive_selection))
+ return host_ds_map
+
+ def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+ drive_selection: DriveSelection,
+ preview: bool = False) -> Optional[str]:
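+ """ Translate a DriveGroupSpec and its DriveSelection into the matching ceph-volume command (None if there is nothing to deploy) """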
+ self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+ cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, preview=preview).run()
+ self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
+ return cmd
+
+ def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+ dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
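+ """ Resolve the requested drive group(s), run ceph-volume in report mode on every matching host and return the parsed JSON previews """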
+ # find drivegroups
+ if drive_group_name:
+ drive_groups = cast(List[DriveGroupSpec],
+ self.spec_store.find(service_name=drive_group_name))
+ elif dg_specs:
+ drive_groups = dg_specs
+ else:
+ drive_groups = []
+ ret_all = []
+ for drive_group in drive_groups:
+ # prepare driveselection
+ for host, ds in self.prepare_drivegroup(drive_group):
+ cmd = self.driveselection_to_ceph_volume(drive_group, ds, preview=True)
+ if not cmd:
+ self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
+ continue
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+ if out:
+ concat_out = json.loads(" ".join(out))
+ ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
+ return ret_all
+ def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
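+ """ Run the given ceph-volume command on the host and return its (stdout, stderr, exit_code) """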
self._require_hosts(host)
# get bootstrap key
'keyring': keyring,
})
- before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-
split_cmd = cmd.split(' ')
_cmd = ['--config-json', '-', '--']
_cmd.extend(split_cmd)
_cmd,
stdin=j,
error_ok=True)
+ return out, err, code
+
+ def _create_osd(self, host, cmd):
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+
if code == 1 and ', it is already prepared' in '\n'.join(err):
# HACK: when we create against an existing LV, ceph-volume
# returns an error and the above message. To make this
'lvm', 'list',
'--format', 'json',
])
+ before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
osds_elems = json.loads('\n'.join(out))
fsid = self._cluster_fsid
osd_uuid_map = self.get_osd_uuid_map()
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
NFSServiceSpec, IscsiServiceSpec
+from ceph.deployment.drive_selection.selector import DriveSelection
+from ceph.deployment.inventory import Devices, Device
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
-
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ def test_prepare_drivegroup(self, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+ out = cephadm_module.prepare_drivegroup(dg)
+ assert len(out) == 1
+ f1 = out[0]
+ assert f1[0] == 'test'
+ assert isinstance(f1[1], DriveSelection)
+
+ @pytest.mark.parametrize(
+ "devices, preview, exp_command",
+ [
+ # no preview and only one disk, prepare is used due to the hack that is in place.
+ (['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
+ # no preview and multiple disks, uses batch
+ (['/dev/sda', '/dev/sdb'], False, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+ # preview and only one disk needs to use batch again to generate the preview
+ (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
+ # preview and multiple disks work the same
+ (['/dev/sda', '/dev/sdb'], True, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+ ]
+ )
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
+ with self._with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+ out = cephadm_module.driveselection_to_ceph_volume(dg, ds, preview)
+ assert out in exp_command
+
+ @mock.patch("cephadm.module.SpecStore.find")
+ @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
+ @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
+ with self._with_host(cephadm_module, 'test'):
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+ _find_store.return_value = [dg]
+ _prepare_dg.return_value = [('host1', 'ds_dummy')]
+ _run_c_v_command.return_value = ("{}", '', 0)
+ cephadm_module.preview_drivegroups(drive_group_name='foo')
+ _find_store.assert_called_once_with(service_name='foo')
+ _prepare_dg.assert_called_once_with(dg)
+ _run_c_v_command.assert_called_once()
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
dict(
""" Update OSD cluster """
raise NotImplementedError()
+ def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
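+ """ Set the unmanaged flag for a given service """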
+ raise NotImplementedError()
+
+ def preview_drivegroups(self, drive_group_name: Optional[str] = 'osd',
+ dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
+ """ Get a preview for OSD deployments """
+ raise NotImplementedError()
+
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
force: bool = False) -> Completion:
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
ServiceDescription, DaemonDescription, IscsiServiceSpec
+
def nice_delta(now, t, suffix=''):
if t:
return to_pretty_timedelta(now - t) + suffix
return HandleCommandResult(stdout=table.get_string())
+ def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
+ # fetch the spec(s) for <service_name>, flip the unmanaged flag and re-apply them
+ completion = self.describe_service(service_name=service_name)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ services: List[ServiceDescription] = completion.result
+ specs = list()
+ for service in services:
+ spec = service.spec
+ spec.unmanaged = unmanaged_flag
+ specs.append(spec)
+ completion = self.apply(specs)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ if specs:
+ return HandleCommandResult(stdout=f"Changed <unmanaged> flag to <{unmanaged_flag}> for "
+ f"{[spec.service_name() for spec in specs]}")
+ else:
+ return HandleCommandResult(stdout=f"No specs found with the <service_name> -> {service_name}")
+
@_cli_write_command(
'orch apply osd',
- 'name=all_available_devices,type=CephBool,req=false',
+ 'name=all_available_devices,type=CephBool,req=false '
+ 'name=preview,type=CephBool,req=false '
+ 'name=service_name,type=CephString,req=false '
+ 'name=unmanaged,type=CephBool,req=false '
+ "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
'Create OSD daemon(s) using a drive group spec')
- def _apply_osd(self, all_available_devices=False, inbuf=None):
- # type: (bool, Optional[str]) -> HandleCommandResult
+ def _apply_osd(self,
+ all_available_devices: bool = False,
+ preview: bool = False,
+ service_name: Optional[str] = None,
+ unmanaged: Optional[bool] = None,
+ format: Optional[str] = 'plain',
+ inbuf: Optional[str] = None) -> HandleCommandResult:
"""Apply DriveGroupSpecs to create OSDs"""
usage = """
Usage:
ceph orch apply osd -i <json_file/yaml_file>
ceph orch apply osd --use-all-devices
+ ceph orch apply osd --service-name <service_name> --preview
+ ceph orch apply osd --service-name <service_name> --unmanaged=True|False
"""
+
+ def print_preview(prev, format):
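+ # render the preview either via to_format (json/yaml) or as a plain-text table with one row per OSD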
+ if format != 'plain':
+ return to_format(prev, format)
+ else:
+ table = PrettyTable(
+ ['NAME', 'HOST', 'DATA', 'DB', 'WAL'],
+ border=False)
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 1
+ for data in prev:
+ dg_name = data.get('drivegroup')
+ hostname = data.get('host')
+ for osd in data.get('data', {}).get('osds', []):
+ db_path = '-'
+ wal_path = '-'
+ block_db = osd.get('block.db', {}).get('path')
+ block_wal = osd.get('block.wal', {}).get('path')
+ block_data = osd.get('data', {}).get('path', '')
+ if not block_data:
+ continue
+ if block_db:
+ db_path = data.get('data', {}).get('vg', {}).get('devices', [])
+ if block_wal:
+ wal_path = data.get('data', {}).get('wal_vg', {}).get('devices', [])
+ table.add_row((dg_name, hostname, block_data, db_path, wal_path))
+ out = table.get_string()
+ if not out:
+ out = "No pending deployments."
+ return out
+
+ if (inbuf or all_available_devices) and service_name:
+ # mutually exclusive
+ return HandleCommandResult(-errno.EINVAL, stderr=usage)
+
+ if preview and not (service_name or all_available_devices or inbuf):
+ # get all stored drivegroups and print
+ prev = self.preview_drivegroups()
+ return HandleCommandResult(stdout=print_preview(prev, format))
+
+ if service_name and preview:
+ # get specified drivegroup and print
+ prev = self.preview_drivegroups(service_name)
+ return HandleCommandResult(stdout=print_preview(prev, format))
+
+ if service_name and unmanaged is not None:
+ return self.set_unmanaged_flag(service_name, unmanaged)
+
if not inbuf and not all_available_devices:
return HandleCommandResult(-errno.EINVAL, stderr=usage)
if inbuf:
raise OrchestratorError('--all-available-devices cannot be combined with an osd spec')
try:
drivegroups = yaml.load_all(inbuf)
- dg_specs = [ServiceSpec.from_json(dg) for dg in drivegroups]
+ dg_specs = [DriveGroupSpec.from_json(dg) for dg in drivegroups]
except ValueError as e:
msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
)
]
- completion = self.apply_drivegroups(dg_specs)
- self._orchestrator_wait([completion])
- raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ if not preview:
+ completion = self.apply_drivegroups(dg_specs)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ ret = self.preview_drivegroups(dg_specs=dg_specs)
+ return HandleCommandResult(stdout=print_preview(ret, format))
@_cli_write_command(
'orch daemon add osd',
return list(filter(_filter_func, daemons))
+ def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
+ return [{}]
+
def create_osds(self, drive_group):
# type: (DriveGroupSpec) -> TestCompletion
""" Creates OSDs from a drive group specification.
block_wal_size=None, # type: Optional[int]
journal_size=None, # type: Optional[int]
service_type=None, # type: Optional[str]
- unmanaged=None, # type: Optional[bool]
+ unmanaged=False, # type: bool
):
assert service_type is None or service_type == 'osd'
- super(DriveGroupSpec, self).__init__('osd', service_id=service_id, placement=placement)
+ super(DriveGroupSpec, self).__init__('osd', service_id=service_id,
+ placement=placement,
+ unmanaged=unmanaged)
#: A :class:`ceph.deployment.drive_group.DeviceSelection`
self.data_devices = data_devices
def __init__(self,
spec, # type: DriveGroupSpec
- selection # type: DriveSelection
+ selection, # type: DriveSelection
+ preview=False
):
self.spec = spec
self.selection = selection
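+ # when set, run() emits a ceph-volume report command instead of a deployment command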
+ self.preview = preview
def run(self):
# type: () -> Optional[str]
not db_devices and \
not wal_devices:
cmd = "lvm prepare --bluestore --data %s --no-systemd" % (' '.join(data_devices))
+ if self.preview:
+ # Like every horrible hack, this has side effects on other features.
+ # In this case, 'lvm prepare' has neither a '--report' nor a '--format json' option,
+ # which essentially doesn't allow for a proper preview here.
+ # Fall back to lvm batch in order to get a preview.
+ return f"lvm batch --no-auto {' '.join(data_devices)} --report --format json"
return cmd
if self.spec.objectstore == 'bluestore':
cmd += " --yes"
cmd += " --no-systemd"
+ if self.preview:
+ cmd += " --report"
+ cmd += " --format json"
+
return cmd