From 3f6dae8d616e5c5064e7f9f9920d340eb4d32c5b Mon Sep 17 00:00:00 2001 From: Guillaume Abrioux Date: Wed, 8 Jan 2025 08:38:11 +0000 Subject: [PATCH] ceph-volume: add type annotation to api.lvm This adds Python type annotations to `api.lvm`, along with all necessary adjustments to ensure compatibility and maintain code clarity. Signed-off-by: Guillaume Abrioux --- src/ceph-volume/ceph_volume/api/lvm.py | 222 +++++++++++++------------ 1 file changed, 118 insertions(+), 104 deletions(-) diff --git a/src/ceph-volume/ceph_volume/api/lvm.py b/src/ceph-volume/ceph_volume/api/lvm.py index c824accf3b127..8a38c536aa0f2 100644 --- a/src/ceph-volume/ceph_volume/api/lvm.py +++ b/src/ceph-volume/ceph_volume/api/lvm.py @@ -10,19 +10,19 @@ from itertools import repeat from math import floor from ceph_volume import process, util, conf from ceph_volume.exceptions import SizeAllocationError -from typing import Any, Dict +from typing import Any, Dict, Optional, List, Union, Set logger = logging.getLogger(__name__) -def convert_filters_to_str(filters): +def convert_filters_to_str(filters: Dict[str, Any]) -> str: """ Convert filter args from dictionary to following format - filters={filter_name=filter_val,...} """ if not filters: - return filters + return '' filter_arg = '' for k, v in filters.items(): @@ -33,13 +33,13 @@ def convert_filters_to_str(filters): return filter_arg -def convert_tags_to_str(tags): +def convert_tags_to_str(tags: Dict[str, Any]) -> str: """ Convert tags from dictionary to following format - tags={tag_name=tag_val,...} """ if not tags: - return tags + return '' tag_arg = 'tags={' for k, v in tags.items(): @@ -50,7 +50,7 @@ def convert_tags_to_str(tags): return tag_arg -def make_filters_lvmcmd_ready(filters, tags): +def make_filters_lvmcmd_ready(filters: Dict[str, Any], tags: Dict[str, Any]) -> str: """ Convert filters (including tags) from dictionary to following format - filter_name=filter_val...,tags={tag_name=tag_val,...} @@ -58,20 +58,20 @@ def make_filters_lvmcmd_ready(filters, tags): The command will look as follows = lvs -S filter_name=filter_val...,tags={tag_name=tag_val,...} """ - filters = convert_filters_to_str(filters) - tags = convert_tags_to_str(tags) - - if filters and tags: - return filters + ',' + tags - if filters and not tags: - return filters - if not filters and tags: - return tags + filters_str = convert_filters_to_str(filters) + tags_str = convert_tags_to_str(tags) + + if filters_str and tags_str: + return filters_str + ',' + tags_str + if filters_str and not tags_str: + return filters_str + if not filters_str and tags_str: + return tags_str else: return '' -def _output_parser(output, fields): +def _output_parser(output: List[str], fields: str) -> List[Dict[str, Any]]: """ Newer versions of LVM allow ``--reportformat=json``, but older versions, like the one included in Xenial do not. LVM has the ability to filter and @@ -106,7 +106,7 @@ def _output_parser(output, fields): return report -def _splitname_parser(line): +def _splitname_parser(line: str) -> Dict[str, Any]: """ Parses the output from ``dmsetup splitname``, that should contain prefixes (--nameprefixes) and set the separator to ";" @@ -122,7 +122,7 @@ def _splitname_parser(line): :returns: dictionary with stripped prefixes """ - parsed = {} + parsed: Dict[str, Any] = {} try: parts = line[0].split(';') except IndexError: @@ -140,7 +140,7 @@ def _splitname_parser(line): return parsed -def sizing(device_size, parts=None, size=None): +def sizing(device_size: int, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]: """ Calculate proper sizing to fully utilize the volume group in the most efficient way possible. To prevent situations where LVM might accept @@ -164,7 +164,7 @@ def sizing(device_size, parts=None, size=None): if size and size > device_size: raise SizeAllocationError(size, device_size) - def get_percentage(parts): + def get_percentage(parts: int) -> int: return int(floor(100 / float(parts))) if parts is not None: @@ -185,7 +185,7 @@ def sizing(device_size, parts=None, size=None): } -def parse_tags(lv_tags): +def parse_tags(lv_tags: str) -> Dict[str, Any]: """ Return a dictionary mapping of all the tags associated with a Volume from the comma-separated tags coming from the LVM API @@ -214,7 +214,7 @@ def parse_tags(lv_tags): return tag_mapping -def _vdo_parents(devices): +def _vdo_parents(devices: List[str]) -> List[str]: """ It is possible we didn't get a logical volume, or a mapper path, but a device like /dev/sda2, to resolve this, we must look at all the slaves of @@ -230,7 +230,7 @@ def _vdo_parents(devices): return parent_devices -def _vdo_slaves(vdo_names): +def _vdo_slaves(vdo_names: List[str]) -> List[str]: """ find all the slaves associated with each vdo name (from realpath) by going into /sys/block//slaves @@ -255,7 +255,7 @@ def _vdo_slaves(vdo_names): return devices -def _is_vdo(path): +def _is_vdo(path: str) -> bool: """ A VDO device can be composed from many different devices, go through each one of those devices and its slaves (if any) and correlate them back to @@ -279,7 +279,7 @@ def _is_vdo(path): # find all the slaves associated with each vdo name (from realpath) by # going into /sys/block//slaves - devices.extend(_vdo_slaves(vdo_names)) + devices.extend(_vdo_slaves(list(vdo_names))) # Find all possible parents, looking into slaves that are related to VDO devices.extend(_vdo_parents(devices)) @@ -290,7 +290,7 @@ def _is_vdo(path): realpath_name in devices]) -def is_vdo(path): +def is_vdo(path: str) -> str: """ Detect if a path is backed by VDO, proxying the actual call to _is_vdo so that we can prevent an exception breaking OSD creation. If an exception is @@ -306,7 +306,7 @@ def is_vdo(path): return '0' -def dmsetup_splitname(dev): +def dmsetup_splitname(dev: str) -> Dict[str, Any]: """ Run ``dmsetup splitname`` and parse the results. @@ -322,7 +322,7 @@ def dmsetup_splitname(dev): return _splitname_parser(out) -def is_ceph_device(lv): +def is_ceph_device(lv: "Volume") -> bool: try: lv.tags['ceph.osd_id'] except (KeyError, AttributeError): @@ -343,26 +343,28 @@ def is_ceph_device(lv): PV_FIELDS = 'pv_name,pv_tags,pv_uuid,vg_name,lv_uuid' -class PVolume(object): +class PVolume: """ Represents a Physical Volume from LVM, with some top-level attributes like ``pv_name`` and parsed tags as a dictionary of key/value pairs. """ - def __init__(self, **kw): + def __init__(self, **kw: Any) -> None: + self.pv_name: str = '' + self.pv_uuid: str = '' for k, v in kw.items(): setattr(self, k, v) self.pv_api = kw self.name = kw['pv_name'] self.tags = parse_tags(kw['pv_tags']) - def __str__(self): + def __str__(self) -> str: return '<%s>' % self.pv_api['pv_name'] - def __repr__(self): + def __repr__(self) -> str: return self.__str__() - def set_tags(self, tags): + def set_tags(self, tags: Dict[str, Any]) -> None: """ :param tags: A dictionary of tag names and values, like:: @@ -378,15 +380,15 @@ class PVolume(object): self.set_tag(k, v) # after setting all the tags, refresh them for the current object, use the # pv_* identifiers to filter because those shouldn't change - pv_object = self.get_single_pv(filter={'pv_name': self.pv_name, - 'pv_uuid': self.pv_uuid}) + pv_object = get_single_pv(filters={'pv_name': self.pv_name, + 'pv_uuid': self.pv_uuid}) if not pv_object: raise RuntimeError('No PV was found.') self.tags = pv_object.tags - def set_tag(self, key, value): + def set_tag(self, key: str, value: str) -> None: """ Set the key/value pair as an LVM tag. Does not "refresh" the values of the current object for its tags. Meant to be a "fire and forget" type @@ -412,7 +414,7 @@ class PVolume(object): ) -def create_pv(device): +def create_pv(device: str) -> None: """ Create a physical volume from a device, useful when devices need to be later mapped to journals. @@ -426,7 +428,7 @@ def create_pv(device): ], run_on_host=True) -def remove_pv(pv_name): +def remove_pv(pv_name: str) -> None: """ Removes a physical volume using a double `-f` to prevent prompts and fully remove anything related to LVM. This is tremendously destructive, but so is all other actions @@ -454,7 +456,7 @@ def remove_pv(pv_name): ) -def get_pvs(fields=PV_FIELDS, filters='', tags=None): +def get_pvs(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[PVolume]: """ Return a list of PVs that are available on the system and match the filters and tags passed. Argument filters takes a dictionary containing @@ -470,16 +472,20 @@ def get_pvs(fields=PV_FIELDS, filters='', tags=None): :param tags: dictionary containng LVM tags :returns: list of class PVolume object representing pvs on the system """ - filters = make_filters_lvmcmd_ready(filters, tags) + if filters is None: + filters = {} + if tags is None: + tags = {} + filters_str = make_filters_lvmcmd_ready(filters, tags) args = ['pvs', '--noheadings', '--readonly', '--separator=";"', '-S', - filters, '-o', fields] + filters_str, '-o', fields] stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False) pvs_report = _output_parser(stdout, fields) return [PVolume(**pv_report) for pv_report in pvs_report] -def get_single_pv(fields=PV_FIELDS, filters=None, tags=None): +def get_single_pv(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[PVolume]: """ Wrapper of get_pvs() meant to be a convenience method to avoid the phrase:: pvs = get_pvs() @@ -511,10 +517,12 @@ class VolumeGroup(object): Represents an LVM group, with some top-level attributes like ``vg_name`` """ - def __init__(self, **kw): + def __init__(self, **kw: Any) -> None: self.pv_name: str = '' self.vg_name: str = '' self.vg_free_count: str = '' + self.vg_extent_size: str = '' + self.vg_extent_count: str = '' for k, v in kw.items(): setattr(self, k, v) self.name = kw['vg_name'] @@ -522,34 +530,34 @@ class VolumeGroup(object): raise ValueError('VolumeGroup must have a non-empty name') self.tags = parse_tags(kw.get('vg_tags', '')) - def __str__(self): + def __str__(self) -> str: return '<%s>' % self.name - def __repr__(self): + def __repr__(self) -> str: return self.__str__() @property - def free(self): + def free(self) -> int: """ Return free space in VG in bytes """ return int(self.vg_extent_size) * int(self.vg_free_count) @property - def free_percent(self): + def free_percent(self) -> float: """ Return free space in VG in bytes """ return int(self.vg_free_count) / int(self.vg_extent_count) @property - def size(self): + def size(self) -> int: """ Returns VG size in bytes """ return int(self.vg_extent_size) * int(self.vg_extent_count) - def sizing(self, parts=None, size=None): + def sizing(self, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]: """ Calculate proper sizing to fully utilize the volume group in the most efficient way possible. To prevent situations where LVM might accept @@ -559,7 +567,7 @@ class VolumeGroup(object): A dictionary with different sizing parameters is returned, to make it easier for others to choose what they need in order to create logical - volumes:: + volumes: >>> data_vg.free 1024 @@ -590,9 +598,9 @@ class VolumeGroup(object): extents = int(size / int(self.vg_extent_size)) disk_sizing = sizing(self.free, size=size, parts=parts) else: - if parts is not None: + if parts is None or parts == 0: # Prevent parts being 0, falling back to 1 (100% usage) - parts = parts or 1 + parts = 1 size = int(self.free / parts) extents = size * vg_free_count / self.free disk_sizing = sizing(self.free, parts=parts) @@ -603,7 +611,7 @@ class VolumeGroup(object): disk_sizing['percentages'] = extent_sizing['percentages'] return disk_sizing - def bytes_to_extents(self, size): + def bytes_to_extents(self, size: int) -> int: ''' Return a how many free extents we can fit into a size in bytes. This has some uncertainty involved. If size/extent_size is within 1% of the @@ -631,14 +639,14 @@ class VolumeGroup(object): 'bytes) are free'.format(size, self.vg_free_count, self.free)) - def slots_to_extents(self, slots): + def slots_to_extents(self, slots: int) -> int: ''' Return how many extents fit the VG slot times ''' return int(int(self.vg_extent_count) / slots) -def create_vg(devices, name=None, name_prefix=None): +def create_vg(devices: Union[str, Set, List[str]], name: Optional[str] = None, name_prefix: str = '') -> Optional[VolumeGroup]: """ Create a Volume Group. Command looks like:: @@ -671,7 +679,7 @@ def create_vg(devices, name=None, name_prefix=None): return get_single_vg(filters={'vg_name': name}) -def extend_vg(vg, devices): +def extend_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]: """ Extend a Volume Group. Command looks like:: @@ -696,7 +704,7 @@ def extend_vg(vg, devices): return get_single_vg(filters={'vg_name': vg.name}) -def reduce_vg(vg, devices): +def reduce_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]: """ Reduce a Volume Group. Command looks like:: @@ -716,10 +724,10 @@ def reduce_vg(vg, devices): run_on_host=True ) - return get_single_vg(filter={'vg_name': vg.name}) + return get_single_vg(filters={'vg_name': vg.name}) -def remove_vg(vg_name): +def remove_vg(vg_name: str) -> None: """ Removes a volume group. """ @@ -739,7 +747,7 @@ def remove_vg(vg_name): ) -def get_vgs(fields=VG_FIELDS, filters='', tags=None): +def get_vgs(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[VolumeGroup]: """ Return a list of VGs that are available on the system and match the filters and tags passed. Argument filters takes a dictionary containing @@ -755,15 +763,19 @@ def get_vgs(fields=VG_FIELDS, filters='', tags=None): :param tags: dictionary containng LVM tags :returns: list of class VolumeGroup object representing vgs on the system """ - filters = make_filters_lvmcmd_ready(filters, tags) - args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters, '-o', fields] + if filters is None: + filters = {} + if tags is None: + tags = {} + filters_str = make_filters_lvmcmd_ready(filters, tags) + args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters_str, '-o', fields] stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False) vgs_report =_output_parser(stdout, fields) return [VolumeGroup(**vg_report) for vg_report in vgs_report] -def get_single_vg(fields=VG_FIELDS, filters=None, tags=None): +def get_single_vg(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[VolumeGroup]: """ Wrapper of get_vgs() meant to be a convenience method to avoid the phrase:: vgs = get_vgs() @@ -780,7 +792,7 @@ def get_single_vg(fields=VG_FIELDS, filters=None, tags=None): return vgs[0] -def get_device_vgs(device, name_prefix=''): +def get_device_vgs(device: str, name_prefix: str = '') -> List[VolumeGroup]: stdout, stderr, returncode = process.call( ['pvs'] + VG_CMD_OPTIONS + ['-o', VG_FIELDS, device], run_on_host=True, @@ -790,7 +802,7 @@ def get_device_vgs(device, name_prefix=''): return [VolumeGroup(**vg) for vg in vgs if vg['vg_name'] and vg['vg_name'].startswith(name_prefix)] -def get_all_devices_vgs(name_prefix=''): +def get_all_devices_vgs(name_prefix: str = '') -> List[VolumeGroup]: vg_fields = f'pv_name,{VG_FIELDS}' cmd = ['pvs'] + VG_CMD_OPTIONS + ['-o', vg_fields] stdout, stderr, returncode = process.call( @@ -818,7 +830,7 @@ class Volume: ``lv_name`` and parsed tags as a dictionary of key/value pairs. """ - def __init__(self, **kw: str) -> None: + def __init__(self, **kw: Any) -> None: self.lv_path: str = '' self.lv_name: str = '' self.lv_uuid: str = '' @@ -839,8 +851,8 @@ class Volume: def __repr__(self) -> str: return self.__str__() - def as_dict(self) -> Dict[str, Any]: - obj = {} + def as_dict(self) -> Dict[Any, Any]: + obj: Dict[Any, Any] = {} obj.update(self.lv_api) obj['tags'] = self.tags obj['name'] = self.name @@ -869,17 +881,17 @@ class Volume: report[type_uuid] = self.tags['ceph.{}'.format(type_uuid)] return report - def _format_tag_args(self, op, tags): + def _format_tag_args(self, op: str, tags: Dict[str, Any]) -> List[str]: tag_args = ['{}={}'.format(k, v) for k, v in tags.items()] # weird but efficient way of ziping two lists and getting a flat list return list(sum(zip(repeat(op), tag_args), ())) - def clear_tags(self, keys=None): + def clear_tags(self, keys: Optional[List[str]] = None) -> None: """ Removes all or passed tags from the Logical Volume. """ if not keys: - keys = self.tags.keys() + keys = list(self.tags.keys()) del_tags = {k: self.tags[k] for k in keys if k in self.tags} if not del_tags: @@ -892,7 +904,7 @@ class Volume: del self.tags[k] - def set_tags(self, tags): + def set_tags(self, tags: Dict[str, Any]) -> None: """ :param tags: A dictionary of tag names and values, like:: @@ -904,14 +916,14 @@ class Volume: At the end of all modifications, the tags are refreshed to reflect LVM's most current view. """ - self.clear_tags(tags.keys()) + self.clear_tags(list(tags.keys())) add_tag_args = self._format_tag_args('--addtag', tags) process.call(['lvchange'] + add_tag_args + [self.lv_path], run_on_host=True) for k, v in tags.items(): self.tags[k] = v - def clear_tag(self, key): + def clear_tag(self, key: str) -> None: if self.tags.get(key): current_value = self.tags[key] tag = "%s=%s" % (key, current_value) @@ -919,7 +931,7 @@ class Volume: del self.tags[key] - def set_tag(self, key, value): + def set_tag(self, key: str, value: str) -> None: """ Set the key/value pair as an LVM tag. """ @@ -935,21 +947,21 @@ class Volume: ) self.tags[key] = value - def deactivate(self): + def deactivate(self) -> None: """ Deactivate the LV by calling lvchange -an """ process.call(['lvchange', '-an', self.lv_path], run_on_host=True) -def create_lv(name_prefix, - uuid, - vg=None, - device=None, - slots=None, - extents=None, - size=None, - tags=None): +def create_lv(name_prefix: str, + uuid: str, + vg: Optional[VolumeGroup] = None, + device: Optional[str] = None, + slots: Optional[int] = None, + extents: Optional[int] = None, + size: Optional[int] = None, + tags: Optional[Dict[str, str]] = None) -> Optional[Volume]: """ Create a Logical Volume in a Volume Group. Command looks like:: @@ -1033,16 +1045,17 @@ def create_lv(name_prefix, 'db': 'ceph.db_device', 'lockbox': 'ceph.lockbox_device', # XXX might not ever need this lockbox sorcery } - path_tag = type_path_tag.get(tags.get('ceph.type')) - if path_tag: + path_tag = type_path_tag.get(tags.get('ceph.type', '')) + if path_tag and isinstance(lv, Volume): tags.update({path_tag: lv.lv_path}) - lv.set_tags(tags) + if isinstance(lv, Volume): + lv.set_tags(tags) return lv -def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'): +def create_lvs(volume_group: VolumeGroup, parts: int = 1, size: Optional[int] = None, name_prefix: str = 'ceph-lv') -> List[Optional[Volume]]: """ Create multiple Logical Volumes from a Volume Group by calculating the proper extents from ``parts`` or ``size``. A custom prefix can be used @@ -1065,9 +1078,6 @@ def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'): :param extents: The number of LVM extents to use to create the LV. Useful if looking to have accurate LV sizes (LVM rounds sizes otherwise) """ - if parts is None and size is None: - # fallback to just one part (using 100% of the vg) - parts = 1 lvs = [] tags = { "ceph.osd_id": "null", @@ -1080,12 +1090,12 @@ def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'): size = sizing['sizes'] extents = sizing['extents'] lvs.append( - create_lv(name_prefix, uuid.uuid4(), vg=volume_group, extents=extents, tags=tags) + create_lv(name_prefix, str(uuid.uuid4()), vg=volume_group, extents=extents, tags=tags) ) return lvs -def remove_lv(lv): +def remove_lv(lv: Union[str, Volume]) -> bool: """ Removes a logical volume given it's absolute path. @@ -1115,7 +1125,7 @@ def remove_lv(lv): return True -def get_lvs(fields=LV_FIELDS, filters='', tags=None): +def get_lvs(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[Volume]: """ Return a list of LVs that are available on the system and match the filters and tags passed. Argument filters takes a dictionary containing @@ -1131,15 +1141,19 @@ def get_lvs(fields=LV_FIELDS, filters='', tags=None): :param tags: dictionary containng LVM tags :returns: list of class Volume object representing LVs on the system """ - filters = make_filters_lvmcmd_ready(filters, tags) - args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters, '-o', fields] + if filters is None: + filters = {} + if tags is None: + tags = {} + filters_str = make_filters_lvmcmd_ready(filters, tags) + args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters_str, '-o', fields] stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False) lvs_report = _output_parser(stdout, fields) return [Volume(**lv_report) for lv_report in lvs_report] -def get_single_lv(fields=LV_FIELDS, filters=None, tags=None): +def get_single_lv(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[Volume]: """ Wrapper of get_lvs() meant to be a convenience method to avoid the phrase:: lvs = get_lvs() @@ -1156,15 +1170,15 @@ def get_single_lv(fields=LV_FIELDS, filters=None, tags=None): return lvs[0] -def get_lvs_from_osd_id(osd_id): +def get_lvs_from_osd_id(osd_id: str) -> List[Volume]: return get_lvs(tags={'ceph.osd_id': osd_id}) -def get_single_lv_from_osd_id(osd_id): +def get_single_lv_from_osd_id(osd_id: str) -> Optional[Volume]: return get_single_lv(tags={'ceph.osd_id': osd_id}) -def get_lv_by_name(name): +def get_lv_by_name(name: str) -> List[Volume]: stdout, stderr, returncode = process.call( ['lvs', '--noheadings', '-o', LV_FIELDS, '-S', 'lv_name={}'.format(name)], @@ -1175,7 +1189,7 @@ def get_lv_by_name(name): return [Volume(**lv) for lv in lvs] -def get_lvs_by_tag(lv_tag): +def get_lvs_by_tag(lv_tag: str) -> List[Volume]: stdout, stderr, returncode = process.call( ['lvs', '--noheadings', '--separator=";"', '-a', '-o', LV_FIELDS, '-S', 'lv_tags={{{}}}'.format(lv_tag)], @@ -1186,7 +1200,7 @@ def get_lvs_by_tag(lv_tag): return [Volume(**lv) for lv in lvs] -def get_device_lvs(device, name_prefix=''): +def get_device_lvs(device: str, name_prefix: str = '') -> List[Volume]: stdout, stderr, returncode = process.call( ['pvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS, device], run_on_host=True, @@ -1196,7 +1210,7 @@ def get_device_lvs(device, name_prefix=''): return [Volume(**lv) for lv in lvs if lv['lv_name'] and lv['lv_name'].startswith(name_prefix)] -def get_lvs_from_path(devpath): +def get_lvs_from_path(devpath: str) -> List[Volume]: lvs = [] if os.path.isabs(devpath): # we have a block device @@ -1207,7 +1221,7 @@ def get_lvs_from_path(devpath): return lvs -def get_lv_by_fullname(full_name): +def get_lv_by_fullname(full_name: str) -> Optional[Volume]: """ returns LV by the specified LV's full name (formatted as vg_name/lv_name) """ -- 2.39.5