for host_name, host_devs in discovered_devs.items():
devs = []
for d in host_devs:
- if 'cephVolumeData' in d and d['cephVolumeData']:
- devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
- else:
- devs.append(inventory.Device(
- path = '/dev/' + d['name'],
- sys_api = dict(
- rotational = '1' if d['rotational'] else '0',
- size = d['size']
- ),
- available = False,
- rejected_reasons=['device data coming from ceph-volume not provided'],
- ))
+ devs.append(inventory.Device(
+ path = d['path'],
+ sys_api = dict(
+ size = d['size']
+ ),
+ available = d['status']['state']=='Available',
+ ))
+ if 'property' in d:
+ devs[-1].sys_api['rotational'] = '1' if d['property']=='Rotational' else '0'
+ if 'deviceID' in d and d['deviceID']:
+ devs[-1].device_id = d['deviceID'].split('/')[-1]
+ if 'serial' in d and d['serial']:
+ if not devs[-1].lsm_data:
+ devs[-1].lsm_data = dict()
+ devs[-1].lsm_data['serialNum'] = d['serial']
result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))
import datetime
import threading
import logging
-import json
from contextlib import contextmanager
from time import sleep
def _fetch(self) -> str:
""" Execute the requested api method as a one-off fetch"""
response = self.api_func(**self.kwargs)
- # metadata is a client.V1ListMeta object type
- metadata = response.metadata # type: client.V1ListMeta
- self._items = {item.metadata.name: item for item in response.items}
- log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
- return metadata.resource_version
+ # try-except pattern because customObjectApi objects aren't subscriptable
+ try:
+ metadata = response['metadata']
+ self._items = {item['metadata']['name']: item for item in response['items']}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata['resourceVersion']
+ except TypeError:
+ metadata = response.metadata
+ self._items = {item.metadata.name: item for item in response.items}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata.resource_version
@property
def items(self) -> Iterable[T]:
self.health = ''
item = event['object']
try:
- name = item.metadata.name
+ name = item['metadata']['name']
except AttributeError:
raise AttributeError(
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
+ except TypeError:
+ name = item.metadata.name
log.info('{} event: {}'.format(event['type'], name))
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented
- self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
- namespace=self.rook_env.operator_namespace,
- label_selector="app=rook-discover")
+ self.pvs : KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+
+ self.discovery_results: KubernetesResource = KubernetesResource(self.customObjects_api.list_cluster_custom_object,
+ group="local.storage.openshift.io",
+ version="v1alpha1",
+ plural="localvolumediscoveryresults")
+
self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
def predicate(item: client.V1ConfigMapList) -> bool:
if nodenames is not None:
- return item.metadata.labels['rook.io/node'] in nodenames
+ return item['spec']['nodeName'] in nodenames
else:
return True
try:
- result = [i for i in self.inventory_maps.items if predicate(i)]
+ devices = [i for i in self.discovery_results.items if predicate(i)]
except ApiException as dummy_e:
log.exception("Failed to fetch device metadata")
raise
nodename_to_devices = {}
- for i in result:
- drives = json.loads(i.data['devices'])
- nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
+ lso_devices = {}
+ for i in devices:
+ drives = i['status']['discoveredDevices']
+ for drive in drives:
+ lso_devices[drive['deviceID'].split('/')[-1]] = drive
+ pvs = [i for i in self.pvs.items]
+
+ def convert_size(size_str: str):
+ units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei")
+ unit = size_str[-2:]
+ try:
+ factor = units.index(unit)
+ except ValueError:
+ log.exception("PV size format invalid")
+ raise
+ coeff = int(size_str[:-2])
+ size = coeff * (2 ** (10 * factor))
+ return size
+
+ for i in pvs:
+ if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in lso_devices):
+ size = convert_size(i.spec.capacity['storage'])
+ path = i.spec.host_path.path if i.spec.host_path else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations['storage.openshift.com/device-name'] else ''
+ state = 'Available' if (i.spec.volume_mode == 'Block' and i.spec.claim_ref == None) else 'Unavailable'
+ info = {
+ 'path': path,
+ 'size': size,
+ 'status': {
+ 'state': state
+ }
+ }
+ node = 'N/A'
+
+ if i.spec.node_affinity:
+ terms = i.spec.node_affinity.required.node_selector_terms
+ if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
+ node = terms[0].match_expressions[0].values[0]
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(info)
+ else:
+ if i.metadata.labels['kubernetes.io/hostname'] not in nodename_to_devices:
+ nodename_to_devices[i.metadata.labels['kubernetes.io/hostname']] = []
+ nodename_to_devices[i.metadata.labels['kubernetes.io/hostname']].append(lso_devices[i.metadata.annotations['storage.openshift.com/device-id']])
+
return nodename_to_devices