]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-volume: add type annotations to devices.lvm.batch
authorGuillaume Abrioux <gabrioux@ibm.com>
Thu, 9 Jan 2025 10:22:15 +0000 (10:22 +0000)
committerGuillaume Abrioux <gabrioux@ibm.com>
Mon, 24 Feb 2025 11:54:33 +0000 (11:54 +0000)
This commit adds the Python type annotations to `devices.lvm.batch`.

Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
(cherry picked from commit 1107f5b0e32581222c61bf808446d177ecef4c5d)

src/ceph-volume/ceph_volume/api/lvm.py
src/ceph-volume/ceph_volume/devices/lvm/batch.py

index f12905e8078f7de4e9f3a273d5b4be2c7aff21a2..8ddf7b3a39e996d5001b33e281a598d34c8fa7da 100644 (file)
@@ -836,6 +836,7 @@ class Volume:
         self.lv_name: str = ''
         self.lv_uuid: str = ''
         self.vg_name: str = ''
+        self.lv_size: str = ''
         self.lv_tags: Dict[str, Any] = {}
         for k, v in kw.items():
             setattr(self, k, v)
index c1549d8414be5dc7f05dc9ee708b97d7f810a90f..80803577bb48293d21ac4562533cd65ef5daecd5 100644 (file)
@@ -4,11 +4,12 @@ import json
 import logging
 from textwrap import dedent
 from ceph_volume import terminal, decorators
-from ceph_volume.util import disk, prompt_bool, arg_validators, templates
+from ceph_volume.util import device, disk, prompt_bool, arg_validators, templates
 from ceph_volume.util import prepare
 from . import common
 from .create import Create
 from .prepare import Prepare
+from typing import Any, Dict, List, Optional, Tuple
 
 mlogger = terminal.MultiLogger(__name__)
 logger = logging.getLogger(__name__)
@@ -18,18 +19,13 @@ device_list_template = """
   * {path: <25} {size: <10} {state}"""
 
 
-def device_formatter(devices):
-    lines = []
-    for path, details in devices:
-        lines.append(device_list_template.format(
-            path=path, size=details['human_readable_size'],
-            state='solid' if details['rotational'] == '0' else 'rotational')
-        )
-
-    return ''.join(lines)
-
-
-def ensure_disjoint_device_lists(data, db=[], wal=[]):
+def ensure_disjoint_device_lists(data: List[str],
+                                 db: Optional[List[str]] = None,
+                                 wal: Optional[List[str]] = None) -> None:
+    if db is None:
+        db = []
+    if wal is None:
+        wal = []
     # check that all device lists are disjoint with each other
     if not all([set(data).isdisjoint(set(db)),
                 set(data).isdisjoint(set(wal)),
@@ -37,7 +33,7 @@ def ensure_disjoint_device_lists(data, db=[], wal=[]):
         raise Exception('Device lists are not disjoint')
 
 
-def separate_devices_from_lvs(devices):
+def separate_devices_from_lvs(devices: List[device.Device]) -> Tuple[List[device.Device], List[device.Device]]:
     phys = []
     lvm = []
     for d in devices:
@@ -45,60 +41,7 @@ def separate_devices_from_lvs(devices):
     return phys, lvm
 
 
-def get_physical_osds(devices, args):
-    '''
-    Goes through passed physical devices and assigns OSDs
-    '''
-    data_slots = args.osds_per_device
-    if args.data_slots:
-        data_slots = max(args.data_slots, args.osds_per_device)
-    rel_data_size = args.data_allocate_fraction / data_slots
-    mlogger.debug('relative data size: {}'.format(rel_data_size))
-    ret = []
-    for dev in devices:
-        if dev.available_lvm:
-            dev_size = dev.vg_size[0]
-            abs_size = disk.Size(b=int(dev_size * rel_data_size))
-            free_size = dev.vg_free[0]
-            for _ in range(args.osds_per_device):
-                if abs_size > free_size:
-                    break
-                free_size -= abs_size.b
-                osd_id = None
-                if args.osd_ids:
-                    osd_id = args.osd_ids.pop()
-                ret.append(Batch.OSD(dev.path,
-                                     rel_data_size,
-                                     abs_size,
-                                     args.osds_per_device,
-                                     osd_id,
-                                     'dmcrypt' if args.dmcrypt else None,
-                                     dev.symlink))
-    return ret
-
-
-def get_lvm_osds(lvs, args):
-    '''
-    Goes through passed LVs and assigns planned osds
-    '''
-    ret = []
-    for lv in lvs:
-        if lv.used_by_ceph:
-            continue
-        osd_id = None
-        if args.osd_ids:
-            osd_id = args.osd_ids.pop()
-        osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
-                        100.0,
-                        disk.Size(b=int(lv.lvs[0].lv_size)),
-                        1,
-                        osd_id,
-                        'dmcrypt' if args.dmcrypt else None)
-        ret.append(osd)
-    return ret
-
-
-def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, args):
+def get_physical_fast_allocs(devices: List[device.Device], type_: str, fast_slots_per_device: int, new_osds: int, args: argparse.Namespace) -> List[Tuple[str, float, disk.Size, int]]:
     requested_slots = getattr(args, '{}_slots'.format(type_))
     if not requested_slots or requested_slots < fast_slots_per_device:
         if requested_slots:
@@ -111,7 +54,7 @@ def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, ar
         get_size_fct = getattr(prepare, 'get_{}_size'.format(type_))
         requested_size = get_size_fct(lv_format=False)
 
-    ret = []
+    ret: List[Tuple[str, float, disk.Size, int]] = []
     vg_device_map = group_devices_by_vg(devices)
     for vg_name, vg_devices in vg_device_map.items():
         for dev in vg_devices:
@@ -153,8 +96,8 @@ def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, ar
                 ret.append((dev.path, relative_size, abs_size, requested_slots))
     return ret
 
-def group_devices_by_vg(devices):
-    result = dict()
+def group_devices_by_vg(devices: List[device.Device]) -> Dict[str, Any]:
+    result: Dict[str, Any] = dict()
     result['unused_devices'] = []
     for dev in devices:
         if len(dev.vgs) > 0:
@@ -167,7 +110,7 @@ def group_devices_by_vg(devices):
             result['unused_devices'].append(dev)
     return result
 
-def get_lvm_fast_allocs(lvs):
+def get_lvm_fast_allocs(lvs: List[device.Device]) -> List[Tuple[str, float, disk.Size, int]]:
     return [("{}/{}".format(d.vg_name, d.lv_name), 100.0,
              disk.Size(b=int(d.lvs[0].lv_size)), 1) for d in lvs if not
             d.journal_used_by_ceph]
@@ -190,7 +133,7 @@ class Batch(object):
         ceph-volume lvm batch --report [DEVICE...]
     """)
 
-    def __init__(self, argv):
+    def __init__(self, argv: List[str]) -> None:
         parser = argparse.ArgumentParser(
             prog='ceph-volume lvm batch',
             formatter_class=argparse.RawDescriptionHelpFormatter,
@@ -342,31 +285,31 @@ class Batch(object):
         for dev_list in ['', 'db_', 'wal_']:
             setattr(self, '{}usable'.format(dev_list), [])
 
-    def report(self, plan):
+    def report(self, plan: List["OSD"]) -> None:
         report = self._create_report(plan)
         print(report)
 
-    def _create_report(self, plan):
+    def _create_report(self, plan: List["OSD"]) -> str:
+        result: str = ''
         if self.args.format == 'pretty':
-            report = ''
-            report += templates.total_osds.format(total_osds=len(plan))
+            result += templates.total_osds.format(total_osds=len(plan))
 
-            report += templates.osd_component_titles
+            result += templates.osd_component_titles
             for osd in plan:
-                report += templates.osd_header
-                report += osd.report()
-            return report
+                result += templates.osd_header
+                result += osd.report()
         else:
             json_report = []
             for osd in plan:
                 json_report.append(osd.report_json())
             if self.args.format == 'json':
-                return json.dumps(json_report)
+                result = json.dumps(json_report)
             elif self.args.format == 'json-pretty':
-                return json.dumps(json_report, indent=4,
+                result = json.dumps(json_report, indent=4,
                                   sort_keys=True)
+        return result
 
-    def _check_slot_args(self):
+    def _check_slot_args(self) -> None:
         '''
         checking if -slots args are consistent with other arguments
         '''
@@ -374,7 +317,7 @@ class Batch(object):
             if self.args.data_slots < self.args.osds_per_device:
                 raise ValueError('data_slots is smaller then osds_per_device')
 
-    def _sort_rotational_disks(self):
+    def _sort_rotational_disks(self) -> None:
         '''
         Helper for legacy auto behaviour.
         Sorts drives into rotating and non-rotating, the latter being used for
@@ -393,9 +336,10 @@ class Batch(object):
         self.args.db_devices = ssd
 
     @decorators.needs_root
-    def main(self):
+    def main(self) -> None:
         if not self.args.devices:
-            return self.parser.print_help()
+            self.parser.print_help()
+            raise SystemExit(0)
 
         if (self.args.auto and not self.args.db_devices and not
             self.args.wal_devices):
@@ -411,7 +355,7 @@ class Batch(object):
 
         if self.args.report:
             self.report(plan)
-            return 0
+            return
 
         if not self.args.yes:
             self.report(plan)
@@ -422,7 +366,7 @@ class Batch(object):
 
         self._execute(plan)
 
-    def _execute(self, plan):
+    def _execute(self, plan: List["OSD"]) -> None:
         defaults = common.get_default_args()
         global_args = [
             'bluestore',
@@ -441,7 +385,7 @@ class Batch(object):
                 c = Create([], args=argparse.Namespace(**args))
                 c.create()
 
-    def get_deployment_layout(self):
+    def get_deployment_layout(self) -> List["OSD"]:
         '''
         The methods here are mostly just organization, error reporting and
         setting up of (default) args. The heavy lifting code for the deployment
@@ -500,8 +444,8 @@ class Batch(object):
                 osd.add_very_fast_device(*very_fast_allocations.pop())
         return plan
 
-    def fast_allocations(self, devices, requested_osds, new_osds, type_):
-        ret = []
+    def fast_allocations(self, devices: List[device.Device], requested_osds: int, new_osds: int, type_: str) -> List[Tuple[str, float, disk.Size, int]]:
+        ret: List[Tuple[str, float, disk.Size, int]] = []
         if not devices:
             return ret
         phys_devs, lvm_devs = separate_devices_from_lvs(devices)
@@ -540,39 +484,39 @@ class Batch(object):
                               'type_'])
 
         def __init__(self,
-                     data_path,
-                     rel_size,
-                     abs_size,
-                     slots,
-                     id_,
-                     encryption,
-                     symlink=None):
+                     data_path: str,
+                     rel_size: float,
+                     abs_size: disk.Size,
+                     slots: int,
+                     id_: Optional[str] = None,
+                     encryption: Optional[str] = None,
+                     symlink: Optional[str] = None) -> None:
             self.id_ = id_
             self.data = self.VolSpec(path=data_path,
                                 rel_size=rel_size,
                                 abs_size=abs_size,
                                 slots=slots,
                                 type_='data')
-            self.fast = None
-            self.very_fast = None
-            self.encryption = encryption
-            self.symlink = symlink
+            self.fast: Optional[Any] = None
+            self.very_fast: Optional[Any] = None
+            self.encryption: Optional[str] = encryption
+            self.symlink: Optional[str] = symlink
 
-        def add_fast_device(self, path, rel_size, abs_size, slots, type_):
+        def add_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int, type_: str) -> None:
             self.fast = self.VolSpec(path=path,
-                                rel_size=rel_size,
-                                abs_size=abs_size,
-                                slots=slots,
-                                type_=type_)
+                                     rel_size=rel_size,
+                                     abs_size=abs_size,
+                                     slots=slots,
+                                     type_=type_)
 
-        def add_very_fast_device(self, path, rel_size, abs_size, slots):
+        def add_very_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int) -> None:
             self.very_fast = self.VolSpec(path=path,
-                                rel_size=rel_size,
-                                abs_size=abs_size,
-                                slots=slots,
-                                type_='block_wal')
+                                          rel_size=rel_size,
+                                          abs_size=abs_size,
+                                          slots=slots,
+                                          type_='block_wal')
 
-        def _get_osd_plan(self):
+        def _get_osd_plan(self) -> Dict[str, Any]:
             plan = {
                 'data': self.data.path,
                 'data_size': self.data.abs_size,
@@ -595,12 +539,12 @@ class Batch(object):
                 plan.update({'osd_id': self.id_})
             return plan
 
-        def get_args(self, defaults):
+        def get_args(self, defaults: Dict[str, Any]) -> Dict[str, Any]:
             my_defaults = defaults.copy()
             my_defaults.update(self._get_osd_plan())
             return my_defaults
 
-        def report(self):
+        def report(self) -> str:
             report = ''
             if self.id_:
                 report += templates.osd_reused_id.format(
@@ -630,7 +574,58 @@ class Batch(object):
                     percent=self.very_fast.rel_size)
             return report
 
-        def report_json(self):
+        def report_json(self) -> Dict[str, Any]:
             # cast all values to string so that the report can be dumped in to
             # json.dumps
             return {k: str(v) for k, v in self._get_osd_plan().items()}
+
+def get_physical_osds(devices: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]:
+    '''
+    Goes through passed physical devices and assigns OSDs
+    '''
+    data_slots = args.osds_per_device
+    if args.data_slots:
+        data_slots = max(args.data_slots, args.osds_per_device)
+    rel_data_size = args.data_allocate_fraction / data_slots
+    mlogger.debug('relative data size: {}'.format(rel_data_size))
+    ret = []
+    for dev in devices:
+        if dev.available_lvm:
+            dev_size = dev.vg_size[0]
+            abs_size = disk.Size(b=int(dev_size * rel_data_size))
+            free_size = dev.vg_free[0]
+            for _ in range(args.osds_per_device):
+                if abs_size > free_size:
+                    break
+                free_size -= abs_size.b
+                osd_id = None
+                if args.osd_ids:
+                    osd_id = args.osd_ids.pop()
+                ret.append(Batch.OSD(dev.path,
+                                     rel_data_size,
+                                     abs_size,
+                                     args.osds_per_device,
+                                     osd_id,
+                                     'dmcrypt' if args.dmcrypt else None,
+                                     dev.symlink))
+    return ret
+
+def get_lvm_osds(lvs: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]:
+    '''
+    Goes through passed LVs and assigns planned osds
+    '''
+    ret = []
+    for lv in lvs:
+        if lv.used_by_ceph:
+            continue
+        osd_id = None
+        if args.osd_ids:
+            osd_id = args.osd_ids.pop()
+        osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
+                        100.0,
+                        disk.Size(b=int(lv.lvs[0].lv_size)),
+                        1,
+                        osd_id,
+                        'dmcrypt' if args.dmcrypt else None)
+        ret.append(osd)
+    return ret
\ No newline at end of file