mgr/rook: OSD creation using Creator classes
author     Joseph Sawaya <jsawaya@redhat.com>
           Mon, 19 Jul 2021 13:37:58 +0000 (09:37 -0400)
committer  Joseph Sawaya <jsawaya@redhat.com>
           Tue, 17 Aug 2021 14:50:27 +0000 (10:50 -0400)
This commit implements the apply_drivegroups method in the
RookOrchestrator class and adds the DefaultCreator and
LSOCreator classes, which handle OSD creation. The add_osds
method in RookCluster picks the appropriate creator based on
the storage class the user provided in the Ceph config.

Signed-off-by: Joseph Sawaya <jsawaya@redhat.com>
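
For context, a minimal sketch of the creator selection this commit
introduces, mirroring the add_osds change in rook_cluster.py below. The
pick_creator helper and its argument names are illustrative only, not part
of the patch; the label check is the one the patch uses to detect a Local
Storage Operator (LSO) storage class.

    # Hypothetical helper, for illustration only. DefaultCreator and
    # LSOCreator are the classes this commit adds to
    # src/pybind/mgr/rook/rook_cluster.py.
    def pick_creator(storage_class_crd, inventory, core_api, storage_class_name):
        labels = storage_class_crd.metadata.labels or {}
        if 'local.storage.openshift.io/owner-name' in labels:
            # Storage class created by the Local Storage Operator (LSO).
            return LSOCreator(inventory, core_api, storage_class_name)
        return DefaultCreator(inventory, core_api, storage_class_name)
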
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index 90b08e6b0082983db642159b1de29931a42faadf..be7e4d14affa985301691ee799792994e8bd78ce 100644 (file)
@@ -457,6 +457,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     @handle_orch_error
     def remove_daemons(self, names: List[str]) -> List[str]:
         return self.rook_cluster.remove_pods(names)
+
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
+        result_list: List[str] = []
+        all_hosts = raise_if_exception(self.get_hosts())
+        for drive_group in specs:
+            matching_hosts = drive_group.placement.filter_matching_hosts(
+                lambda label=None, as_hostspec=None: all_hosts)
+
+            if not self.rook_cluster.node_exists(matching_hosts[0]):
+                raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                                   "cluster".format(matching_hosts[0]))
+
+            # Validate whether the cluster CRD can accept individual OSD
+            # creations (i.e. not useAllDevices)
+            if not self.rook_cluster.can_create_osd():
+                raise RuntimeError("Rook cluster configuration does not "
+                                   "support OSD creation.")
+            result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
+        return OrchResult(result_list)
     """
     @handle_orch_error
     def create_osds(self, drive_group):
index 161377c1ac2ebcbf28aa238baea5b355835a3cb9..12ee80fd87c800c2e8483f557602e31dfc6dcfb4 100644 (file)
@@ -11,6 +11,7 @@ import threading
 import logging
 from contextlib import contextmanager
 from time import sleep
+import re
 
 import jsonpatch
 from urllib.parse import urljoin
@@ -322,6 +323,153 @@ class KubernetesCustomResource(KubernetesResource):
                         "{} doesn't contain a metadata.name. Unable to track changes".format(
                             self.api_func))
 
+class DefaultCreator():
+    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+        self.coreV1_api = coreV1_api
+        self.storage_class = storage_class
+        self.inventory = inventory
+
+    def parse_drive_group_size(self, drive_group_size: Optional[str]) -> Tuple[int, int]:
+        # Returns a (low, high) bound in bytes; -1 means "no bound".
+        no_bound = -1
+        if not drive_group_size:
+            return (no_bound, no_bound)
+        if ':' in drive_group_size:
+            bounds = drive_group_size.split(':')
+            low_str = bounds[0]
+            high_str = bounds[1]
+            low = self.parse_bound(low_str)
+            high = self.parse_bound(high_str)
+            return (low, high)
+        else:
+            exact = self.parse_bound(drive_group_size)
+            return (exact, exact)
+
+    def parse_bound(self, bound: str) -> int:
+        no_bound = -1
+        if not bound:
+            return no_bound
+        else:
+            coeff_and_unit = re.search(r'(\d+)(\D+)', bound)
+            assert coeff_and_unit is not None
+            coeff = int(coeff_and_unit[1])
+            unit = coeff_and_unit[2]
+            # "M"/"MB" -> 10**6, "G"/"GB" -> 10**9, "T"/"TB" -> 10**12 bytes
+            units = ("M", "G", "T", "MB", "GB", "TB")
+            factor = units.index(unit) % 3
+            result = coeff * (1000 ** (factor + 2))
+            return result
+
+    def check_bounds(self, low: int, high: int, size: int) -> bool:
+        if low == -1 and high == -1:
+            return True
+        elif low == -1:
+            if size <= high:
+                return True
+        elif high == -1:
+            if size >= low:
+                return True
+        else:
+            if size <= high and size >= low:
+                return True
+        return False
+
+    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+        device_set = ccl.StorageClassDeviceSetsItem(
+                    name=d.sys_api['pv_name'],
+                    volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
+                    count=1,
+                    encrypted=drive_group.encrypted,
+                    portable=False
+                )
+        device_set.volumeClaimTemplates.append(
+            ccl.VolumeClaimTemplatesItem(
+                metadata=ccl.Metadata(
+                    name="data"
+                ),
+                spec=ccl.Spec(
+                    storageClassName=self.storage_class,
+                    volumeMode="Block",
+                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
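+                    # The requested size is nominal; volumeName below pins
+                    # this claim to the specific PV backing the device.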
+                    resources={
+                        "requests":{
+                                "storage": 1
+                        }
+                    },
+                    volumeName=d.sys_api['pv_name']
+                )
+            )
+        )
+        return device_set
+
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+        limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+        count = 0
+        all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+        paths = [device.path for device in drive_group.data_devices.paths]
+        osd_list = []
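+        # Devices already claimed by existing OSD pods (tracked via the
+        # ceph.rook.io/DeviceSet label) count towards `limit`.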
+        for pod in rook_pods.items:
+            if hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels') and 'osd' in pod.metadata.labels and 'ceph.rook.io/DeviceSet' in pod.metadata.labels:
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if all or ((device.sys_api['node'] in matching_hosts)
+                                   and self.check_bounds(low, high, int(device.sys_api['size']))
+                                   and ((not drive_group.data_devices.paths)
+                                        or (device.path in paths))):
+                            device_list.append(device)
+                            count += 1
+        return device_list
+
+    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
+        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
+        assert drive_group.data_devices is not None
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+                new_cluster.spec.storage = ccl.Storage()
+
+            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+            for device in to_create:
+                new_scds = self.device_to_device_set(drive_group, device)
+                new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+            return new_cluster
+        return _add_osds
+
+class LSOCreator(DefaultCreator):
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+        limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+        count = 0
+        all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+        paths = [device.path for device in drive_group.data_devices.paths]
+        vendor = drive_group.data_devices.vendor if hasattr(drive_group.data_devices, 'vendor') else None
+        model = drive_group.data_devices.model if hasattr(drive_group.data_devices, 'model') else None
+        osd_list = []
+        for pod in rook_pods.items:
+            if hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels') and 'osd' in pod.metadata.labels and 'ceph.rook.io/DeviceSet' in pod.metadata.labels:
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if all or ((device.sys_api['node'] in matching_hosts)
+                                   and self.check_bounds(low, high, int(device.sys_api['size']))
+                                   and ((not drive_group.data_devices.paths)
+                                        or (device.path in paths))
+                                   and (not vendor or (device.sys_api['vendor'] == vendor))
+                                   and (not model or device.sys_api['model'].startswith(model))):
+                            device_list.append(device)
+                            count += 1
+        return device_list
+
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -688,17 +836,20 @@ class RookCluster(object):
             new.spec.mon.count = newcount
             return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
-    """
+
     def add_osds(self, drive_group, matching_hosts):
         # type: (DriveGroupSpec, List[str]) -> str
-        
-        # Rook currently (0.8) can only do single-drive OSDs, so we
-        # treat all drive groups as just a list of individual OSDs.
-        
-        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
-        directories = drive_group.data_directories
-
         assert drive_group.objectstore in ("bluestore", "filestore")
+        assert drive_group.service_id
+        storage_class = self.get_storage_class()
+        inventory = self.get_discovered_devices()
+        self.creator: Optional[DefaultCreator] = None
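+        # A storage class carrying this label was created by the Local
+        # Storage Operator (LSO), so use the LSO-aware creator.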
+        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+            self.creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)    
+        else:
+            self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+        _add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
         def _add_osds(current_cluster, new_cluster):
             # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
@@ -757,8 +908,6 @@ class RookCluster(object):
                             )
             return new_cluster
 
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-    """
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)