git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pacific: mgr/rook: Device inventory (wip-58578-pacific, 49876/head)
author    Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Thu, 6 Oct 2022 08:54:47 +0000 (10:54 +0200)
committer Juan Miguel Olmo Martínez <jolmomar@redhat.com>
Fri, 27 Jan 2023 11:43:01 +0000 (12:43 +0100)
Recovered the device inventory list for host clusters

Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
(cherry picked from commit 3a2a819c869ad3b0a28c5788b49358895fb9392c)

Reviewed-by: Travis Nielsen <tnielsen@redhat.com>
Reviewed-by: Nizamudeen A <nia@redhat.com>

src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/rook/rook_cluster.py

index 2288cfeef35c6aff418d065d2b977cbc6bccca05..b0a1737631534b29b97b252447a9e3bc3b0d656f 100644 (file)
@@ -498,7 +498,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
                 "On": "On",
                 "Off": "Off",
                 True: "Yes",
-                False: "",
+                False: "No",
             }
 
             out = []
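
The module.py hunk above changes how the orchestrator formats boolean values
in table output: False now renders as "No" instead of an empty cell. As a
hypothetical illustration (column layout assumed, not taken from this commit),
a "ceph orch device ls" listing would read:

    HOST    PATH      SIZE   AVAILABLE
    node-1  /dev/sdb  10.7G  Yes
    node-2  /dev/sdc  10.7G  No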
index 323ac68888f4c9b6d324e6bfd70d4c2e7a5b8be2..e3ccbc259b3e6671afa60a115279e002c55f5212 100644 (file)
@@ -9,24 +9,27 @@ This module is runnable outside of ceph-mgr, useful for testing.
 import datetime
 import threading
 import logging
-import json
 from contextlib import contextmanager
 from time import sleep
+import re
 
 import jsonpatch
 from urllib.parse import urljoin
+import json
 
 # Optional kubernetes imports to enable MgrModule.can_run
 # to behave cleanly.
 from urllib3.exceptions import ProtocolError
 
+from ceph.deployment.inventory import Device
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
 from ceph.utils import datetime_now
 from mgr_module import NFS_POOL_NAME
 from mgr_util import merge_dicts
 
-from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
+from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
     Iterable, Dict, Iterator, Type
 
 try:
@@ -87,6 +90,164 @@ def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
 
     return cast(Callable[..., threading.Thread], wrapper)
 
+class DefaultFetcher:
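+    """Fetch the device inventory from PersistentVolumes bound to the
+    configured storage class."""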
+    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
+        self.storage_class = storage_class
+        self.coreV1_api = coreV1_api
+
+    def fetch(self) -> None:
+        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+
+    def convert_size(self, size_str: str) -> int:
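+        """Convert a Kubernetes quantity string (e.g. '10Gi') to bytes.
+        Decimal suffixes (K, M, G, ...) are treated as their binary
+        equivalents (Ki, Mi, Gi, ...)."""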
+        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
+        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
+        assert coeff_and_unit is not None
+        coeff = int(coeff_and_unit[1])
+        unit = coeff_and_unit[2]
+        try:
+            factor = units.index(unit) % 7
+        except ValueError:
+            log.error("PV size format invalid")
+            raise
+        size = coeff * (2 ** (10 * factor))
+        return size
+
+    def devices(self) -> Dict[str, List[Device]]:
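+        """Return the discovered devices grouped by node name."""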
+        nodename_to_devices: Dict[str, List[Device]] = {}
+        for i in self.pvs_in_sc:
+            node, device = self.device(i)
+            if node not in nodename_to_devices:
+                nodename_to_devices[node] = []
+            nodename_to_devices[node].append(device)
+        return nodename_to_devices
+
+    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
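+        """Build an orchestrator Device from a PersistentVolume, taking the
+        node name from its node affinity and the path from its hostPath,
+        local volume or OpenShift device-name annotation."""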
+        node = 'N/A'
+        if i.spec.node_affinity:
+            terms = i.spec.node_affinity.required.node_selector_terms
+            if (len(terms) == 1
+                    and len(terms[0].match_expressions) == 1
+                    and terms[0].match_expressions[0].key == 'kubernetes.io/hostname'
+                    and len(terms[0].match_expressions[0].values) == 1):
+                node = terms[0].match_expressions[0].values[0]
+        size = self.convert_size(i.spec.capacity['storage'])
+        if i.spec.host_path:
+            path = i.spec.host_path.path
+        elif i.spec.local:
+            path = i.spec.local.path
+        elif i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations:
+            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
+        else:
+            path = ''
+        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
+        pv_name = i.metadata.name
+        device = Device(
+                path = path,
+                sys_api = dict(
+                    size = size,
+                    node = node,
+                    pv_name = pv_name
+                ),
+                available = state,
+        )
+        return (node, device)
+
+
+class LSOFetcher(DefaultFetcher):
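+    """Fetch the device inventory from the discovery results published by
+    the OpenShift Local Storage Operator (LSO)."""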
+    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api',
+                 customObjects_api: 'client.CustomObjectsApi',
+                 nodenames: Optional[List[str]] = None):
+        super().__init__(storage_class, coreV1_api)
+        self.customObjects_api = customObjects_api
+        self.nodenames = nodenames
+
+    def fetch(self) -> None:
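+        """Fetch the LSO LocalVolumeDiscoveryResult custom resources on top
+        of the PersistentVolumes collected by the parent class."""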
+        super().fetch()
+        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
+                                                 group="local.storage.openshift.io",
+                                                 version="v1alpha1",
+                                                 plural="localvolumediscoveryresults")
+
+    def predicate(self, item: Any) -> bool:
+        if self.nodenames is not None:
+            return item['spec']['nodeName'] in self.nodenames
+        else:
+            return True
+
+    def devices(self) -> Dict[str, List[Device]]:
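+        """Return devices grouped by node, enriched with LSO discovery data;
+        PVs without an LSO device-id annotation fall back to the
+        DefaultFetcher representation."""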
+        try:
+            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
+        except ApiException:
+            log.error("Failed to fetch device metadata")
+            raise
+        self.lso_devices = {}
+        for i in lso_discovery_results:
+            drives = i['status']['discoveredDevices']
+            for drive in drives:
+                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
+        nodename_to_devices: Dict[str, List[Device]] = {}
+        for i in self.pvs_in_sc:
+            node, device = (None, None)
+            if ((not i.metadata.annotations)
+                    or ('storage.openshift.com/device-id' not in i.metadata.annotations)
+                    or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices)):
+                node, device = super().device(i)
+            else:
+                node, device = self.device(i)
+            if node not in nodename_to_devices:
+                nodename_to_devices[node] = []
+            nodename_to_devices[node].append(device)
+        return nodename_to_devices
+
+    def device(self, i: Any) -> Tuple[str, Device]:
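+        """Build a Device from a PV using the matching LSO discovery data."""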
+        node = i.metadata.labels['kubernetes.io/hostname']
+        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
+        pv_name = i.metadata.name
+        model_words = device_discovery['model'].split()
+        vendor: str = model_words[0] if model_words else ''
+        model: str = ' '.join(model_words[1:]) if len(model_words) > 1 else ''
+        device = Device(
+            path = device_discovery['path'],
+            sys_api = dict(
+                    size = device_discovery['size'],
+                    rotational = '1' if device_discovery['property']=='Rotational' else '0',
+                    node = node,
+                    pv_name = pv_name,
+                    model = model,
+                    vendor = vendor
+                ),
+            available = device_discovery['status']['state']=='Available',
+            device_id = device_discovery['deviceID'].split('/')[-1],
+            lsm_data = dict(
+                serialNum = device_discovery['serial']
+            )
+        )
+        return (node, device)
+
+
+class PDFetcher(DefaultFetcher):
+    """ Physical Devices Fetcher"""
+    def __init__(self, coreV1_api: 'client.CoreV1Api'):
+        self.coreV1_api = coreV1_api
+
+    def fetch(self) -> None:
+        """ Collect the devices information from k8s configmaps"""
+        self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+                                                              namespace='rook-ceph',
+                                                              label_selector='app=rook-discover')
+
+    def devices(self) -> Dict[str, List[Device]]:
+        """ Return the list of devices found"""
+        node_devices: Dict[str, List[Device]] = {}
+        for i in self.dev_cms.items:
+            devices_list: List[Device] = []
+            for d in json.loads(i.data['devices']):
+                devices_list.append(self.device(d)[1])
+            node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+        return node_devices
+
+    def device(self, devData: Dict[str, Any]) -> Tuple[str, Device]:
+        """ Build an orchestrator device """
+        if 'cephVolumeData' in devData and devData['cephVolumeData']:
+            return "", Device.from_json(json.loads(devData['cephVolumeData']))
+        else:
+            return "", Device(
+                path='/dev/' + devData['name'],
+                sys_api=dict(
+                    rotational='1' if devData['rotational'] else '0',
+                    size=devData['size']
+                ),
+                available=False,
+                rejected_reasons=['device data coming from ceph-volume not provided'],
+            )
+
 
 class KubernetesResource(Generic[T]):
     def __init__(self, api_func: Callable, **kwargs: Any) -> None:
@@ -186,17 +347,45 @@ class KubernetesResource(Generic[T]):
             self.exception = e
             raise
 
+class KubernetesCustomResource(KubernetesResource):
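+    """A KubernetesResource whose items are plain dicts as returned by the
+    CustomObjects API, rather than typed client objects."""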
+    def _fetch(self) -> str:
+        response = self.api_func(**self.kwargs)
+        metadata = response['metadata']
+        self._items = {item['metadata']['name']: item for item in response['items']}
+        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+        return metadata['resourceVersion']
+
+    def get_item_name(self, item: Any) -> Any:
+        try:
+            return item['metadata']['name']
+        except KeyError:
+            # custom resource items are dicts, so a missing key raises
+            # KeyError rather than AttributeError
+            raise AttributeError(
+                "{} doesn't contain a metadata.name. Unable to track changes".format(
+                    self.api_func))
 
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
-    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api',
+        batchV1_api: 'client.BatchV1Api',
+        customObjects_api: 'client.CustomObjectsApi',
+        storageV1_api: 'client.StorageV1Api',
+        rook_env: 'RookEnv',
+        storage_class: str
+    ):
         self.rook_env = rook_env  # type: RookEnv
         self.coreV1_api = coreV1_api  # client.CoreV1Api
         self.batchV1_api = batchV1_api
+        self.customObjects_api = customObjects_api
+        self.storageV1_api = storageV1_api  # client.StorageV1Api
+        self.storage_class = storage_class  # type: str
 
         #  TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
+        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+
         self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")
@@ -239,26 +428,32 @@ class RookCluster(object):
     def rook_api_post(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("POST", path, **kwargs)
 
-    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
-        def predicate(item: client.V1ConfigMapList) -> bool:
-            if nodenames is not None:
-                return item.metadata.labels['rook.io/node'] in nodenames
+    def get_storage_class(self) -> 'client.V1StorageClass':
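+        """Return the StorageClass matching the configured
+        mgr/rook/storage_class setting, raising if none exists."""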
+        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
+        if len(matching_sc) == 0:
+            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
+            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
+        return matching_sc[0]
+
+    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
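+        """Select the appropriate fetcher and return the devices it
+        discovers, grouped by node name: PDFetcher when the Rook discovery
+        daemon is enabled, LSOFetcher when the storage class is owned by the
+        LSO, and DefaultFetcher otherwise."""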
+        self.fetcher: Optional[DefaultFetcher] = None
+        op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+        if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+            self.fetcher = PDFetcher(self.coreV1_api)
+        else:
+            storage_class = self.get_storage_class()
+            if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+                self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
             else:
-                return True
-
-        try:
-            result = [i for i in self.inventory_maps.items if predicate(i)]
-        except ApiException as dummy_e:
-            log.exception("Failed to fetch device metadata")
-            raise
+                self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
 
-        nodename_to_devices = {}
-        for i in result:
-            drives = json.loads(i.data['devices'])
-            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
-
-        return nodename_to_devices
+        self.fetcher.fetch()
+        return self.fetcher.devices()
 
+    def get_osds(self) -> List:
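+        """Return the rook-ceph-osd pods running in the rook-ceph namespace."""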
+        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+        return list(osd_pods.items)
+
     def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
         #
         # Fetch cephnfs object for "nfs_cluster" and then return a rados://