drivegroups: add support for drivegroups + tests
author Joshua Schmid <jschmid@suse.de>
Fri, 6 Dec 2019 14:43:39 +0000 (15:43 +0100)
committer Sage Weil <sage@redhat.com>
Tue, 28 Jan 2020 16:09:33 +0000 (10:09 -0600)
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/python-common/ceph/deployment/drive_group.py
src/python-common/ceph/deployment/drive_selection/selector.py
src/python-common/ceph/tests/test_disk_selector.py
src/python-common/ceph/tests/test_drive_group.py

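For orientation, a minimal sketch of the input format this commit introduces (the group name and spec values are illustrative, not taken from the commit): DriveGroupSpecs consumes a dict mapping each drive group name to its spec, and DriveGroupSpec.from_json routes every key ending in `_devices` through DeviceSelection.from_json.

    # hypothetical drive group JSON, as consumed by DriveGroupSpecs below
    drive_groups_json = {
        'default_drive_group': {                     # becomes DriveGroupSpec.name
            'host_pattern': '*',                     # which hosts the group applies to
            'data_devices': {'rotational': True},    # parsed via DeviceSelection.from_json
            'db_devices': {'rotational': False},
        },
    }
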
index a080518824a3b2744d1a602511aea06a6596d2f6..2888ee73681b1f682917d1f3acd00c4f7e9793f7 100644 (file)
@@ -21,7 +21,9 @@ import multiprocessing.pool
 import shutil
 import subprocess
 
-from ceph.deployment import inventory
+from ceph.deployment import inventory, translate
+from ceph.deployment.drive_selection import selector
+
 from mgr_module import MgrModule
 import mgr_util
 import orchestrator
@@ -1287,15 +1289,49 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             r[str(o['osd'])] = o['uuid']
         return r
 
-    @async_completion
-    def _create_osd(self, all_hosts_, drive_group):
-        all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
-        assert len(drive_group.hosts(all_hosts)) == 1
-        assert len(drive_group.data_devices.paths) > 0
-        assert all(map(lambda p: isinstance(p, six.string_types),
-            drive_group.data_devices.paths))
-
-        host = drive_group.hosts(all_hosts)[0]
+    def call_inventory(self, hosts, drive_groups):
+        def call_create(inventory):
+            return self._prepare_deployment(hosts, drive_groups, inventory)
+
+        return self.get_inventory().then(call_create)
+
+    def create_osds(self, drive_groups):
+        return self.get_hosts().then(lambda hosts: self.call_inventory(hosts, drive_groups))
+
+
+    def _prepare_deployment(self, all_hosts, drive_groups, inventory_list):
+        # type: (List[orchestrator.InventoryNode], List[DriveGroupSpec], List[orchestrator.InventoryNode]) -> orchestrator.Completion
+
+        cmds = []
+        for drive_group in drive_groups:
+            self.log.info("Processing DriveGroup {}".format(drive_group))
+            # 1) use fn_filter to determine matching_hosts
+            matching_hosts = drive_group.hosts([x.name for x in all_hosts])
+            # 2) Map the inventory to the InventoryNode object
+            # FIXME: lazy-load the inventory from a InventoryNode object;
+            #        this would save one call to the inventory(at least externally)
+
+            def _find_inv_for_host(hostname, inventory_list):
+                # FIXME: this linear lookup is wasteful; the inventory
+                # should be loaded together with the host
+                for _inventory in inventory_list:
+                    if _inventory.name == hostname:
+                        return _inventory
+
+            # 3) iterate over matching_hosts and call DriveSelection and to_ceph_volume
+            for host in matching_hosts:
+                inventory_for_host = _find_inv_for_host(host, inventory_list)
+                drive_selection = selector.DriveSelection(drive_group, inventory_for_host.devices)
+                cmd = translate.ToCephVolume(drive_group, drive_selection).run()
+                if not cmd:
+                    self.log.info("No data_devices, skipping DriveGroup: {}".format(drive_group.name))
+                    continue
+                cmds.append((host, cmd))
+
+        return self._create_osd(cmds)
+
+    @async_map_completion
+    def _create_osd(self, host, cmd):
         self._require_hosts(host)
 
         # get bootstrap key
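The pieces above chain orchestrator completions: create_osds fetches the hosts, call_inventory attaches the inventory, _prepare_deployment turns each matching host into a (host, cmd) pair, and @async_map_completion fans _create_osd out over those pairs. A rough sketch of the resulting flow, assuming the decorator unpacks each tuple into the function's arguments (its exact semantics are not part of this diff):

    # hypothetical call sequence
    completion = mgr.create_osds(drive_groups)
    # get_hosts() --then--> call_inventory(hosts, drive_groups)
    #   get_inventory() --then--> _prepare_deployment(hosts, drive_groups, inventory)
    #     _create_osd('host1', 'lvm batch --no-auto /dev/sda ...')  # one call per tuple
    #     _create_osd('host2', 'lvm batch --no-auto /dev/sdb ...')
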
@@ -1314,20 +1350,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
             'keyring': keyring,
         })
 
-        devices = drive_group.data_devices.paths
-        for device in devices:
-            out, err, code = self._run_cephadm(
-                host, 'osd', 'ceph-volume',
-                [
-                    '--config-and-keyring', '-',
-                    '--',
-                    'lvm', 'prepare',
-                    "--cluster-fsid", self._cluster_fsid,
-                    "--{}".format(drive_group.objectstore),
-                    "--data", device,
-                ],
-                stdin=j)
-            self.log.debug('ceph-volume prepare: %s' % out)
+        split_cmd = cmd.split(' ')
+        _cmd = ['--config-and-keyring', '-', '--']
+        _cmd.extend(split_cmd)
+        out, err, code = self._run_cephadm(
+            host, 'osd', 'ceph-volume',
+            _cmd,
+            stdin=j)
+        self.log.debug('ceph-volume output: %s' % out)
 
         # check result
         out, err, code = self._run_cephadm(
@@ -1346,9 +1376,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
                 if osd['tags']['ceph.cluster_fsid'] != fsid:
                     self.log.debug('mismatched fsid, skipping %s' % osd)
                     continue
-                if len(list(set(devices) & set(osd['devices']))) == 0 and osd.get('lv_path') not in devices:
-                    self.log.debug('mismatched devices, skipping %s' % osd)
-                    continue
                 if osd_id not in osd_uuid_map:
                     self.log.debug('osd id %d does not exist in cluster' % osd_id)
                     continue
@@ -1365,22 +1392,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
 
         return "Created osd(s) on host '{}'".format(host)
 
-    def create_osds(self, drive_group):
-        """
-        Create a new osd.
-
-        The orchestrator CLI currently handles a narrow form of drive
-        specification defined by a single block device using bluestore.
-
-        :param drive_group: osd specification
-
-        TODO:
-          - support full drive_group specification
-          - support batch creation
-        """
-
-        return self.get_hosts().then(lambda hosts: self._create_osd(hosts, drive_group))
-
     @with_services('osd')
     def remove_osds(self, osd_ids, services):
         # type: (List[str], List[orchestrator.ServiceDescription]) -> AsyncCompletion
index da531d42c71790653b539f0c33aded4a97c6add2..eeebdb1f77d1df045a88fb0fcc699acc22dafee3 100644 (file)
@@ -106,6 +106,22 @@ class DriveGroupValidationError(Exception):
         super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
 
 
+class DriveGroupSpecs(object):
+
+    def __init__(self, drive_group_json: dict):
+        self.drive_group_json: dict = drive_group_json
+        self.drive_groups: list = list()
+        self.build_drive_groups()
+
+    def build_drive_groups(self) -> None:
+        for drive_group_name, drive_group_spec in self.drive_group_json.items():
+            self.drive_groups.append(
+                DriveGroupSpec.from_json(drive_group_spec, name=drive_group_name))
+
+    def __repr__(self) -> str:
+        return ", ".join([repr(x) for x in self.drive_groups])
+
+
 class DriveGroupSpec(object):
     """
     Describe a drive group in the same form that ceph-volume
@@ -121,7 +137,8 @@ class DriveGroupSpec(object):
     ]
 
     def __init__(self,
-                 host_pattern,  # type: str
+                 host_pattern=None,  # type: Optional[str]
+                 name=None,  # type: Optional[str]
                  data_devices=None,  # type: Optional[DeviceSelection]
                  db_devices=None,  # type: Optional[DeviceSelection]
                  wal_devices=None,  # type: Optional[DeviceSelection]
@@ -138,6 +155,10 @@ class DriveGroupSpec(object):
                  journal_size=None,  # type: Optional[int]
                  ):
 
+        #: A name for the drive group. Since we can have multiple
+        #: drive groups in a cluster, we need a way to identify them.
+        self.name = name
+
         # concept of applying a drive group to a (set) of hosts is tightly
         # linked to the drive group itself
         #
@@ -190,7 +211,7 @@ class DriveGroupSpec(object):
         self.osd_id_claims = osd_id_claims
 
     @classmethod
-    def from_json(cls, json_drive_group):
-        # type: (dict) -> DriveGroupSpec
+    def from_json(cls, json_drive_group, name=None):
+        # type: (dict, Optional[str]) -> DriveGroupSpec
         """
         Initialize 'Drive group' structure
@@ -212,7 +233,9 @@ class DriveGroupSpec(object):
         try:
             args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
                     json_drive_group.items()}
-            return DriveGroupSpec(**args)
+            if not args:
+                raise DriveGroupValidationError("Didn't find any DriveGroup specs")
+            return DriveGroupSpec(**args, name=name)  # noqa: valid syntax on Python 3
         except (KeyError, TypeError) as e:
             raise DriveGroupValidationError(str(e))
 
@@ -251,7 +274,8 @@ class DriveGroupSpec(object):
             keys.remove('encrypted')
         if 'objectstore' in keys and self.objectstore == 'bluestore':
             keys.remove('objectstore')
-        return "DriveGroupSpec({})".format(
+        return "DriveGroupSpec(name={}->{})".format(
+            self.name,
             ', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
         )
 
index 48736220158d56b3d07d84df90b6a78a6ccadead..9b719fa0e23ac59d01aec99d74573743c1b769f8 100644 (file)
@@ -23,10 +23,16 @@ class DriveSelection(object):
         self.disks = disks.copy()
         self.spec = spec
 
-        self._data = self.assign_devices(self.spec.data_devices)
-        self._wal = self.assign_devices(self.spec.wal_devices)
-        self._db = self.assign_devices(self.spec.db_devices)
-        self._jornal = self.assign_devices(self.spec.journal_devices)
+        if self.spec.data_devices.paths:
+            self._data = self.spec.data_devices.paths
+            self._db = []
+            self._wal = []
+            self._journal = []
+        else:
+            self._data = self.assign_devices(self.spec.data_devices)
+            self._wal = self.assign_devices(self.spec.wal_devices)
+            self._db = self.assign_devices(self.spec.db_devices)
+            self._journal = self.assign_devices(self.spec.journal_devices)
 
     def data_devices(self):
         # type: () -> List[Device]
@@ -42,7 +48,7 @@ class DriveSelection(object):
 
     def journal_devices(self):
         # type: () -> List[Device]
-        return self._jornal
+        return self._journal
 
     @staticmethod
     def _limit_reached(device_filter, len_devices,
@@ -95,7 +101,7 @@ class DriveSelection(object):
 
         return a sorted(by path) list of devices
         """
-        if device_filter is None:
-            logger.debug('device_filter is None')
+        if not device_filter and not self.spec.data_devices.paths:
+            logger.debug('no device filter and no explicit paths given')
             return []
         devices = list()  # type: List[Device]
@@ -111,6 +117,12 @@ class DriveSelection(object):
 
                 # continue criteria
                 assert _filter.matcher is not None
+
+                if not disk.available:
+                    logger.debug(
+                        "Ignoring disk {}. Disk is not available".format(disk.path))
+                    continue
+
                 if not _filter.matcher.compare(disk):
                     logger.debug(
                         "Ignoring disk {}. Filter did not match".format(
index 65c2c7aa1a06ba5d245ebe1a1e627886d84700e2..7aa73945acf93e71075cd4414f7b62b2ed10cd03 100644 (file)
@@ -11,6 +11,7 @@ except ImportError:
 
 from ceph.deployment import drive_selection
 from ceph.tests.factories import InventoryFactory
+from ceph.tests.utils import _mk_inventory, _mk_device
 
 
 class TestMatcher(object):
@@ -672,39 +673,6 @@ class TestFilter(object):
         assert ret.is_matchable is False
 
 
-def _mk_device(rotational=True, locked=False):
-    return [Device(
-        path='??',
-        sys_api={
-            "rotational": '1' if rotational else '0',
-            "vendor": "Vendor",
-            "human_readable_size": "394.27 GB",
-            "partitions": {},
-            "locked": int(locked),
-            "sectorsize": "512",
-            "removable": "0",
-            "path": "??",
-            "support_discard": "",
-            "model": "Model",
-            "ro": "0",
-            "nr_requests": "128",
-            "size": 423347879936
-        },
-        available=not locked,
-        rejected_reasons=['locked'] if locked else [],
-        lvs=[],
-        device_id="Model-Vendor-foobar"
-    )]
-
-
-def _mk_inventory(devices):
-    devs = []
-    for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))):
-        dev = Device.from_json(dev_.to_json())
-        dev.path = '/dev/sd' + name
-        dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name)
-        devs.append(dev)
-    return Devices(devices=devs)
 
 
 class TestDriveSelection(object):
index 8afb8751c73388b5af0bed999a5bb033c3812ec4..81212032c0cf301457f0e05624832e6f632e6d86 100644 (file)
@@ -1,5 +1,8 @@
 import pytest
+import mock
 
+from ceph.deployment import drive_selection, translate
+from ceph.tests.utils import _mk_inventory, _mk_device
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
 
 
@@ -31,3 +34,98 @@ def test_drive_selection():
 
     with pytest.raises(DriveGroupValidationError, match='exclusive'):
         DeviceSelection(paths=['/dev/sda'], rotational=False)
+
+
+def test_ceph_volume_command_0():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(all=True)
+                          )
+    inventory = _mk_inventory(_mk_device()*2)
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd'
+
+
+def test_ceph_volume_command_1():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(rotational=True),
+                          db_devices=DeviceSelection(rotational=False)
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --yes --no-systemd'
+
+
+def test_ceph_volume_command_2():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+                          db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+                          wal_devices=DeviceSelection(size='10G')
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+                              _mk_device(rotational=False)*2 +
+                              _mk_device(size="10.0 GB", rotational=False)*2
+                              )
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --yes --no-systemd'
+
+
+def test_ceph_volume_command_3():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+                          db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+                          wal_devices=DeviceSelection(size='10G'),
+                          encrypted=True
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+                              _mk_device(rotational=False)*2 +
+                              _mk_device(size="10.0 GB", rotational=False)*2
+                              )
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --dmcrypt --yes --no-systemd'
+
+
+def test_ceph_volume_command_4():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+                          db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+                          wal_devices=DeviceSelection(size='10G'),
+                          block_db_size='500M',
+                          block_wal_size='500M',
+                          osds_per_device=3,
+                          encrypted=True
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2 +
+                              _mk_device(rotational=False)*2 +
+                              _mk_device(size="10.0 GB", rotational=False)*2
+                              )
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf --block-wal-size 500M --block-db-size 500M --dmcrypt --osds-per-device 3 --yes --no-systemd'
+
+
+def test_ceph_volume_command_5():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(rotational=True),
+                          objectstore='filestore'
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2)
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd'
+
+
+def test_ceph_volume_command_6():
+    spec = DriveGroupSpec(host_pattern='*',
+                          data_devices=DeviceSelection(rotational=False),
+                          journal_devices=DeviceSelection(rotational=True),
+                          journal_size='500M',
+                          objectstore='filestore'
+                          )
+    inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+    sel = drive_selection.DriveSelection(spec, inventory)
+    cmd = translate.ToCephVolume(spec, sel).run()
+    assert cmd == 'lvm batch --no-auto /dev/sdc /dev/sdd --journal-size 500M --journal-devices /dev/sda /dev/sdb --filestore --yes --no-systemd'
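
These tests live under src/python-common and run standalone, e.g.:

    pytest src/python-common/ceph/tests/test_drive_group.py

Each assertion pins the exact flag order that translate.ToCephVolume emits for the ceph-volume 'lvm batch' call, so any reordering in the translator shows up as a test failure.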