from math import floor
from ceph_volume import process, util, conf
from ceph_volume.exceptions import SizeAllocationError
-from typing import Any, Dict
+from typing import Any, Dict, Optional, List, Union, Set
logger = logging.getLogger(__name__)
-def convert_filters_to_str(filters):
+def convert_filters_to_str(filters: Dict[str, Any]) -> str:
"""
Convert filter args from dictionary to following format -
filters={filter_name=filter_val,...}
"""
if not filters:
- return filters
+ return ''
filter_arg = ''
for k, v in filters.items():
return filter_arg
-def convert_tags_to_str(tags):
+def convert_tags_to_str(tags: Dict[str, Any]) -> str:
"""
Convert tags from dictionary to following format -
tags={tag_name=tag_val,...}
"""
if not tags:
- return tags
+ return ''
tag_arg = 'tags={'
for k, v in tags.items():
return tag_arg
-def make_filters_lvmcmd_ready(filters, tags):
+def make_filters_lvmcmd_ready(filters: Dict[str, Any], tags: Dict[str, Any]) -> str:
"""
Convert filters (including tags) from dictionary to following format -
filter_name=filter_val...,tags={tag_name=tag_val,...}
The command will look as follows =
lvs -S filter_name=filter_val...,tags={tag_name=tag_val,...}
"""
- filters = convert_filters_to_str(filters)
- tags = convert_tags_to_str(tags)
-
- if filters and tags:
- return filters + ',' + tags
- if filters and not tags:
- return filters
- if not filters and tags:
- return tags
+ filters_str = convert_filters_to_str(filters)
+ tags_str = convert_tags_to_str(tags)
+
+ if filters_str and tags_str:
+ return filters_str + ',' + tags_str
+ if filters_str and not tags_str:
+ return filters_str
+ if not filters_str and tags_str:
+ return tags_str
else:
return ''
-def _output_parser(output, fields):
+def _output_parser(output: List[str], fields: str) -> List[Dict[str, Any]]:
"""
Newer versions of LVM allow ``--reportformat=json``, but older versions,
like the one included in Xenial do not. LVM has the ability to filter and
return report
-def _splitname_parser(line):
+def _splitname_parser(line: str) -> Dict[str, Any]:
"""
Parses the output from ``dmsetup splitname``, that should contain prefixes
(--nameprefixes) and set the separator to ";"
:returns: dictionary with stripped prefixes
"""
- parsed = {}
+ parsed: Dict[str, Any] = {}
try:
parts = line[0].split(';')
except IndexError:
return parsed
-def sizing(device_size, parts=None, size=None):
+def sizing(device_size: int, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]:
"""
Calculate proper sizing to fully utilize the volume group in the most
efficient way possible. To prevent situations where LVM might accept
if size and size > device_size:
raise SizeAllocationError(size, device_size)
- def get_percentage(parts):
+ def get_percentage(parts: int) -> int:
return int(floor(100 / float(parts)))
if parts is not None:
}
-def parse_tags(lv_tags):
+def parse_tags(lv_tags: str) -> Dict[str, Any]:
"""
Return a dictionary mapping of all the tags associated with
a Volume from the comma-separated tags coming from the LVM API
return tag_mapping
-def _vdo_parents(devices):
+def _vdo_parents(devices: List[str]) -> List[str]:
"""
It is possible we didn't get a logical volume, or a mapper path, but
a device like /dev/sda2, to resolve this, we must look at all the slaves of
return parent_devices
-def _vdo_slaves(vdo_names):
+def _vdo_slaves(vdo_names: List[str]) -> List[str]:
"""
find all the slaves associated with each vdo name (from realpath) by going
into /sys/block/<realpath>/slaves
return devices
-def _is_vdo(path):
+def _is_vdo(path: str) -> bool:
"""
A VDO device can be composed from many different devices, go through each
one of those devices and its slaves (if any) and correlate them back to
# find all the slaves associated with each vdo name (from realpath) by
# going into /sys/block/<realpath>/slaves
- devices.extend(_vdo_slaves(vdo_names))
+ devices.extend(_vdo_slaves(list(vdo_names)))
# Find all possible parents, looking into slaves that are related to VDO
devices.extend(_vdo_parents(devices))
realpath_name in devices])
-def is_vdo(path):
+def is_vdo(path: str) -> str:
"""
Detect if a path is backed by VDO, proxying the actual call to _is_vdo so
that we can prevent an exception breaking OSD creation. If an exception is
return '0'
-def dmsetup_splitname(dev):
+def dmsetup_splitname(dev: str) -> Dict[str, Any]:
"""
Run ``dmsetup splitname`` and parse the results.
return _splitname_parser(out)
-def is_ceph_device(lv):
+def is_ceph_device(lv: "Volume") -> bool:
try:
lv.tags['ceph.osd_id']
except (KeyError, AttributeError):
PV_FIELDS = 'pv_name,pv_tags,pv_uuid,vg_name,lv_uuid'
-class PVolume(object):
+class PVolume:
"""
Represents a Physical Volume from LVM, with some top-level attributes like
``pv_name`` and parsed tags as a dictionary of key/value pairs.
"""
- def __init__(self, **kw):
+ def __init__(self, **kw: Any) -> None:
+ self.pv_name: str = ''
+ self.pv_uuid: str = ''
for k, v in kw.items():
setattr(self, k, v)
self.pv_api = kw
self.name = kw['pv_name']
self.tags = parse_tags(kw['pv_tags'])
- def __str__(self):
+ def __str__(self) -> str:
return '<%s>' % self.pv_api['pv_name']
- def __repr__(self):
+ def __repr__(self) -> str:
return self.__str__()
- def set_tags(self, tags):
+ def set_tags(self, tags: Dict[str, Any]) -> None:
"""
:param tags: A dictionary of tag names and values, like::
self.set_tag(k, v)
# after setting all the tags, refresh them for the current object, use the
# pv_* identifiers to filter because those shouldn't change
- pv_object = self.get_single_pv(filter={'pv_name': self.pv_name,
- 'pv_uuid': self.pv_uuid})
+ pv_object = get_single_pv(filters={'pv_name': self.pv_name,
+ 'pv_uuid': self.pv_uuid})
if not pv_object:
raise RuntimeError('No PV was found.')
self.tags = pv_object.tags
- def set_tag(self, key, value):
+ def set_tag(self, key: str, value: str) -> None:
"""
Set the key/value pair as an LVM tag. Does not "refresh" the values of
the current object for its tags. Meant to be a "fire and forget" type
)
-def create_pv(device):
+def create_pv(device: str) -> None:
"""
Create a physical volume from a device, useful when devices need to be later mapped
to journals.
], run_on_host=True)
-def remove_pv(pv_name):
+def remove_pv(pv_name: str) -> None:
"""
Removes a physical volume using a double `-f` to prevent prompts and fully
remove anything related to LVM. This is tremendously destructive, but so is all other actions
)
-def get_pvs(fields=PV_FIELDS, filters='', tags=None):
+def get_pvs(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[PVolume]:
"""
Return a list of PVs that are available on the system and match the
filters and tags passed. Argument filters takes a dictionary containing
:param tags: dictionary containng LVM tags
:returns: list of class PVolume object representing pvs on the system
"""
- filters = make_filters_lvmcmd_ready(filters, tags)
+ if filters is None:
+ filters = {}
+ if tags is None:
+ tags = {}
+ filters_str = make_filters_lvmcmd_ready(filters, tags)
args = ['pvs', '--noheadings', '--readonly', '--separator=";"', '-S',
- filters, '-o', fields]
+ filters_str, '-o', fields]
stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
pvs_report = _output_parser(stdout, fields)
return [PVolume(**pv_report) for pv_report in pvs_report]
-def get_single_pv(fields=PV_FIELDS, filters=None, tags=None):
+def get_single_pv(fields: str = PV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[PVolume]:
"""
Wrapper of get_pvs() meant to be a convenience method to avoid the phrase::
pvs = get_pvs()
Represents an LVM group, with some top-level attributes like ``vg_name``
"""
- def __init__(self, **kw):
+ def __init__(self, **kw: Any) -> None:
self.pv_name: str = ''
self.vg_name: str = ''
self.vg_free_count: str = ''
+ self.vg_extent_size: str = ''
+ self.vg_extent_count: str = ''
for k, v in kw.items():
setattr(self, k, v)
self.name = kw['vg_name']
raise ValueError('VolumeGroup must have a non-empty name')
self.tags = parse_tags(kw.get('vg_tags', ''))
- def __str__(self):
+ def __str__(self) -> str:
return '<%s>' % self.name
- def __repr__(self):
+ def __repr__(self) -> str:
return self.__str__()
@property
- def free(self):
+ def free(self) -> int:
"""
Return free space in VG in bytes
"""
return int(self.vg_extent_size) * int(self.vg_free_count)
@property
- def free_percent(self):
+ def free_percent(self) -> float:
"""
Return free space in VG in bytes
"""
return int(self.vg_free_count) / int(self.vg_extent_count)
@property
- def size(self):
+ def size(self) -> int:
"""
Returns VG size in bytes
"""
return int(self.vg_extent_size) * int(self.vg_extent_count)
- def sizing(self, parts=None, size=None):
+ def sizing(self, parts: Optional[int] = None, size: Optional[int] = None) -> Dict[str, Any]:
"""
Calculate proper sizing to fully utilize the volume group in the most
efficient way possible. To prevent situations where LVM might accept
A dictionary with different sizing parameters is returned, to make it
easier for others to choose what they need in order to create logical
- volumes::
+ volumes:
>>> data_vg.free
1024
extents = int(size / int(self.vg_extent_size))
disk_sizing = sizing(self.free, size=size, parts=parts)
else:
- if parts is not None:
+ if parts is None or parts == 0:
# Prevent parts being 0, falling back to 1 (100% usage)
- parts = parts or 1
+ parts = 1
size = int(self.free / parts)
extents = size * vg_free_count / self.free
disk_sizing = sizing(self.free, parts=parts)
disk_sizing['percentages'] = extent_sizing['percentages']
return disk_sizing
- def bytes_to_extents(self, size):
+ def bytes_to_extents(self, size: int) -> int:
'''
Return a how many free extents we can fit into a size in bytes. This has
some uncertainty involved. If size/extent_size is within 1% of the
'bytes) are free'.format(size, self.vg_free_count,
self.free))
- def slots_to_extents(self, slots):
+ def slots_to_extents(self, slots: int) -> int:
'''
Return how many extents fit the VG slot times
'''
return int(int(self.vg_extent_count) / slots)
-def create_vg(devices, name=None, name_prefix=None):
+def create_vg(devices: Union[str, Set, List[str]], name: Optional[str] = None, name_prefix: str = '') -> Optional[VolumeGroup]:
"""
Create a Volume Group. Command looks like::
return get_single_vg(filters={'vg_name': name})
-def extend_vg(vg, devices):
+def extend_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]:
"""
Extend a Volume Group. Command looks like::
return get_single_vg(filters={'vg_name': vg.name})
-def reduce_vg(vg, devices):
+def reduce_vg(vg: VolumeGroup, devices: Union[List[str], str]) -> Optional[VolumeGroup]:
"""
Reduce a Volume Group. Command looks like::
run_on_host=True
)
- return get_single_vg(filter={'vg_name': vg.name})
+ return get_single_vg(filters={'vg_name': vg.name})
-def remove_vg(vg_name):
+def remove_vg(vg_name: str) -> None:
"""
Removes a volume group.
"""
)
-def get_vgs(fields=VG_FIELDS, filters='', tags=None):
+def get_vgs(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[VolumeGroup]:
"""
Return a list of VGs that are available on the system and match the
filters and tags passed. Argument filters takes a dictionary containing
:param tags: dictionary containng LVM tags
:returns: list of class VolumeGroup object representing vgs on the system
"""
- filters = make_filters_lvmcmd_ready(filters, tags)
- args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters, '-o', fields]
+ if filters is None:
+ filters = {}
+ if tags is None:
+ tags = {}
+ filters_str = make_filters_lvmcmd_ready(filters, tags)
+ args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters_str, '-o', fields]
stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
vgs_report =_output_parser(stdout, fields)
return [VolumeGroup(**vg_report) for vg_report in vgs_report]
-def get_single_vg(fields=VG_FIELDS, filters=None, tags=None):
+def get_single_vg(fields: str = VG_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[VolumeGroup]:
"""
Wrapper of get_vgs() meant to be a convenience method to avoid the phrase::
vgs = get_vgs()
return vgs[0]
-def get_device_vgs(device, name_prefix=''):
+def get_device_vgs(device: str, name_prefix: str = '') -> List[VolumeGroup]:
stdout, stderr, returncode = process.call(
['pvs'] + VG_CMD_OPTIONS + ['-o', VG_FIELDS, device],
run_on_host=True,
return [VolumeGroup(**vg) for vg in vgs if vg['vg_name'] and vg['vg_name'].startswith(name_prefix)]
-def get_all_devices_vgs(name_prefix=''):
+def get_all_devices_vgs(name_prefix: str = '') -> List[VolumeGroup]:
vg_fields = f'pv_name,{VG_FIELDS}'
cmd = ['pvs'] + VG_CMD_OPTIONS + ['-o', vg_fields]
stdout, stderr, returncode = process.call(
``lv_name`` and parsed tags as a dictionary of key/value pairs.
"""
- def __init__(self, **kw: str) -> None:
+ def __init__(self, **kw: Any) -> None:
self.lv_path: str = ''
self.lv_name: str = ''
self.lv_uuid: str = ''
def __repr__(self) -> str:
return self.__str__()
- def as_dict(self) -> Dict[str, Any]:
- obj = {}
+ def as_dict(self) -> Dict[Any, Any]:
+ obj: Dict[Any, Any] = {}
obj.update(self.lv_api)
obj['tags'] = self.tags
obj['name'] = self.name
report[type_uuid] = self.tags['ceph.{}'.format(type_uuid)]
return report
- def _format_tag_args(self, op, tags):
+ def _format_tag_args(self, op: str, tags: Dict[str, Any]) -> List[str]:
tag_args = ['{}={}'.format(k, v) for k, v in tags.items()]
# weird but efficient way of ziping two lists and getting a flat list
return list(sum(zip(repeat(op), tag_args), ()))
- def clear_tags(self, keys=None):
+ def clear_tags(self, keys: Optional[List[str]] = None) -> None:
"""
Removes all or passed tags from the Logical Volume.
"""
if not keys:
- keys = self.tags.keys()
+ keys = list(self.tags.keys())
del_tags = {k: self.tags[k] for k in keys if k in self.tags}
if not del_tags:
del self.tags[k]
- def set_tags(self, tags):
+ def set_tags(self, tags: Dict[str, Any]) -> None:
"""
:param tags: A dictionary of tag names and values, like::
At the end of all modifications, the tags are refreshed to reflect
LVM's most current view.
"""
- self.clear_tags(tags.keys())
+ self.clear_tags(list(tags.keys()))
add_tag_args = self._format_tag_args('--addtag', tags)
process.call(['lvchange'] + add_tag_args + [self.lv_path], run_on_host=True)
for k, v in tags.items():
self.tags[k] = v
- def clear_tag(self, key):
+ def clear_tag(self, key: str) -> None:
if self.tags.get(key):
current_value = self.tags[key]
tag = "%s=%s" % (key, current_value)
del self.tags[key]
- def set_tag(self, key, value):
+ def set_tag(self, key: str, value: str) -> None:
"""
Set the key/value pair as an LVM tag.
"""
)
self.tags[key] = value
- def deactivate(self):
+ def deactivate(self) -> None:
"""
Deactivate the LV by calling lvchange -an
"""
process.call(['lvchange', '-an', self.lv_path], run_on_host=True)
-def create_lv(name_prefix,
- uuid,
- vg=None,
- device=None,
- slots=None,
- extents=None,
- size=None,
- tags=None):
+def create_lv(name_prefix: str,
+ uuid: str,
+ vg: Optional[VolumeGroup] = None,
+ device: Optional[str] = None,
+ slots: Optional[int] = None,
+ extents: Optional[int] = None,
+ size: Optional[int] = None,
+ tags: Optional[Dict[str, str]] = None) -> Optional[Volume]:
"""
Create a Logical Volume in a Volume Group. Command looks like::
'db': 'ceph.db_device',
'lockbox': 'ceph.lockbox_device', # XXX might not ever need this lockbox sorcery
}
- path_tag = type_path_tag.get(tags.get('ceph.type'))
- if path_tag:
+ path_tag = type_path_tag.get(tags.get('ceph.type', ''))
+ if path_tag and isinstance(lv, Volume):
tags.update({path_tag: lv.lv_path})
- lv.set_tags(tags)
+ if isinstance(lv, Volume):
+ lv.set_tags(tags)
return lv
-def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
+def create_lvs(volume_group: VolumeGroup, parts: int = 1, size: Optional[int] = None, name_prefix: str = 'ceph-lv') -> List[Optional[Volume]]:
"""
Create multiple Logical Volumes from a Volume Group by calculating the
proper extents from ``parts`` or ``size``. A custom prefix can be used
:param extents: The number of LVM extents to use to create the LV. Useful if looking to have
accurate LV sizes (LVM rounds sizes otherwise)
"""
- if parts is None and size is None:
- # fallback to just one part (using 100% of the vg)
- parts = 1
lvs = []
tags = {
"ceph.osd_id": "null",
size = sizing['sizes']
extents = sizing['extents']
lvs.append(
- create_lv(name_prefix, uuid.uuid4(), vg=volume_group, extents=extents, tags=tags)
+ create_lv(name_prefix, str(uuid.uuid4()), vg=volume_group, extents=extents, tags=tags)
)
return lvs
-def remove_lv(lv):
+def remove_lv(lv: Union[str, Volume]) -> bool:
"""
Removes a logical volume given it's absolute path.
return True
-def get_lvs(fields=LV_FIELDS, filters='', tags=None):
+def get_lvs(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> List[Volume]:
"""
Return a list of LVs that are available on the system and match the
filters and tags passed. Argument filters takes a dictionary containing
:param tags: dictionary containng LVM tags
:returns: list of class Volume object representing LVs on the system
"""
- filters = make_filters_lvmcmd_ready(filters, tags)
- args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters, '-o', fields]
+ if filters is None:
+ filters = {}
+ if tags is None:
+ tags = {}
+ filters_str = make_filters_lvmcmd_ready(filters, tags)
+ args = ['lvs'] + LV_CMD_OPTIONS + ['-S', filters_str, '-o', fields]
stdout, stderr, returncode = process.call(args, run_on_host=True, verbose_on_failure=False)
lvs_report = _output_parser(stdout, fields)
return [Volume(**lv_report) for lv_report in lvs_report]
-def get_single_lv(fields=LV_FIELDS, filters=None, tags=None):
+def get_single_lv(fields: str = LV_FIELDS, filters: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, Any]] = None) -> Optional[Volume]:
"""
Wrapper of get_lvs() meant to be a convenience method to avoid the phrase::
lvs = get_lvs()
return lvs[0]
-def get_lvs_from_osd_id(osd_id):
+def get_lvs_from_osd_id(osd_id: str) -> List[Volume]:
return get_lvs(tags={'ceph.osd_id': osd_id})
-def get_single_lv_from_osd_id(osd_id):
+def get_single_lv_from_osd_id(osd_id: str) -> Optional[Volume]:
return get_single_lv(tags={'ceph.osd_id': osd_id})
-def get_lv_by_name(name):
+def get_lv_by_name(name: str) -> List[Volume]:
stdout, stderr, returncode = process.call(
['lvs', '--noheadings', '-o', LV_FIELDS, '-S',
'lv_name={}'.format(name)],
return [Volume(**lv) for lv in lvs]
-def get_lvs_by_tag(lv_tag):
+def get_lvs_by_tag(lv_tag: str) -> List[Volume]:
stdout, stderr, returncode = process.call(
['lvs', '--noheadings', '--separator=";"', '-a', '-o', LV_FIELDS, '-S',
'lv_tags={{{}}}'.format(lv_tag)],
return [Volume(**lv) for lv in lvs]
-def get_device_lvs(device, name_prefix=''):
+def get_device_lvs(device: str, name_prefix: str = '') -> List[Volume]:
stdout, stderr, returncode = process.call(
['pvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS, device],
run_on_host=True,
return [Volume(**lv) for lv in lvs if lv['lv_name'] and
lv['lv_name'].startswith(name_prefix)]
-def get_lvs_from_path(devpath):
+def get_lvs_from_path(devpath: str) -> List[Volume]:
lvs = []
if os.path.isabs(devpath):
# we have a block device
return lvs
-def get_lv_by_fullname(full_name):
+def get_lv_by_fullname(full_name: str) -> Optional[Volume]:
"""
returns LV by the specified LV's full name (formatted as vg_name/lv_name)
"""