self.get_module_option(opt['name'])) # type: ignore
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
-
- self._rook_cluster.storage_class_name = self.storage_class_name
+ assert isinstance(self.storage_class_name, str)
+ self.rook_cluster.storage_class_name = self.storage_class_name
def shutdown(self) -> None:
self._shutdown.set()
log.exception("Failed to fetch device metadata")
raise
- nodename_to_devices = {}
lso_devices = {}
for i in lso_discovery_results:
drives = i['status']['discoveredDevices']
lso_devices[drive['deviceID'].split('/')[-1]] = drive
pvs_in_sc = [i for i in self.pvs.items if i.spec.storage_class_name == self.storage_class_name]
- def convert_size(size_str: str):
+ def convert_size(size_str: str) -> int:
units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei")
unit = size_str[-2:]
try:
size = coeff * (2 ** (10 * factor))
return size
+ nodename_to_devices: Dict[str, Any] = {}
for i in pvs_in_sc:
if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in lso_devices):
size = convert_size(i.spec.capacity['storage'])