]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-volume: add type annotation to api.lvm
authorGuillaume Abrioux <gabrioux@ibm.com>
Wed, 8 Jan 2025 08:38:11 +0000 (08:38 +0000)
committerGuillaume Abrioux <gabrioux@ibm.com>
Mon, 13 Jan 2025 12:29:51 +0000 (12:29 +0000)
This adds Python type annotations to `api.lvm`, along with all
necessary adjustments to ensure compatibility and maintain code
clarity.

Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
src/ceph-volume/ceph_volume/api/lvm.py

index c824accf3b127bed21eb65417630d65889bbd6ef..8a38c536aa0f21b29e747181cb040bfd14db75a9 100644 (file)
@@ -10,19 +10,19 @@ from itertools import repeat
 from math import floor
 from ceph_volume import process, util, conf
 from ceph_volume.exceptions import SizeAllocationError
-from typing import Any, Dict
+from typing import Any, Dict, Optional, List, Union, Set
 
 
 logger = logging.getLogger(__name__)
 
 
-def convert_filters_to_str(filters):
+def convert_filters_to_str(filters: Dict[str, Any]) -> str:
     """
     Convert filter args from dictionary to following format -
         filters={filter_name=filter_val,...}
     """
     if not filters:
-        return filters
+        return ''
 
     filter_arg = ''
     for k, v in filters.items():
@@ -33,13 +33,13 @@ def convert_filters_to_str(filters):
     return filter_arg
 
 
-def convert_tags_to_str(tags):
+def convert_tags_to_str(tags: Dict[str, Any]) -> str:
     """
     Convert tags from dictionary to following format -
         tags={tag_name=tag_val,...}
     """
     if not tags:
-        return tags
+        return ''
 
     tag_arg = 'tags={'
     for k, v in tags.items():
@@ -50,7 +50,7 @@ def convert_tags_to_str(tags):
     return tag_arg
 
 
-def make_filters_lvmcmd_ready(filters, tags):
+def make_filters_lvmcmd_ready(filters: Dict[str, Any], tags: Dict[str, Any]) -> str:
     """
     Convert filters (including tags) from dictionary to following format -
         filter_name=filter_val...,tags={tag_name=tag_val,...}
@@ -58,20 +58,20 @@ def make_filters_lvmcmd_ready(filters, tags):
     The command will look as follows =
         lvs -S filter_name=filter_val...,tags={tag_name=tag_val,...}
     """
-    filters = convert_filters_to_str(filters)
-    tags = convert_tags_to_str(tags)
-
-    if filters and tags:
-        return filters + ',' + tags
-    if filters and not tags:
-        return filters
-    if not filters and tags:
-        return tags
+    filters_str = convert_filters_to_str(filters)
+    tags_str = convert_tags_to_str(tags)
+
+    if filters_str and tags_str:
+        return filters_str + ',' + tags_str
+    if filters_str and not tags_str:
+        return filters_str
+    if not filters_str and tags_str:
+        return tags_str
     else:
         return ''
 
 
-def _output_parser(output, fields):
+def _output_parser(output: List[str], fields: str) -> List[Dict[str, Any]]:
     """
     Newer versions of LVM allow ``--reportformat=json``, but older versions,
     like the one included in Xenial do not. LVM has the ability to filter and
@@ -106,7 +106,7 @@ def _output_parser(output, fields):
     return report
 
 
-def _splitname_parser(line):
+def _splitname_parser(line: str) -> Dict[str, Any]:
     """
     Parses the output from ``dmsetup splitname``, that should contain prefixes
     (--nameprefixes) and set the separator to ";"
@@ -122,7 +122,7 @@ def _splitname_parser(line):
 
     :returns: dictionary with stripped prefixes
     """
-    parsed = {}
+    parsed: Dict[str, Any] = {}
     try:
         parts = line[0].split(';')
     except IndexError:
@@ -140,7 +140,7 @@ def _splitname_parser(line):
     return parsed
 
 
-def sizing(device_size, parts=None, size=None):
+def sizing(device_size: int, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]:
     """
     Calculate proper sizing to fully utilize the volume group in the most
     efficient way possible. To prevent situations where LVM might accept
@@ -164,7 +164,7 @@ def sizing(device_size, parts=None, size=None):
     if size and size > device_size:
         raise SizeAllocationError(size, device_size)
 
-    def get_percentage(parts):
+    def get_percentage(parts: int) -> int:
         return int(floor(100 / float(parts)))
 
     if parts is not None:
@@ -185,7 +185,7 @@ def sizing(device_size, parts=None, size=None):
     }
 
 
-def parse_tags(lv_tags):
+def parse_tags(lv_tags: str) -> Dict[str, Any]:
     """
     Return a dictionary mapping of all the tags associated with
     a Volume from the comma-separated tags coming from the LVM API
@@ -214,7 +214,7 @@ def parse_tags(lv_tags):
     return tag_mapping
 
 
-def _vdo_parents(devices):
+def _vdo_parents(devices: List[str]) -> List[str]:
     """
     It is possible we didn't get a logical volume, or a mapper path, but
     a device like /dev/sda2, to resolve this, we must look at all the slaves of
@@ -230,7 +230,7 @@ def _vdo_parents(devices):
     return parent_devices
 
 
-def _vdo_slaves(vdo_names):
+def _vdo_slaves(vdo_names: List[str]) -> List[str]:
     """
     find all the slaves associated with each vdo name (from realpath) by going
     into /sys/block/<realpath>/slaves
@@ -255,7 +255,7 @@ def _vdo_slaves(vdo_names):
     return devices
 
 
-def _is_vdo(path):
+def _is_vdo(path: str) -> bool:
     """
     A VDO device can be composed from many different devices, go through each
     one of those devices and its slaves (if any) and correlate them back to
@@ -279,7 +279,7 @@ def _is_vdo(path):
 
     # find all the slaves associated with each vdo name (from realpath) by
     # going into /sys/block/<realpath>/slaves
-    devices.extend(_vdo_slaves(vdo_names))
+    devices.extend(_vdo_slaves(list(vdo_names)))
 
     # Find all possible parents, looking into slaves that are related to VDO
     devices.extend(_vdo_parents(devices))
@@ -290,7 +290,7 @@ def _is_vdo(path):
         realpath_name in devices])
 
 
-def is_vdo(path):
+def is_vdo(path: str) -> str:
     """
     Detect if a path is backed by VDO, proxying the actual call to _is_vdo so
     that we can prevent an exception breaking OSD creation. If an exception is
@@ -306,7 +306,7 @@ def is_vdo(path):
         return '0'
 
 
-def dmsetup_splitname(dev):
+def dmsetup_splitname(dev: str) -> Dict[str, Any]:
     """
     Run ``dmsetup splitname`` and parse the results.
 
@@ -322,7 +322,7 @@ def dmsetup_splitname(dev):
     return _splitname_parser(out)
 
 
-def is_ceph_device(lv):
+def is_ceph_device(lv: "Volume") -> bool:
     try:
         lv.tags['ceph.osd_id']
     except (KeyError, AttributeError):
@@ -343,26 +343,28 @@ def is_ceph_device(lv):
 
 PV_FIELDS = 'pv_name,pv_tags,pv_uuid,vg_name,lv_uuid'
 
-class PVolume(object):
+class PVolume:
     """
     Represents a Physical Volume from LVM, with some top-level attributes like
     ``pv_name`` and parsed tags as a dictionary of key/value pairs.
     """
 
-    def __init__(self, **kw):
+    def __init__(self, **kw: Any) -> None:
+        self.pv_name: str = ''
+        self.pv_uuid: str = ''
         for k, v in kw.items():
             setattr(self, k, v)
         self.pv_api = kw
         self.name = kw['pv_name']
         self.tags = parse_tags(kw['pv_tags'])
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '<%s>' % self.pv_api['pv_name']
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return self.__str__()
 
-    def set_tags(self, tags):
+    def set_tags(self, tags: Dict[str, Any]) -> None:
         """
         :param tags: A dictionary of tag names and values, like::
 
@@ -378,15 +380,15 @@ class PVolume(object):
             self.set_tag(k, v)
         # after setting all the tags, refresh them for the current object, use the
         # pv_* identifiers to filter because those shouldn't change
-        pv_object = self.get_single_pv(filter={'pv_name': self.pv_name,
-                                               'pv_uuid': self.pv_uuid})
+        pv_object = get_single_pv(filters={'pv_name': self.pv_name,
+                                           'pv_uuid': self.pv_uuid})
 
         if not pv_object:
             raise RuntimeError('No PV was found.')
 
         self.tags = pv_object.tags
 
-    def set_tag(self, key, value):
+    def set_tag(self, key: str, value: str) -> None:
         """
         Set the key/value pair as an LVM tag. Does not "refresh" the values of
         the current object for its tags. Meant to be a "fire and forget" type
@@ -412,7 +414,7 @@ class PVolume(object):
         )
 
 
-def create_pv(device):
+def create_pv(device: str) -> None:
     """
     Create a physical volume from a device, useful when devices need to be later mapped
     to journals.
@@ -426,7 +428,7 @@ def create_pv(device):
     ], run_on_host=True)
 
 
-def remove_pv(pv_name):
+def remove_pv(pv_name: str) -> None:
     """
     Removes a physical volume using a double `-f` to prevent prompts and fully
     remove anything related to LVM. This is tremendously destructive, but so is all other actions
@@ -454,7 +456,7 @@ def remove_pv(pv_name):
     )
 
 
-def get_pvs(fields=PV_FIELDS, filters='', tags=None):
+def get_pvs(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[PVolume]:
     """
     Return a list of PVs that are available on the system and match the
     filters and tags passed. Argument filters takes a dictionary containing
@@ -470,16 +472,20 @@ def get_pvs(fields=PV_FIELDS, filters='', tags=None):
     :param tags: dictionary containng LVM tags
     :returns: list of class PVolume object representing pvs on the system
     """
-    filters = make_filters_lvmcmd_ready(filters, tags)
+    if filters is None:
+        filters = {}
+    if tags is None:
+        tags = {}
+    filters_str = make_filters_lvmcmd_ready(filters, tags)
     args = ['pvs', '--noheadings', '--readonly', '--separator=";"', '-S',
-            filters, '-o', fields]
+            filters_str, '-o', fields]
 
     stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
     pvs_report = _output_parser(stdout, fields)
     return [PVolume(**pv_report) for pv_report in pvs_report]
 
 
-def get_single_pv(fields=PV_FIELDS, filters=None, tags=None):
+def get_single_pv(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[PVolume]:
     """
     Wrapper of get_pvs() meant to be a convenience method to avoid the phrase::
         pvs = get_pvs()
@@ -511,10 +517,12 @@ class VolumeGroup(object):
     Represents an LVM group, with some top-level attributes like ``vg_name``
     """
 
-    def __init__(self, **kw):
+    def __init__(self, **kw: Any) -> None:
         self.pv_name: str = ''
         self.vg_name: str = ''
         self.vg_free_count: str = ''
+        self.vg_extent_size: str = ''
+        self.vg_extent_count: str = ''
         for k, v in kw.items():
             setattr(self, k, v)
         self.name = kw['vg_name']
@@ -522,34 +530,34 @@ class VolumeGroup(object):
             raise ValueError('VolumeGroup must have a non-empty name')
         self.tags = parse_tags(kw.get('vg_tags', ''))
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '<%s>' % self.name
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return self.__str__()
 
     @property
-    def free(self):
+    def free(self) -> int:
         """
         Return free space in VG in bytes
         """
         return int(self.vg_extent_size) * int(self.vg_free_count)
 
     @property
-    def free_percent(self):
+    def free_percent(self) -> float:
         """
         Return free space in VG in bytes
         """
         return int(self.vg_free_count) / int(self.vg_extent_count)
 
     @property
-    def size(self):
+    def size(self) -> int:
         """
         Returns VG size in bytes
         """
         return int(self.vg_extent_size) * int(self.vg_extent_count)
 
-    def sizing(self, parts=None, size=None):
+    def sizing(self, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]:
         """
         Calculate proper sizing to fully utilize the volume group in the most
         efficient way possible. To prevent situations where LVM might accept
@@ -559,7 +567,7 @@ class VolumeGroup(object):
 
         A dictionary with different sizing parameters is returned, to make it
         easier for others to choose what they need in order to create logical
-        volumes::
+        volumes:
 
         >>> data_vg.free
         1024
@@ -590,9 +598,9 @@ class VolumeGroup(object):
             extents = int(size / int(self.vg_extent_size))
             disk_sizing = sizing(self.free, size=size, parts=parts)
         else:
-            if parts is not None:
+            if parts is None or parts == 0:
                 # Prevent parts being 0, falling back to 1 (100% usage)
-                parts = parts or 1
+                parts = 1
             size = int(self.free / parts)
             extents = size * vg_free_count / self.free
             disk_sizing = sizing(self.free, parts=parts)
@@ -603,7 +611,7 @@ class VolumeGroup(object):
         disk_sizing['percentages'] = extent_sizing['percentages']
         return disk_sizing
 
-    def bytes_to_extents(self, size):
+    def bytes_to_extents(self, size: int) -> int:
         '''
         Return a how many free extents we can fit into a size in bytes. This has
         some uncertainty involved. If size/extent_size is within 1% of the
@@ -631,14 +639,14 @@ class VolumeGroup(object):
                            'bytes) are free'.format(size, self.vg_free_count,
                                                     self.free))
 
-    def slots_to_extents(self, slots):
+    def slots_to_extents(self, slots: int) -> int:
         '''
         Return how many extents fit the VG slot times
         '''
         return int(int(self.vg_extent_count) / slots)
 
 
-def create_vg(devices, name=None, name_prefix=None):
+def create_vg(devices: Union[str, Set, List[str]], name: Optional[str] = None, name_prefix: str = '') -> Optional[VolumeGroup]:
     """
     Create a Volume Group. Command looks like::
 
@@ -671,7 +679,7 @@ def create_vg(devices, name=None, name_prefix=None):
     return get_single_vg(filters={'vg_name': name})
 
 
-def extend_vg(vg, devices):
+def extend_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]:
     """
     Extend a Volume Group. Command looks like::
 
@@ -696,7 +704,7 @@ def extend_vg(vg, devices):
     return get_single_vg(filters={'vg_name': vg.name})
 
 
-def reduce_vg(vg, devices):
+def reduce_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]:
     """
     Reduce a Volume Group. Command looks like::
 
@@ -716,10 +724,10 @@ def reduce_vg(vg, devices):
         run_on_host=True
     )
 
-    return get_single_vg(filter={'vg_name': vg.name})
+    return get_single_vg(filters={'vg_name': vg.name})
 
 
-def remove_vg(vg_name):
+def remove_vg(vg_name: str) -> None:
     """
     Removes a volume group.
     """
@@ -739,7 +747,7 @@ def remove_vg(vg_name):
     )
 
 
-def get_vgs(fields=VG_FIELDS, filters='', tags=None):
+def get_vgs(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[VolumeGroup]:
     """
     Return a list of VGs that are available on the system and match the
     filters and tags passed. Argument filters takes a dictionary containing
@@ -755,15 +763,19 @@ def get_vgs(fields=VG_FIELDS, filters='', tags=None):
     :param tags: dictionary containng LVM tags
     :returns: list of class VolumeGroup object representing vgs on the system
     """
-    filters = make_filters_lvmcmd_ready(filters, tags)
-    args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters, '-o', fields]
+    if filters is None:
+        filters = {}
+    if tags is None:
+        tags = {}
+    filters_str = make_filters_lvmcmd_ready(filters, tags)
+    args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters_str, '-o', fields]
 
     stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
     vgs_report =_output_parser(stdout, fields)
     return [VolumeGroup(**vg_report) for vg_report in vgs_report]
 
 
-def get_single_vg(fields=VG_FIELDS, filters=None, tags=None):
+def get_single_vg(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[VolumeGroup]:
     """
     Wrapper of get_vgs() meant to be a convenience method to avoid the phrase::
         vgs = get_vgs()
@@ -780,7 +792,7 @@ def get_single_vg(fields=VG_FIELDS, filters=None, tags=None):
     return vgs[0]
 
 
-def get_device_vgs(device, name_prefix=''):
+def get_device_vgs(device: str, name_prefix: str = '') -> List[VolumeGroup]:
     stdout, stderr, returncode = process.call(
         ['pvs'] + VG_CMD_OPTIONS + ['-o', VG_FIELDS, device],
         run_on_host=True,
@@ -790,7 +802,7 @@ def get_device_vgs(device, name_prefix=''):
     return [VolumeGroup(**vg) for vg in vgs if vg['vg_name'] and vg['vg_name'].startswith(name_prefix)]
 
 
-def get_all_devices_vgs(name_prefix=''):
+def get_all_devices_vgs(name_prefix: str = '') -> List[VolumeGroup]:
     vg_fields = f'pv_name,{VG_FIELDS}'
     cmd = ['pvs'] + VG_CMD_OPTIONS + ['-o', vg_fields]
     stdout, stderr, returncode = process.call(
@@ -818,7 +830,7 @@ class Volume:
     ``lv_name`` and parsed tags as a dictionary of key/value pairs.
     """
 
-    def __init__(self, **kw: str) -> None:
+    def __init__(self, **kw: Any) -> None:
         self.lv_path: str = ''
         self.lv_name: str = ''
         self.lv_uuid: str = ''
@@ -839,8 +851,8 @@ class Volume:
     def __repr__(self) -> str:
         return self.__str__()
 
-    def as_dict(self) -> Dict[str, Any]:
-        obj = {}
+    def as_dict(self) -> Dict[Any, Any]:
+        obj: Dict[Any, Any] = {}
         obj.update(self.lv_api)
         obj['tags'] = self.tags
         obj['name'] = self.name
@@ -869,17 +881,17 @@ class Volume:
             report[type_uuid] = self.tags['ceph.{}'.format(type_uuid)]
             return report
 
-    def _format_tag_args(self, op, tags):
+    def _format_tag_args(self, op: str, tags: Dict[str, Any]) -> List[str]:
         tag_args = ['{}={}'.format(k, v) for k, v in tags.items()]
         # weird but efficient way of ziping two lists and getting a flat list
         return list(sum(zip(repeat(op), tag_args), ()))
 
-    def clear_tags(self, keys=None):
+    def clear_tags(self, keys: Optional[List[str]] = None) -> None:
         """
         Removes all or passed tags from the Logical Volume.
         """
         if not keys:
-            keys = self.tags.keys()
+            keys = list(self.tags.keys())
 
         del_tags = {k: self.tags[k] for k in keys if k in self.tags}
         if not del_tags:
@@ -892,7 +904,7 @@ class Volume:
             del self.tags[k]
 
 
-    def set_tags(self, tags):
+    def set_tags(self, tags: Dict[str, Any]) -> None:
         """
         :param tags: A dictionary of tag names and values, like::
 
@@ -904,14 +916,14 @@ class Volume:
         At the end of all modifications, the tags are refreshed to reflect
         LVM's most current view.
         """
-        self.clear_tags(tags.keys())
+        self.clear_tags(list(tags.keys()))
         add_tag_args = self._format_tag_args('--addtag', tags)
         process.call(['lvchange'] + add_tag_args + [self.lv_path], run_on_host=True)
         for k, v in tags.items():
             self.tags[k] = v
 
 
-    def clear_tag(self, key):
+    def clear_tag(self, key: str) -> None:
         if self.tags.get(key):
             current_value = self.tags[key]
             tag = "%s=%s" % (key, current_value)
@@ -919,7 +931,7 @@ class Volume:
             del self.tags[key]
 
 
-    def set_tag(self, key, value):
+    def set_tag(self, key: str, value: str) -> None:
         """
         Set the key/value pair as an LVM tag.
         """
@@ -935,21 +947,21 @@ class Volume:
         )
         self.tags[key] = value
 
-    def deactivate(self):
+    def deactivate(self) -> None:
         """
         Deactivate the LV by calling lvchange -an
         """
         process.call(['lvchange', '-an', self.lv_path], run_on_host=True)
 
 
-def create_lv(name_prefix,
-              uuid,
-              vg=None,
-              device=None,
-              slots=None,
-              extents=None,
-              size=None,
-              tags=None):
+def create_lv(name_prefix: str,
+              uuid: str,
+              vg: Optional[VolumeGroup] = None,
+              device: Optional[str] = None,
+              slots: Optional[int] = None,
+              extents: Optional[int] = None,
+              size: Optional[int] = None,
+              tags: Optional[Dict[str, str]] = None) -> Optional[Volume]:
     """
     Create a Logical Volume in a Volume Group. Command looks like::
 
@@ -1033,16 +1045,17 @@ def create_lv(name_prefix,
         'db': 'ceph.db_device',
         'lockbox': 'ceph.lockbox_device',  # XXX might not ever need this lockbox sorcery
     }
-    path_tag = type_path_tag.get(tags.get('ceph.type'))
-    if path_tag:
+    path_tag = type_path_tag.get(tags.get('ceph.type', ''))
+    if path_tag and isinstance(lv, Volume):
         tags.update({path_tag: lv.lv_path})
 
-    lv.set_tags(tags)
+    if isinstance(lv, Volume):
+        lv.set_tags(tags)
 
     return lv
 
 
-def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
+def create_lvs(volume_group: VolumeGroup, parts: int = 1, size: Optional[int] = None, name_prefix: str = 'ceph-lv') -> List[Optional[Volume]]:
     """
     Create multiple Logical Volumes from a Volume Group by calculating the
     proper extents from ``parts`` or ``size``. A custom prefix can be used
@@ -1065,9 +1078,6 @@ def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
     :param extents: The number of LVM extents to use to create the LV. Useful if looking to have
     accurate LV sizes (LVM rounds sizes otherwise)
     """
-    if parts is None and size is None:
-        # fallback to just one part (using 100% of the vg)
-        parts = 1
     lvs = []
     tags = {
         "ceph.osd_id": "null",
@@ -1080,12 +1090,12 @@ def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
         size = sizing['sizes']
         extents = sizing['extents']
         lvs.append(
-            create_lv(name_prefix, uuid.uuid4(), vg=volume_group, extents=extents, tags=tags)
+            create_lv(name_prefix, str(uuid.uuid4()), vg=volume_group, extents=extents, tags=tags)
         )
     return lvs
 
 
-def remove_lv(lv):
+def remove_lv(lv: Union[str, Volume]) -> bool:
     """
     Removes a logical volume given it's absolute path.
 
@@ -1115,7 +1125,7 @@ def remove_lv(lv):
     return True
 
 
-def get_lvs(fields=LV_FIELDS, filters='', tags=None):
+def get_lvs(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[Volume]:
     """
     Return a list of LVs that are available on the system and match the
     filters and tags passed. Argument filters takes a dictionary containing
@@ -1131,15 +1141,19 @@ def get_lvs(fields=LV_FIELDS, filters='', tags=None):
     :param tags: dictionary containng LVM tags
     :returns: list of class Volume object representing LVs on the system
     """
-    filters = make_filters_lvmcmd_ready(filters, tags)
-    args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters, '-o', fields]
+    if filters is None:
+        filters = {}
+    if tags is None:
+        tags = {}
+    filters_str = make_filters_lvmcmd_ready(filters, tags)
+    args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters_str, '-o', fields]
 
     stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
     lvs_report = _output_parser(stdout, fields)
     return [Volume(**lv_report) for lv_report in lvs_report]
 
 
-def get_single_lv(fields=LV_FIELDS, filters=None, tags=None):
+def get_single_lv(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[Volume]:
     """
     Wrapper of get_lvs() meant to be a convenience method to avoid the phrase::
         lvs = get_lvs()
@@ -1156,15 +1170,15 @@ def get_single_lv(fields=LV_FIELDS, filters=None, tags=None):
     return lvs[0]
 
 
-def get_lvs_from_osd_id(osd_id):
+def get_lvs_from_osd_id(osd_id: str) -> List[Volume]:
     return get_lvs(tags={'ceph.osd_id': osd_id})
 
 
-def get_single_lv_from_osd_id(osd_id):
+def get_single_lv_from_osd_id(osd_id: str) -> Optional[Volume]:
     return get_single_lv(tags={'ceph.osd_id': osd_id})
 
 
-def get_lv_by_name(name):
+def get_lv_by_name(name: str) -> List[Volume]:
     stdout, stderr, returncode = process.call(
         ['lvs', '--noheadings', '-o', LV_FIELDS, '-S',
          'lv_name={}'.format(name)],
@@ -1175,7 +1189,7 @@ def get_lv_by_name(name):
     return [Volume(**lv) for lv in lvs]
 
 
-def get_lvs_by_tag(lv_tag):
+def get_lvs_by_tag(lv_tag: str) -> List[Volume]:
     stdout, stderr, returncode = process.call(
         ['lvs', '--noheadings', '--separator=";"', '-a', '-o', LV_FIELDS, '-S',
          'lv_tags={{{}}}'.format(lv_tag)],
@@ -1186,7 +1200,7 @@ def get_lvs_by_tag(lv_tag):
     return [Volume(**lv) for lv in lvs]
 
 
-def get_device_lvs(device, name_prefix=''):
+def get_device_lvs(device: str, name_prefix: str = '') -> List[Volume]:
     stdout, stderr, returncode = process.call(
         ['pvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS, device],
         run_on_host=True,
@@ -1196,7 +1210,7 @@ def get_device_lvs(device, name_prefix=''):
     return [Volume(**lv) for lv in lvs if lv['lv_name'] and
             lv['lv_name'].startswith(name_prefix)]
 
-def get_lvs_from_path(devpath):
+def get_lvs_from_path(devpath: str) -> List[Volume]:
     lvs = []
     if os.path.isabs(devpath):
         # we have a block device
@@ -1207,7 +1221,7 @@ def get_lvs_from_path(devpath):
 
     return lvs
 
-def get_lv_by_fullname(full_name):
+def get_lv_by_fullname(full_name: str) -> Optional[Volume]:
     """
     returns LV by the specified LV's full name (formatted as vg_name/lv_name)
     """