import datetime
import threading
import logging
-import json
from contextlib import contextmanager
from time import sleep
+import re
import jsonpatch
from urllib.parse import urljoin
+import json
+
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError
+from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
from ceph.utils import datetime_now
from mgr_module import NFS_POOL_NAME
from mgr_util import merge_dicts
-from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
+from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
Iterable, Dict, Iterator, Type
try:
return cast(Callable[..., threading.Thread], wrapper)
+class DefaultFetcher():
+ def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
+ self.storage_class = storage_class
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+ self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+
+ def convert_size(self, size_str: str) -> int:
+ units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
+ coeff_and_unit = re.search('(\d+)(\D+)', size_str)
+ assert coeff_and_unit is not None
+ coeff = int(coeff_and_unit[1])
+ unit = coeff_and_unit[2]
+ try:
+ factor = units.index(unit) % 7
+ except ValueError:
+ log.error("PV size format invalid")
+ raise
+ size = coeff * (2 ** (10 * factor))
+ return size
+
+ def devices(self) -> Dict[str, List[Device]]:
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
+ node = 'N/A'
+ if i.spec.node_affinity:
+ terms = i.spec.node_affinity.required.node_selector_terms
+ if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
+ node = terms[0].match_expressions[0].values[0]
+ size = self.convert_size(i.spec.capacity['storage'])
+ path = i.spec.host_path.path if i.spec.host_path else i.spec.local.path if i.spec.local else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations else ''
+ state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
+ pv_name = i.metadata.name
+ device = Device(
+ path = path,
+ sys_api = dict(
+ size = size,
+ node = node,
+ pv_name = pv_name
+ ),
+ available = state,
+ )
+ return (node, device)
+
+
+class LSOFetcher(DefaultFetcher):
+ def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
+ super().__init__(storage_class, coreV1_api)
+ self.customObjects_api = customObjects_api
+ self.nodenames = nodenames
+
+ def fetch(self) -> None:
+ super().fetch()
+ self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
+ group="local.storage.openshift.io",
+ version="v1alpha1",
+ plural="localvolumediscoveryresults")
+
+ def predicate(self, item: 'client.V1ConfigMapList') -> bool:
+ if self.nodenames is not None:
+ return item['spec']['nodeName'] in self.nodenames
+ else:
+ return True
+
+ def devices(self) -> Dict[str, List[Device]]:
+ try:
+ lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
+ except ApiException as dummy_e:
+ log.error("Failed to fetch device metadata")
+ raise
+ self.lso_devices = {}
+ for i in lso_discovery_results:
+ drives = i['status']['discoveredDevices']
+ for drive in drives:
+ self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = (None, None)
+ if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
+ node, device = super().device(i)
+ else:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: Any) -> Tuple[str, Device]:
+ node = i.metadata.labels['kubernetes.io/hostname']
+ device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
+ pv_name = i.metadata.name
+ vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
+ model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
+ device = Device(
+ path = device_discovery['path'],
+ sys_api = dict(
+ size = device_discovery['size'],
+ rotational = '1' if device_discovery['property']=='Rotational' else '0',
+ node = node,
+ pv_name = pv_name,
+ model = model,
+ vendor = vendor
+ ),
+ available = device_discovery['status']['state']=='Available',
+ device_id = device_discovery['deviceID'].split('/')[-1],
+ lsm_data = dict(
+ serialNum = device_discovery['serial']
+ )
+ )
+ return (node, device)
+
+
+class PDFetcher(DefaultFetcher):
+ """ Physical Devices Fetcher"""
+ def __init__(self, coreV1_api: 'client.CoreV1Api'):
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ """ Collect the devices information from k8s configmaps"""
+ self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+ namespace='rook-ceph',
+ label_selector='app=rook-discover')
+
+ def devices(self) -> Dict[str, List[Device]]:
+ """ Return the list of devices found"""
+ node_devices: Dict[str, List[Device]] = {}
+ for i in self.dev_cms.items:
+ devices_list: List[Device] = []
+ for d in json.loads(i.data['devices']):
+ devices_list.append(self.device(d)[1])
+ node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+ return node_devices
+
+ def device(self, devData: Dict[str,str]) -> Tuple[str, Device]:
+ """ Build an orchestrator device """
+ if 'cephVolumeData' in devData and devData['cephVolumeData']:
+ return "", Device.from_json(json.loads(devData['cephVolumeData']))
+ else:
+ return "", Device(
+ path='/dev/' + devData['name'],
+ sys_api=dict(
+ rotational='1' if devData['rotational'] else '0',
+ size=devData['size']
+ ),
+ available=False,
+ rejected_reasons=['device data coming from ceph-volume not provided'],
+ )
+
class KubernetesResource(Generic[T]):
def __init__(self, api_func: Callable, **kwargs: Any) -> None:
self.exception = e
raise
+class KubernetesCustomResource(KubernetesResource):
+ def _fetch(self) -> str:
+ response = self.api_func(**self.kwargs)
+ metadata = response['metadata']
+ self._items = {item['metadata']['name']: item for item in response['items']}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata['resourceVersion']
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item['metadata']['name']
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
- def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
+ def __init__(
+ self,
+ coreV1_api: 'client.CoreV1Api',
+ batchV1_api: 'client.BatchV1Api',
+ customObjects_api: 'client.CustomObjectsApi',
+ storageV1_api: 'client.StorageV1Api',
+ rook_env: 'RookEnv',
+ storage_class: 'str'
+ ):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
+ self.customObjects_api = customObjects_api
+ self.storageV1_api = storageV1_api # client.StorageV1Api
+ self.storage_class = storage_class # type: str
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented
+ self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+
self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
namespace=self.rook_env.operator_namespace,
label_selector="app=rook-discover")
def rook_api_post(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("POST", path, **kwargs)
- def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
- def predicate(item: client.V1ConfigMapList) -> bool:
- if nodenames is not None:
- return item.metadata.labels['rook.io/node'] in nodenames
+ def get_storage_class(self) -> 'client.V1StorageClass':
+ matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
+ if len(matching_sc) == 0:
+ log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
+ raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
+ return matching_sc[0]
+
+ def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
+ self.fetcher: Optional[DefaultFetcher] = None
+ op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+ if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+ self.fetcher = PDFetcher(self.coreV1_api)
+ else:
+ storage_class = self.get_storage_class()
+ if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+ self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
else:
- return True
-
- try:
- result = [i for i in self.inventory_maps.items if predicate(i)]
- except ApiException as dummy_e:
- log.exception("Failed to fetch device metadata")
- raise
+ self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
- nodename_to_devices = {}
- for i in result:
- drives = json.loads(i.data['devices'])
- nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
-
- return nodename_to_devices
+ self.fetcher.fetch()
+ return self.fetcher.devices()
+ def get_osds(self) -> List:
+ osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ return list(osd_pods.items)
+
def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
#
# Fetch cephnfs object for "nfs_cluster" and then return a rados://