From c1dbce81c0cf2ec2d44b94007208a4930d6329c5 Mon Sep 17 00:00:00 2001
From: Joseph Sawaya
Date: Mon, 19 Jul 2021 09:37:58 -0400
Subject: [PATCH] mgr/rook: OSD creation using Creator classes

This commit implements the apply_drivegroups method in the
RookOrchestrator class and adds the DefaultCreator and LSOCreator
classes, which handle OSD creation. The add_osds method in RookCluster
picks a creator based on the storage class the user provided in the
Ceph config: LSOCreator when the storage class is owned by the
OpenShift Local Storage Operator, DefaultCreator otherwise.

Signed-off-by: Joseph Sawaya
---
 src/pybind/mgr/rook/module.py       |  16 +++
 src/pybind/mgr/rook/rook_cluster.py | 169 ++++++++++++++++++++++++++--
 2 files changed, 175 insertions(+), 10 deletions(-)

diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 90b08e6b008..be7e4d14aff 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -457,6 +457,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     @handle_orch_error
     def remove_daemons(self, names: List[str]) -> List[str]:
         return self.rook_cluster.remove_pods(names)
+
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
+        all_hosts = raise_if_exception(self.get_hosts())
+        result_list = []
+        for drive_group in specs:
+            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+
+            if not self.rook_cluster.node_exists(matching_hosts[0]):
+                raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                                   "cluster".format(matching_hosts[0]))
+
+            # Validate whether the cluster CRD can accept individual OSD
+            # creations (i.e. not useAllDevices)
+            if not self.rook_cluster.can_create_osd():
+                raise RuntimeError("Rook cluster configuration does not "
+                                   "support OSD creation.")
+            result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
+        return OrchResult(result_list)
 
     """
     @handle_orch_error
     def create_osds(self, drive_group):
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 161377c1ac2..12ee80fd87c 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -11,6 +11,7 @@ import threading
 import logging
 from contextlib import contextmanager
 from time import sleep
+import re
 
 import jsonpatch
 from urllib.parse import urljoin
@@ -322,6 +323,153 @@ class KubernetesCustomResource(KubernetesResource):
                 "{} doesn't contain a metadata.name. Unable to track changes".format(
                     self.api_func))
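For context, the sketch below shows roughly what a caller hands to the new
apply_drivegroups() entry point. It is illustrative only and not part of the
patch: the service_id, host pattern, and size bound are made-up values, and
`mgr` stands in for a running RookOrchestrator instance.

    # Illustrative only -- not part of this patch.
    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.service_spec import PlacementSpec

    spec = DriveGroupSpec(
        service_id='osd_deployment',                # add_osds() asserts this is set
        placement=PlacementSpec(host_pattern='*'),  # consider every cluster host
        data_devices=DeviceSelection(size='10G:40G'),  # bounds parsed by DefaultCreator below
    )
    # completion = mgr.apply_drivegroups([spec])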
+class DefaultCreator():
+    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: str):
+        self.coreV1_api = coreV1_api
+        self.storage_class = storage_class
+        self.inventory = inventory
+
+    def parse_drive_group_size(self, drive_group_size: Optional[str]) -> Tuple[int, int]:
+        # Returns (low, high) byte bounds; -1 means "unbounded" on that side.
+        no_bound = -1
+        if not drive_group_size:
+            return (no_bound, no_bound)
+        if ':' in drive_group_size:
+            low_str, high_str = drive_group_size.split(':')
+            return (self.parse_bound(low_str), self.parse_bound(high_str))
+        exact = self.parse_bound(drive_group_size)
+        return (exact, exact)
+
+    def parse_bound(self, bound: str) -> int:
+        no_bound = -1
+        if not bound:
+            return no_bound
+        coeff_and_unit = re.search(r'(\d+)(\D+)', bound)
+        assert coeff_and_unit is not None
+        coeff = int(coeff_and_unit[1])
+        unit = coeff_and_unit[2]
+        units = ("M", "G", "T", "MB", "GB", "TB")
+        # "M"/"MB" -> 0, "G"/"GB" -> 1, "T"/"TB" -> 2
+        factor = units.index(unit) % 3
+        # decimal units: M = 1000**2 bytes, G = 1000**3, T = 1000**4
+        return coeff * (1000 ** (factor + 2))
+
+    def check_bounds(self, low: int, high: int, size: int) -> bool:
+        # -1 disables the corresponding bound
+        return (low == -1 or size >= low) and (high == -1 or size <= high)
+
+    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+        device_set = ccl.StorageClassDeviceSetsItem(
+            name=d.sys_api['pv_name'],
+            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
+            count=1,
+            encrypted=drive_group.encrypted,
+            portable=False
+        )
+        device_set.volumeClaimTemplates.append(
+            ccl.VolumeClaimTemplatesItem(
+                metadata=ccl.Metadata(
+                    name="data"
+                ),
+                spec=ccl.Spec(
+                    storageClassName=self.storage_class,
+                    volumeMode="Block",
+                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
+                    resources={
+                        "requests": {
+                            "storage": 1
+                        }
+                    },
+                    # pin the claim to this exact PV
+                    volumeName=d.sys_api['pv_name']
+                )
+            )
+        )
+        return device_set
+
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+        limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+        use_all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+        paths = [device.path for device in drive_group.data_devices.paths]
+        count = 0
+        osd_list = []
+        # Device sets that already back an OSD pod count against the limit
+        # and must not be created a second time.
+        for pod in rook_pods.items:
+            if (hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels')
+                    and 'osd' in pod.metadata.labels
+                    and 'ceph.rook.io/DeviceSet' in pod.metadata.labels):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if limit and count >= limit:
+                    break
+                if not device.available:
+                    continue
+                if use_all or (
+                        device.sys_api['node'] in matching_hosts
+                        and self.check_bounds(low, high, int(device.sys_api['size']))
+                        and (not paths or device.path in paths)):
+                    device_list.append(device)
+                    count += 1
+        return device_list
+
+    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
+        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
+        assert drive_group.data_devices is not None
+
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+                new_cluster.spec.storage = ccl.Storage()
+
+            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+            for device in to_create:
+                new_scds = self.device_to_device_set(drive_group, device)
+                new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+            return new_cluster
+        return _add_osds
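As a sanity check on the size grammar above, the following standalone snippet
shows the expected parses. It assumes the DefaultCreator above is in scope;
the constructor arguments are dummies, since the parsing helpers never touch
them.

    # Illustrative only -- not part of this patch.
    creator = DefaultCreator(inventory={}, coreV1_api=None, storage_class='local')
    assert creator.parse_bound('10G') == 10 * 1000 ** 3
    assert creator.parse_drive_group_size('10G:40G') == (10 * 1000 ** 3, 40 * 1000 ** 3)
    assert creator.parse_drive_group_size(':1T') == (-1, 1000 ** 4)  # open lower bound
    assert creator.parse_drive_group_size(None) == (-1, -1)          # no size filter
    assert creator.check_bounds(-1, 1000 ** 4, 500 * 1000 ** 3)      # 500G fits under 1T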
+class LSOCreator(DefaultCreator):
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+        limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+        use_all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+        paths = [device.path for device in drive_group.data_devices.paths]
+        vendor = drive_group.data_devices.vendor if hasattr(drive_group.data_devices, 'vendor') else None
+        model = drive_group.data_devices.model if hasattr(drive_group.data_devices, 'model') else None
+        count = 0
+        osd_list = []
+        for pod in rook_pods.items:
+            if (hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels')
+                    and 'osd' in pod.metadata.labels
+                    and 'ceph.rook.io/DeviceSet' in pod.metadata.labels):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if limit and count >= limit:
+                    break
+                if not device.available:
+                    continue
+                if use_all or (
+                        device.sys_api['node'] in matching_hosts
+                        and self.check_bounds(low, high, int(device.sys_api['size']))
+                        and (not paths or device.path in paths)
+                        and (not vendor or device.sys_api['vendor'] == vendor)
+                        and (not model or device.sys_api['model'].startswith(model))):
+                    device_list.append(device)
+                    count += 1
+        return device_list
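For reference, one entry produced by device_to_device_set() corresponds to a
CephCluster CR fragment along these lines, rendered here as a plain dict; the
PV name 'local-pv-123abc' and storage class 'local-sc' are made up.

    # Illustrative only -- not part of this patch.
    device_set = {
        'name': 'local-pv-123abc',   # one single-PV device set per OSD
        'count': 1,
        'portable': False,
        'encrypted': False,
        'volumeClaimTemplates': [{
            'metadata': {'name': 'data'},
            'spec': {
                'storageClassName': 'local-sc',
                'volumeMode': 'Block',
                'accessModes': ['ReadWriteOnce'],
                'resources': {'requests': {'storage': 1}},
                'volumeName': 'local-pv-123abc',  # pins the claim to that PV
            },
        }],
    }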
+
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -688,17 +836,20 @@ class RookCluster(object):
             new.spec.mon.count = newcount
             return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
-    """
+
     def add_osds(self, drive_group, matching_hosts):
         # type: (DriveGroupSpec, List[str]) -> str
-
-        # Rook currently (0.8) can only do single-drive OSDs, so we
-        # treat all drive groups as just a list of individual OSDs.
-
-        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
-        directories = drive_group.data_directories
-        assert drive_group.objectstore in ("bluestore", "filestore")
+        assert drive_group.service_id
+        storage_class = self.get_storage_class()
+        inventory = self.get_discovered_devices()
+        self.creator: Optional[DefaultCreator] = None
+        # Storage classes created by the OpenShift Local Storage Operator
+        # (LSO) carry this label; use the LSO-aware creator for them.
+        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+            self.creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
+        else:
+            self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+        _add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
         def _add_osds(current_cluster, new_cluster):
             # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
@@ -757,8 +908,6 @@
             )
             return new_cluster
 
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-    """
 
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)
-- 
2.39.5
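A note on the update pattern: the creator's add_osds() returns a closure that
_patch() feeds into a read-modify-diff cycle, so only a JSON patch is sent to
the API server. Below is a minimal sketch of that cycle under one big
assumption: plain dicts stand in for the CephCluster CR wrapper objects that
_patch() actually handles, and the names here are local to the sketch.

    # Illustrative only -- not part of this patch.
    import copy
    import jsonpatch  # the same library rook_cluster.py already imports

    def patch_cycle(current_cluster, updater):
        # updater mutates a deep copy; jsonpatch diffs old vs. new
        new_cluster = updater(current_cluster, copy.deepcopy(current_cluster))
        return jsonpatch.make_patch(current_cluster, new_cluster)

    def add_set(current, new):
        new['spec']['storage']['storageClassDeviceSets'].append(
            {'name': 'local-pv-123abc', 'count': 1})
        return new

    before = {'spec': {'storage': {'storageClassDeviceSets': []}}}
    print(patch_cycle(before, add_set))
    # -> one RFC 6902 "add" op at /spec/storage/storageClassDeviceSets/0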