mgr/cephadm: enable osd previews (34216/head)
author Joshua Schmid <jschmid@suse.de>
Wed, 1 Apr 2020 08:12:28 +0000 (10:12 +0200)
committer Joshua Schmid <jschmid@suse.de>
Wed, 15 Apr 2020 07:48:18 +0000 (09:48 +0200)
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/test_orchestrator/module.py
src/python-common/ceph/deployment/drive_group.py
src/python-common/ceph/deployment/translate.py

src/pybind/mgr/cephadm/module.py
index b730888e5c4eb51f82229eda03205d3f8913d6a0..19723b93dcad91851c3249358f51f59b97c7ee0f 100644
@@ -30,7 +30,7 @@ import uuid
 
 from ceph.deployment import inventory, translate
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection import selector
+from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.service_spec import \
     HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
 
@@ -144,8 +144,7 @@ class SpecStore():
             del self.spec_created[service_name]
             self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
 
-    def find(self, service_name=None):
-        # type: (Optional[str]) -> List[ServiceSpec]
+    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
         specs = []
         for sn, spec in self.specs.items():
             if not service_name or \
@@ -2102,10 +2101,23 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
-        self.log.debug("Processing DriveGroup {}".format(drive_group))
+        self.log.debug(f"Processing DriveGroup {drive_group}")
+        ret = []
+        for host, drive_selection in self.prepare_drivegroup(drive_group):
+            self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
+            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection)
+            if not cmd:
+                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+                continue
+            ret_msg = self._create_osd(host, cmd)
+            ret.append(ret_msg)
+        return ", ".join(ret)
+
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
         # 1) use fn_filter to determine matching_hosts
         matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
         # 2) Map the inventory to the InventoryHost object
+        host_ds_map = []
 
         def _find_inv_for_host(hostname: str, inventory_dict: dict):
             # This is stupid and needs to be loaded with the host
@@ -2114,27 +2126,49 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     return _inventory
             raise OrchestratorError("No inventory found for host: {}".format(hostname))
 
-        ret = []
-        # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
+        # 3) iterate over matching_host and call DriveSelection
         self.log.debug(f"Checking matching hosts -> {matching_hosts}")
         for host in matching_hosts:
             inventory_for_host = _find_inv_for_host(host, self.cache.devices)
             self.log.debug(f"Found inventory for host {inventory_for_host}")
-            drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
+            drive_selection = DriveSelection(drive_group, inventory_for_host)
             self.log.debug(f"Found drive selection {drive_selection}")
-            cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
-            self.log.debug(f"translated to cmd {cmd}")
-            if not cmd:
-                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
-                continue
-            self.log.info('Applying %s on host %s...' % (
-                drive_group.service_name(), host))
-            ret_msg = self._create_osd(host, cmd)
-            ret.append(ret_msg)
-        return ", ".join(ret)
-
-    def _create_osd(self, host, cmd):
+            host_ds_map.append((host, drive_selection))
+        return host_ds_map
+
+    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+                                      drive_selection: DriveSelection,
+                                      preview: bool = False) -> Optional[str]:
+        self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, preview=preview).run()
+        self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
+        return cmd
+
+    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
+        # find drivegroups
+        if drive_group_name:
+            drive_groups = cast(List[DriveGroupSpec],
+                                self.spec_store.find(service_name=drive_group_name))
+        elif dg_specs:
+            drive_groups = dg_specs
+        else:
+            drive_groups = []
+        ret_all = []
+        for drive_group in drive_groups:
+            # prepare driveselection
+            for host, ds in self.prepare_drivegroup(drive_group):
+                cmd = self.driveselection_to_ceph_volume(drive_group, ds, preview=True)
+                if not cmd:
+                    self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
+                    continue
+                out, err, code = self._run_ceph_volume_command(host, cmd)
+                if out:
+                    concat_out = json.loads(" ".join(out))
+                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
+        return ret_all
 
+    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
         self._require_hosts(host)
 
         # get bootstrap key
@@ -2153,8 +2187,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             'keyring': keyring,
         })
 
-        before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-
         split_cmd = cmd.split(' ')
         _cmd = ['--config-json', '-', '--']
         _cmd.extend(split_cmd)
@@ -2163,6 +2195,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             _cmd,
             stdin=j,
             error_ok=True)
+        return out, err, code
+
+    def _create_osd(self, host, cmd):
+        out, err, code = self._run_ceph_volume_command(host, cmd)
+
         if code == 1 and ', it is already prepared' in '\n'.join(err):
             # HACK: when we create against an existing LV, ceph-volume
             # returns an error and the above message.  To make this
@@ -2182,6 +2219,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 'lvm', 'list',
                 '--format', 'json',
             ])
+        before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
         osds_elems = json.loads('\n'.join(out))
         fsid = self._cluster_fsid
         osd_uuid_map = self.get_osd_uuid_map()
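
The hunks above split the old create_osds() into reusable steps (prepare_drivegroup(), driveselection_to_ceph_volume(), _run_ceph_volume_command()) so that the new preview_drivegroups() can build ceph-volume reports without deploying anything. As a quick illustration of the returned structure, here is a small, hypothetical helper (not part of this commit) that flattens the list of per-host dicts assembled in preview_drivegroups(); the 'drivegroup', 'host', 'data' and 'osds' keys are taken from the code above.

    from typing import Any, Dict, List, Tuple

    def flatten_previews(previews: List[Dict[str, Any]]) -> List[Tuple[str, str, str]]:
        """Turn preview_drivegroups() output into (drivegroup, host, data_device) rows."""
        rows = []
        for entry in previews:
            report = entry.get('data', {})  # parsed `ceph-volume ... --report --format json`
            for osd in report.get('osds', []):
                rows.append((entry.get('drivegroup', ''),
                             entry.get('host', ''),
                             osd.get('data', {}).get('path', '')))
        return rows

print_preview() in src/pybind/mgr/orchestrator/module.py below does essentially this walk to build its plain-text table.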
src/pybind/mgr/cephadm/tests/test_cephadm.py
index 2aeaa3ed2fb9e3be38799a0dddd5caae23dcb0fd..4bd9d3558b4bdd85ec72efe6079b550b270593fc 100644
@@ -16,6 +16,8 @@ from execnet.gateway_bootstrap import HostNotFound
 
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
     NFSServiceSpec, IscsiServiceSpec
+from ceph.deployment.drive_selection.selector import DriveSelection
+from ceph.deployment.inventory import Devices, Device
 from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
     HostSpec, OrchestratorError
 from tests import mock
@@ -157,7 +159,6 @@ class TestCephadm(object):
                 c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
                 assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
 
-
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_mon_add(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
@@ -207,6 +208,54 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == "Created no osd(s) on host test; already created?"
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    def test_prepare_drivegroup(self, cephadm_module):
+        with self._with_host(cephadm_module, 'test'):
+            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+            out = cephadm_module.prepare_drivegroup(dg)
+            assert len(out) == 1
+            f1 = out[0]
+            assert f1[0] == 'test'
+            assert isinstance(f1[1], DriveSelection)
+
+    @pytest.mark.parametrize(
+        "devices, preview, exp_command",
+        [
+            # no preview and only one disk: prepare is used due to the hack that is in place.
+            (['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
+            # no preview and multiple disks, uses batch
+            (['/dev/sda', '/dev/sdb'], False, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+            # preview and only one disk needs to use batch again to generate the preview
+            (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
+            # preview and multiple disks work the same
+            (['/dev/sda', '/dev/sdb'], True, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+        ]
+    )
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
+        with self._with_host(cephadm_module, 'test'):
+            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+            ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
+            out = cephadm_module.driveselection_to_ceph_volume(dg, ds, preview)
+            assert out in exp_command
+
+    @mock.patch("cephadm.module.SpecStore.find")
+    @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
+    @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
+        with self._with_host(cephadm_module, 'test'):
+            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+            _find_store.return_value = [dg]
+            _prepare_dg.return_value = [('host1', 'ds_dummy')]
+            _run_c_v_command.return_value = ("{}", '', 0)
+            cephadm_module.preview_drivegroups(drive_group_name='foo')
+            _find_store.assert_called_once_with(service_name='foo')
+            _prepare_dg.assert_called_once_with(dg)
+            _run_c_v_command.assert_called_once()
+
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
src/pybind/mgr/orchestrator/_interface.py
index df9b94082202cbecf952558808e5d482503be81b..9053eee2d653417e46bff12687c88ca583a7c8da 100644
@@ -941,6 +941,14 @@ class Orchestrator(object):
         """ Update OSD cluster """
         raise NotImplementedError()
 
+    def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
+        raise NotImplementedError()
+
+    def preview_drivegroups(self, drive_group_name: Optional[str] = 'osd',
+                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
+        """ Get a preview for OSD deployments """
+        raise NotImplementedError()
+
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
                     force: bool = False) -> Completion:
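
Both new methods are optional for orchestrator backends; anything that does not override them keeps raising NotImplementedError. A minimal sketch of a backend that satisfies the preview hook without doing any work (it mirrors the stub added to the test orchestrator further down); NoPreviewOrchestrator is a made-up name for illustration only.

    from typing import Any, Dict, List, Optional

    class NoPreviewOrchestrator:
        """Illustrative backend stub: advertises the preview API but returns no data."""

        def preview_drivegroups(self, drive_group_name: Optional[str] = 'osd',
                                dg_specs: Optional[List[Any]] = None
                                ) -> List[Dict[str, Dict[Any, Any]]]:
            # Same behaviour as the test orchestrator: one empty preview entry.
            return [{}]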
src/pybind/mgr/orchestrator/module.py
index 97a8a4962830a5c078acbbba584da8c85902adc5..2edc4812cd69a65d6fb1e306534dc9c14a6c05b6 100644
@@ -27,6 +27,7 @@ from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_comma
     RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
     ServiceDescription, DaemonDescription, IscsiServiceSpec
 
+
 def nice_delta(now, t, suffix=''):
     if t:
         return to_pretty_timedelta(now - t) + suffix
@@ -457,18 +458,98 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
 
             return HandleCommandResult(stdout=table.get_string())
 
+    def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
+        # setting unmanaged for $service_name
+        completion = self.describe_service(service_name=service_name)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        services: List[ServiceDescription] = completion.result
+        specs = list()
+        for service in services:
+            spec = service.spec
+            spec.unmanaged = unmanaged_flag
+            specs.append(spec)
+        completion = self.apply(specs)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        if specs:
+            return HandleCommandResult(stdout=f"Changed <unmanaged> flag to <{unmanaged_flag}> for "
+                                              f"{[spec.service_name() for spec in specs]}")
+        else:
+            return HandleCommandResult(stdout=f"No specs found with the <service_name> -> {service_name}")
+
     @_cli_write_command(
         'orch apply osd',
-        'name=all_available_devices,type=CephBool,req=false',
+        'name=all_available_devices,type=CephBool,req=false '
+        'name=preview,type=CephBool,req=false '
+        'name=service_name,type=CephString,req=false '
+        'name=unmanaged,type=CephBool,req=false '
+        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
         'Create OSD daemon(s) using a drive group spec')
-    def _apply_osd(self, all_available_devices=False, inbuf=None):
-        # type: (bool, Optional[str]) -> HandleCommandResult
+    def _apply_osd(self,
+                   all_available_devices: bool = False,
+                   preview: bool = False,
+                   service_name: Optional[str] = None,
+                   unmanaged: Optional[bool] = None,
+                   format: Optional[str] = 'plain',
+                   inbuf: Optional[str] = None) -> HandleCommandResult:
         """Apply DriveGroupSpecs to create OSDs"""
         usage = """
 Usage:
   ceph orch apply osd -i <json_file/yaml_file>
   ceph orch apply osd --use-all-devices
+  ceph orch apply osd --service-name <service_name> --preview
+  ceph orch apply osd --service-name <service_name> --unmanaged=True|False
 """
+
+        def print_preview(prev, format):
+            if format != 'plain':
+                return to_format(prev, format)
+            else:
+                table = PrettyTable(
+                    ['NAME', 'HOST', 'DATA', 'DB', 'WAL'],
+                    border=False)
+                table.align = 'l'
+                table.left_padding_width = 0
+                table.right_padding_width = 1
+                for data in prev:
+                    dg_name = data.get('drivegroup')
+                    hostname = data.get('host')
+                    for osd in data.get('data', {}).get('osds', []):
+                        db_path = '-'
+                        wal_path = '-'
+                        block_db = osd.get('block.db', {}).get('path')
+                        block_wal = osd.get('block.wal', {}).get('path')
+                        block_data = osd.get('data', {}).get('path', '')
+                        if not block_data:
+                            continue
+                        if block_db:
+                            db_path = data.get('data', {}).get('vg', {}).get('devices', [])
+                        if block_wal:
+                            wal_path = data.get('data', {}).get('wal_vg', {}).get('devices', [])
+                        table.add_row((dg_name, hostname, block_data, db_path, wal_path))
+                out = table.get_string()
+                if not out:
+                    out = "No pending deployments."
+                return out
+
+        if (inbuf or all_available_devices) and service_name:
+            # mutually exclusive
+            return HandleCommandResult(-errno.EINVAL, stderr=usage)
+
+        if preview and not (service_name or all_available_devices or inbuf):
+            # get all stored drivegroups and print
+            prev = self.preview_drivegroups()
+            return HandleCommandResult(stdout=print_preview(prev, format))
+
+        if service_name and preview:
+            # get specified drivegroup and print
+            prev = self.preview_drivegroups(service_name)
+            return HandleCommandResult(stdout=print_preview(prev, format))
+
+        if service_name and unmanaged is not None:
+            return self.set_unmanaged_flag(service_name, unmanaged)
+
         if not inbuf and not all_available_devices:
             return HandleCommandResult(-errno.EINVAL, stderr=usage)
         if inbuf:
@@ -476,7 +557,7 @@ Usage:
                 raise OrchestratorError('--all-available-devices cannot be combined with an osd spec')
             try:
                 drivegroups = yaml.load_all(inbuf)
-                dg_specs = [ServiceSpec.from_json(dg) for dg in drivegroups]
+                dg_specs = [DriveGroupSpec.from_json(dg) for dg in drivegroups]
             except ValueError as e:
                 msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
                 return HandleCommandResult(-errno.EINVAL, stderr=msg)
@@ -489,10 +570,12 @@ Usage:
                 )
             ]
 
-        completion = self.apply_drivegroups(dg_specs)
-        self._orchestrator_wait([completion])
-        raise_if_exception(completion)
-        return HandleCommandResult(stdout=completion.result_str())
+        if not preview:
+            completion = self.apply_drivegroups(dg_specs)
+            self._orchestrator_wait([completion])
+            raise_if_exception(completion)
+        ret = self.preview_drivegroups(dg_specs=dg_specs)
+        return HandleCommandResult(stdout=print_preview(ret, format))
 
     @_cli_write_command(
         'orch daemon add osd',
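
With these flags, `ceph orch apply osd --service-name <name> --preview` (or `--preview --format json`) reports pending deployments instead of applying them, and `--unmanaged=True|False` toggles the flag on the stored spec. For reference, a sketch of the entry shape print_preview() consumes, with hypothetical device paths; the keys mirror preview_drivegroups() and the parsed ceph-volume report.

    # Illustrative preview entry (hypothetical values). print_preview() maps
    # 'drivegroup' -> NAME, 'host' -> HOST, each osd's data path -> DATA, and the
    # report's vg/wal_vg devices -> DB/WAL whenever block.db/block.wal are present.
    preview_entry = {
        'drivegroup': 'my_drivegroup',
        'host': 'node1',
        'data': {                                   # `ceph-volume ... --report --format json`
            'osds': [
                {'data': {'path': '/dev/sda'},
                 'block.db': {'path': '/dev/nvme0n1'}},
            ],
            'vg': {'devices': ['/dev/nvme0n1']},
        },
    }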
src/pybind/mgr/test_orchestrator/module.py
index 4fc125da33a18882fca73504f3cb2301f6f2745e..eb59cda17c12d7ae4d4eff4ac4d21ee74c0c61e7 100644
@@ -247,6 +247,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return list(filter(_filter_func, daemons))
 
+    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
+        return [{}]
+
     def create_osds(self, drive_group):
         # type: (DriveGroupSpec) -> TestCompletion
         """ Creates OSDs from a drive group specification.
src/python-common/ceph/deployment/drive_group.py
index 595428a2778c9cb890986f0581812f12e6444c6e..d66ac8828c73abb35543402040369feb2fbe9bf0 100644
@@ -149,10 +149,12 @@ class DriveGroupSpec(ServiceSpec):
                  block_wal_size=None,  # type: Optional[int]
                  journal_size=None,  # type: Optional[int]
                  service_type=None,  # type: Optional[str]
-                 unmanaged=None,  # type: Optional[bool]
+                 unmanaged=False,  # type: bool
                  ):
         assert service_type is None or service_type == 'osd'
-        super(DriveGroupSpec, self).__init__('osd', service_id=service_id, placement=placement)
+        super(DriveGroupSpec, self).__init__('osd', service_id=service_id,
+                                             placement=placement,
+                                             unmanaged=unmanaged)
 
         #: A :class:`ceph.deployment.drive_group.DeviceSelection`
         self.data_devices = data_devices
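
Changing the default from None to False and forwarding the flag to the ServiceSpec constructor is what lets the unmanaged state round-trip through the spec store and be flipped via `orch apply osd --service-name ... --unmanaged=True|False`. A minimal construction sketch, assuming ServiceSpec exposes the forwarded value as an `unmanaged` attribute; service_id, placement and data_devices follow the patterns used in the tests above.

    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.service_spec import PlacementSpec

    # unmanaged=True marks the drive group as not automatically applied.
    dg = DriveGroupSpec(service_id='my_drivegroup',
                        placement=PlacementSpec(host_pattern='*'),
                        data_devices=DeviceSelection(paths=['/dev/sda']),
                        unmanaged=True)
    assert dg.unmanaged is True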
src/python-common/ceph/deployment/translate.py
index a09e7126971849e296c757805a1632676bafe998..5d51605465540d2b7f83516d0d81347d3b31d71d 100644
@@ -15,11 +15,13 @@ class to_ceph_volume(object):
 
     def __init__(self,
                  spec,  # type: DriveGroupSpec
-                 selection  # type: DriveSelection
+                 selection,  # type: DriveSelection
+                 preview=False
                  ):
 
         self.spec = spec
         self.selection = selection
+        self.preview = preview
 
     def run(self):
         # type: () -> Optional[str]
@@ -54,6 +56,12 @@ class to_ceph_volume(object):
            not db_devices and \
            not wal_devices:
             cmd = "lvm prepare --bluestore --data %s --no-systemd" % (' '.join(data_devices))
+            if self.preview:
+                # Like every horrible hack, this has side effects on other features.
+                # In this case, 'lvm prepare' has neither a '--report' nor a '--format json' option,
+                # which essentially doesn't allow for a proper preview here.
+                # Fall back to lvm batch in order to get a preview.
+                return f"lvm batch --no-auto {' '.join(data_devices)} --report --format json"
             return cmd
 
         if self.spec.objectstore == 'bluestore':
@@ -81,4 +89,8 @@ class to_ceph_volume(object):
         cmd += " --yes"
         cmd += " --no-systemd"
 
+        if self.preview:
+            cmd += " --report"
+            cmd += " --format json"
+
         return cmd
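
Tying it together, the preview flag only changes the generated command string: `--report --format json` is appended to the batch command, and the single-disk `lvm prepare` case falls back to `lvm batch` because prepare cannot produce a report. A short sketch that reuses the expectations from test_driveselection_to_ceph_volume above; the device paths are just examples.

    from ceph.deployment import translate
    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.drive_selection.selector import DriveSelection
    from ceph.deployment.inventory import Devices, Device
    from ceph.deployment.service_spec import PlacementSpec

    paths = ['/dev/sda', '/dev/sdb']
    dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
                        data_devices=DeviceSelection(paths=paths))
    ds = DriveSelection(dg, Devices([Device(path) for path in paths]))

    deploy_cmd = translate.to_ceph_volume(dg, ds).run()
    # -> "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"
    preview_cmd = translate.to_ceph_volume(dg, ds, preview=True).run()
    # -> "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"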