From: Guillaume Abrioux Date: Thu, 9 Jan 2025 10:22:15 +0000 (+0000) Subject: ceph-volume: add type annotations to devices.lvm.batch X-Git-Tag: testing/wip-hyelloji-testing-20250127.141229~21^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1107f5b0e32581222c61bf808446d177ecef4c5d;p=ceph-ci.git ceph-volume: add type annotations to devices.lvm.batch This commit adds the Python type annotations to `devices.lvm.batch`. Signed-off-by: Guillaume Abrioux --- diff --git a/src/ceph-volume/ceph_volume/api/lvm.py b/src/ceph-volume/ceph_volume/api/lvm.py index f12905e8078..8ddf7b3a39e 100644 --- a/src/ceph-volume/ceph_volume/api/lvm.py +++ b/src/ceph-volume/ceph_volume/api/lvm.py @@ -836,6 +836,7 @@ class Volume: self.lv_name: str = '' self.lv_uuid: str = '' self.vg_name: str = '' + self.lv_size: str = '' self.lv_tags: Dict[str, Any] = {} for k, v in kw.items(): setattr(self, k, v) diff --git a/src/ceph-volume/ceph_volume/devices/lvm/batch.py b/src/ceph-volume/ceph_volume/devices/lvm/batch.py index c1549d8414b..80803577bb4 100644 --- a/src/ceph-volume/ceph_volume/devices/lvm/batch.py +++ b/src/ceph-volume/ceph_volume/devices/lvm/batch.py @@ -4,11 +4,12 @@ import json import logging from textwrap import dedent from ceph_volume import terminal, decorators -from ceph_volume.util import disk, prompt_bool, arg_validators, templates +from ceph_volume.util import device, disk, prompt_bool, arg_validators, templates from ceph_volume.util import prepare from . import common from .create import Create from .prepare import Prepare +from typing import Any, Dict, List, Optional, Tuple mlogger = terminal.MultiLogger(__name__) logger = logging.getLogger(__name__) @@ -18,18 +19,13 @@ device_list_template = """ * {path: <25} {size: <10} {state}""" -def device_formatter(devices): - lines = [] - for path, details in devices: - lines.append(device_list_template.format( - path=path, size=details['human_readable_size'], - state='solid' if details['rotational'] == '0' else 'rotational') - ) - - return ''.join(lines) - - -def ensure_disjoint_device_lists(data, db=[], wal=[]): +def ensure_disjoint_device_lists(data: List[str], + db: Optional[List[str]] = None, + wal: Optional[List[str]] = None) -> None: + if db is None: + db = [] + if wal is None: + wal = [] # check that all device lists are disjoint with each other if not all([set(data).isdisjoint(set(db)), set(data).isdisjoint(set(wal)), @@ -37,7 +33,7 @@ def ensure_disjoint_device_lists(data, db=[], wal=[]): raise Exception('Device lists are not disjoint') -def separate_devices_from_lvs(devices): +def separate_devices_from_lvs(devices: List[device.Device]) -> Tuple[List[device.Device], List[device.Device]]: phys = [] lvm = [] for d in devices: @@ -45,60 +41,7 @@ def separate_devices_from_lvs(devices): return phys, lvm -def get_physical_osds(devices, args): - ''' - Goes through passed physical devices and assigns OSDs - ''' - data_slots = args.osds_per_device - if args.data_slots: - data_slots = max(args.data_slots, args.osds_per_device) - rel_data_size = args.data_allocate_fraction / data_slots - mlogger.debug('relative data size: {}'.format(rel_data_size)) - ret = [] - for dev in devices: - if dev.available_lvm: - dev_size = dev.vg_size[0] - abs_size = disk.Size(b=int(dev_size * rel_data_size)) - free_size = dev.vg_free[0] - for _ in range(args.osds_per_device): - if abs_size > free_size: - break - free_size -= abs_size.b - osd_id = None - if args.osd_ids: - osd_id = args.osd_ids.pop() - ret.append(Batch.OSD(dev.path, - rel_data_size, - abs_size, - args.osds_per_device, - osd_id, - 'dmcrypt' if args.dmcrypt else None, - dev.symlink)) - return ret - - -def get_lvm_osds(lvs, args): - ''' - Goes through passed LVs and assigns planned osds - ''' - ret = [] - for lv in lvs: - if lv.used_by_ceph: - continue - osd_id = None - if args.osd_ids: - osd_id = args.osd_ids.pop() - osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name), - 100.0, - disk.Size(b=int(lv.lvs[0].lv_size)), - 1, - osd_id, - 'dmcrypt' if args.dmcrypt else None) - ret.append(osd) - return ret - - -def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, args): +def get_physical_fast_allocs(devices: List[device.Device], type_: str, fast_slots_per_device: int, new_osds: int, args: argparse.Namespace) -> List[Tuple[str, float, disk.Size, int]]: requested_slots = getattr(args, '{}_slots'.format(type_)) if not requested_slots or requested_slots < fast_slots_per_device: if requested_slots: @@ -111,7 +54,7 @@ def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, ar get_size_fct = getattr(prepare, 'get_{}_size'.format(type_)) requested_size = get_size_fct(lv_format=False) - ret = [] + ret: List[Tuple[str, float, disk.Size, int]] = [] vg_device_map = group_devices_by_vg(devices) for vg_name, vg_devices in vg_device_map.items(): for dev in vg_devices: @@ -153,8 +96,8 @@ def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, ar ret.append((dev.path, relative_size, abs_size, requested_slots)) return ret -def group_devices_by_vg(devices): - result = dict() +def group_devices_by_vg(devices: List[device.Device]) -> Dict[str, Any]: + result: Dict[str, Any] = dict() result['unused_devices'] = [] for dev in devices: if len(dev.vgs) > 0: @@ -167,7 +110,7 @@ def group_devices_by_vg(devices): result['unused_devices'].append(dev) return result -def get_lvm_fast_allocs(lvs): +def get_lvm_fast_allocs(lvs: List[device.Device]) -> List[Tuple[str, float, disk.Size, int]]: return [("{}/{}".format(d.vg_name, d.lv_name), 100.0, disk.Size(b=int(d.lvs[0].lv_size)), 1) for d in lvs if not d.journal_used_by_ceph] @@ -190,7 +133,7 @@ class Batch(object): ceph-volume lvm batch --report [DEVICE...] """) - def __init__(self, argv): + def __init__(self, argv: List[str]) -> None: parser = argparse.ArgumentParser( prog='ceph-volume lvm batch', formatter_class=argparse.RawDescriptionHelpFormatter, @@ -342,31 +285,31 @@ class Batch(object): for dev_list in ['', 'db_', 'wal_']: setattr(self, '{}usable'.format(dev_list), []) - def report(self, plan): + def report(self, plan: List["OSD"]) -> None: report = self._create_report(plan) print(report) - def _create_report(self, plan): + def _create_report(self, plan: List["OSD"]) -> str: + result: str = '' if self.args.format == 'pretty': - report = '' - report += templates.total_osds.format(total_osds=len(plan)) + result += templates.total_osds.format(total_osds=len(plan)) - report += templates.osd_component_titles + result += templates.osd_component_titles for osd in plan: - report += templates.osd_header - report += osd.report() - return report + result += templates.osd_header + result += osd.report() else: json_report = [] for osd in plan: json_report.append(osd.report_json()) if self.args.format == 'json': - return json.dumps(json_report) + result = json.dumps(json_report) elif self.args.format == 'json-pretty': - return json.dumps(json_report, indent=4, + result = json.dumps(json_report, indent=4, sort_keys=True) + return result - def _check_slot_args(self): + def _check_slot_args(self) -> None: ''' checking if -slots args are consistent with other arguments ''' @@ -374,7 +317,7 @@ class Batch(object): if self.args.data_slots < self.args.osds_per_device: raise ValueError('data_slots is smaller then osds_per_device') - def _sort_rotational_disks(self): + def _sort_rotational_disks(self) -> None: ''' Helper for legacy auto behaviour. Sorts drives into rotating and non-rotating, the latter being used for @@ -393,9 +336,10 @@ class Batch(object): self.args.db_devices = ssd @decorators.needs_root - def main(self): + def main(self) -> None: if not self.args.devices: - return self.parser.print_help() + self.parser.print_help() + raise SystemExit(0) if (self.args.auto and not self.args.db_devices and not self.args.wal_devices): @@ -411,7 +355,7 @@ class Batch(object): if self.args.report: self.report(plan) - return 0 + return if not self.args.yes: self.report(plan) @@ -422,7 +366,7 @@ class Batch(object): self._execute(plan) - def _execute(self, plan): + def _execute(self, plan: List["OSD"]) -> None: defaults = common.get_default_args() global_args = [ 'bluestore', @@ -441,7 +385,7 @@ class Batch(object): c = Create([], args=argparse.Namespace(**args)) c.create() - def get_deployment_layout(self): + def get_deployment_layout(self) -> List["OSD"]: ''' The methods here are mostly just organization, error reporting and setting up of (default) args. The heavy lifting code for the deployment @@ -500,8 +444,8 @@ class Batch(object): osd.add_very_fast_device(*very_fast_allocations.pop()) return plan - def fast_allocations(self, devices, requested_osds, new_osds, type_): - ret = [] + def fast_allocations(self, devices: List[device.Device], requested_osds: int, new_osds: int, type_: str) -> List[Tuple[str, float, disk.Size, int]]: + ret: List[Tuple[str, float, disk.Size, int]] = [] if not devices: return ret phys_devs, lvm_devs = separate_devices_from_lvs(devices) @@ -540,39 +484,39 @@ class Batch(object): 'type_']) def __init__(self, - data_path, - rel_size, - abs_size, - slots, - id_, - encryption, - symlink=None): + data_path: str, + rel_size: float, + abs_size: disk.Size, + slots: int, + id_: Optional[str] = None, + encryption: Optional[str] = None, + symlink: Optional[str] = None) -> None: self.id_ = id_ self.data = self.VolSpec(path=data_path, rel_size=rel_size, abs_size=abs_size, slots=slots, type_='data') - self.fast = None - self.very_fast = None - self.encryption = encryption - self.symlink = symlink + self.fast: Optional[Any] = None + self.very_fast: Optional[Any] = None + self.encryption: Optional[str] = encryption + self.symlink: Optional[str] = symlink - def add_fast_device(self, path, rel_size, abs_size, slots, type_): + def add_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int, type_: str) -> None: self.fast = self.VolSpec(path=path, - rel_size=rel_size, - abs_size=abs_size, - slots=slots, - type_=type_) + rel_size=rel_size, + abs_size=abs_size, + slots=slots, + type_=type_) - def add_very_fast_device(self, path, rel_size, abs_size, slots): + def add_very_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int) -> None: self.very_fast = self.VolSpec(path=path, - rel_size=rel_size, - abs_size=abs_size, - slots=slots, - type_='block_wal') + rel_size=rel_size, + abs_size=abs_size, + slots=slots, + type_='block_wal') - def _get_osd_plan(self): + def _get_osd_plan(self) -> Dict[str, Any]: plan = { 'data': self.data.path, 'data_size': self.data.abs_size, @@ -595,12 +539,12 @@ class Batch(object): plan.update({'osd_id': self.id_}) return plan - def get_args(self, defaults): + def get_args(self, defaults: Dict[str, Any]) -> Dict[str, Any]: my_defaults = defaults.copy() my_defaults.update(self._get_osd_plan()) return my_defaults - def report(self): + def report(self) -> str: report = '' if self.id_: report += templates.osd_reused_id.format( @@ -630,7 +574,58 @@ class Batch(object): percent=self.very_fast.rel_size) return report - def report_json(self): + def report_json(self) -> Dict[str, Any]: # cast all values to string so that the report can be dumped in to # json.dumps return {k: str(v) for k, v in self._get_osd_plan().items()} + +def get_physical_osds(devices: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]: + ''' + Goes through passed physical devices and assigns OSDs + ''' + data_slots = args.osds_per_device + if args.data_slots: + data_slots = max(args.data_slots, args.osds_per_device) + rel_data_size = args.data_allocate_fraction / data_slots + mlogger.debug('relative data size: {}'.format(rel_data_size)) + ret = [] + for dev in devices: + if dev.available_lvm: + dev_size = dev.vg_size[0] + abs_size = disk.Size(b=int(dev_size * rel_data_size)) + free_size = dev.vg_free[0] + for _ in range(args.osds_per_device): + if abs_size > free_size: + break + free_size -= abs_size.b + osd_id = None + if args.osd_ids: + osd_id = args.osd_ids.pop() + ret.append(Batch.OSD(dev.path, + rel_data_size, + abs_size, + args.osds_per_device, + osd_id, + 'dmcrypt' if args.dmcrypt else None, + dev.symlink)) + return ret + +def get_lvm_osds(lvs: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]: + ''' + Goes through passed LVs and assigns planned osds + ''' + ret = [] + for lv in lvs: + if lv.used_by_ceph: + continue + osd_id = None + if args.osd_ids: + osd_id = args.osd_ids.pop() + osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name), + 100.0, + disk.Size(b=int(lv.lvs[0].lv_size)), + 1, + osd_id, + 'dmcrypt' if args.dmcrypt else None) + ret.append(osd) + return ret \ No newline at end of file