import logging
from contextlib import contextmanager
from time import sleep
+import re
import jsonpatch
from urllib.parse import urljoin
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
+class DefaultCreator():
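+    # Translates a DriveGroupSpec into Rook storageClassDeviceSets, turning
+    # each matched device into a single PVC-backed OSD.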
+ def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+ self.coreV1_api = coreV1_api
+ self.storage_class = storage_class
+ self.inventory = inventory
+
+ def parse_drive_group_size(self, drive_group_size: Optional[str]) -> Tuple[int, int]:
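+        # A size filter is either an exact size ("10G") or a "low:high"
+        # range ("10G:40G"); an empty bound ("10G:", ":40G") leaves that
+        # side unbounded, which is returned as -1.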
+ no_bound = -1
+ if not drive_group_size:
+            return (no_bound, no_bound)
+ if ':' in drive_group_size:
+ bounds = drive_group_size.split(':')
+ low_str = bounds[0]
+ high_str = bounds[1]
+ low = self.parse_bound(low_str)
+ high = self.parse_bound(high_str)
+ return (low, high)
+ else:
+ exact = self.parse_bound(drive_group_size)
+ return (exact, exact)
+
+ def parse_bound(self, bound: str) -> int:
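+        # e.g. parse_bound("10G") == 10 * 10**9; "G" and "GB" are
+        # equivalent, and all units are decimal (SI) bytes.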
+ no_bound = -1
+ if not bound:
+ return no_bound
+ else:
+            coeff_and_unit = re.search(r'(\d+)(\D+)', bound)
+            assert coeff_and_unit is not None
+            coeff = int(coeff_and_unit[1])
+            unit = coeff_and_unit[2]
+            units = ("M", "G", "T", "MB", "GB", "TB")
+            # "M"/"MB" (indices 0 and 3) share a factor, likewise "G"/"GB"
+            # and "T"/"TB": M -> 10**6, G -> 10**9, T -> 10**12 bytes.
+            factor = units.index(unit) % 3
+            result = coeff * (1000 ** (factor + 2))
+            return result
+
+    def check_bounds(self, low: int, high: int, size: int) -> bool:
+        # -1 means "no bound" on that side
+        return (low == -1 or size >= low) and (high == -1 or size <= high)
+
+ def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
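+        # Rook models a PVC-backed OSD as a storageClassDeviceSet with
+        # count=1; the claim is pinned to this device's PV via volumeName.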
+ device_set = ccl.StorageClassDeviceSetsItem(
+ name=d.sys_api['pv_name'],
+            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
+ count=1,
+ encrypted=drive_group.encrypted,
+ portable=False
+ )
+ device_set.volumeClaimTemplates.append(
+ ccl.VolumeClaimTemplatesItem(
+ metadata=ccl.Metadata(
+ name="data"
+ ),
+ spec=ccl.Spec(
+ storageClassName=self.storage_class,
+ volumeMode="Block",
+ accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
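+                    # Nominal 1-byte request: volumeName below pre-binds
+                    # the claim to a specific PV, so the real capacity
+                    # comes from that PV.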
+ resources={
+ "requests":{
+ "storage": 1
+ }
+ },
+ volumeName=d.sys_api['pv_name']
+ )
+ )
+ )
+ return device_set
+
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
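+        # First count devices already consumed by existing OSD pods so the
+        # drive group's `limit` accounts for deployed OSDs, then select
+        # available devices matching host, size bounds and explicit paths.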
+ device_list = []
+ assert drive_group.data_devices is not None
+ low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+ limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+ count = 0
+        match_all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+ paths = [device.path for device in drive_group.data_devices.paths]
+ osd_list = []
+ for pod in rook_pods.items:
+            if (hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels')
+                    and 'osd' in pod.metadata.labels
+                    and 'ceph.rook.io/DeviceSet' in pod.metadata.labels):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if match_all or (
+                                (device.sys_api['node'] in matching_hosts)
+                                and self.check_bounds(low, high, int(device.sys_api['size']))
+                                and ((not drive_group.data_devices.paths) or (device.path in paths))):
+                            device_list.append(device)
+                            count += 1
+ return device_list
+
+ def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
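+        # Builds the patch callback consumed by RookCluster._patch(): it
+        # appends one storageClassDeviceSet per matched device to the
+        # CephCluster spec.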
+        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
+ assert drive_group.data_devices is not None
+ def _add_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+ new_cluster.spec.storage = ccl.Storage()
+
+ if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+ new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+ for device in to_create:
+ new_scds = self.device_to_device_set(drive_group, device)
+ new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+ return new_cluster
+ return _add_osds
+
+class LSOCreator(DefaultCreator):
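+    # The OpenShift Local Storage Operator (LSO) reports vendor and model
+    # in its discovered inventory, so those drive-group filters apply here
+    # in addition to the default ones.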
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+ device_list = []
+ assert drive_group.data_devices is not None
+ low, high = self.parse_drive_group_size(drive_group.data_devices.size)
+ limit = drive_group.data_devices.limit if hasattr(drive_group.data_devices, 'limit') else None
+ count = 0
+        match_all = drive_group.data_devices.all if hasattr(drive_group.data_devices, 'all') else None
+ paths = [device.path for device in drive_group.data_devices.paths]
+        vendor = drive_group.data_devices.vendor if hasattr(drive_group.data_devices, 'vendor') else None
+ model = drive_group.data_devices.model if hasattr(drive_group.data_devices, 'model') else None
+ osd_list = []
+ for pod in rook_pods.items:
+            if (hasattr(pod, 'metadata') and hasattr(pod.metadata, 'labels')
+                    and 'osd' in pod.metadata.labels
+                    and 'ceph.rook.io/DeviceSet' in pod.metadata.labels):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if match_all or (
+                                (device.sys_api['node'] in matching_hosts)
+                                and self.check_bounds(low, high, int(device.sys_api['size']))
+                                and ((not drive_group.data_devices.paths) or (device.path in paths))
+                                and (not vendor or device.sys_api['vendor'] == vendor)
+                                and (not model or device.sys_api['model'].startswith(model))):
+                            device_list.append(device)
+                            count += 1
+ return device_list
+
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
new.spec.mon.count = newcount
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
- """
+
def add_osds(self, drive_group, matching_hosts):
# type: (DriveGroupSpec, List[str]) -> str
-
- # Rook currently (0.8) can only do single-drive OSDs, so we
- # treat all drive groups as just a list of individual OSDs.
-
- block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
- directories = drive_group.data_directories
-
assert drive_group.objectstore in ("bluestore", "filestore")
+ assert drive_group.service_id
+ storage_class = self.get_storage_class()
+ inventory = self.get_discovered_devices()
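+        # A storage class provisioned by the OpenShift Local Storage
+        # Operator carries its owner's label; use the LSO-aware creator
+        # in that case.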
+        creator: DefaultCreator
+        if (storage_class.metadata.labels
+                and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
+        else:
+            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+        _add_osds = creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+ return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-    def _add_osds(current_cluster, new_cluster):
-        # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-            )
-        return new_cluster
- return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
- """
def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)