import logging
from textwrap import dedent
from ceph_volume import terminal, decorators
-from ceph_volume.util import disk, prompt_bool, arg_validators, templates
+from ceph_volume.util import device, disk, prompt_bool, arg_validators, templates
from ceph_volume.util import prepare
from . import common
from .create import Create
from .prepare import Prepare
+from typing import Any, Dict, List, Optional, Tuple
mlogger = terminal.MultiLogger(__name__)
logger = logging.getLogger(__name__)
* {path: <25} {size: <10} {state}"""
-def device_formatter(devices):
- lines = []
- for path, details in devices:
- lines.append(device_list_template.format(
- path=path, size=details['human_readable_size'],
- state='solid' if details['rotational'] == '0' else 'rotational')
- )
-
- return ''.join(lines)
-
-
-def ensure_disjoint_device_lists(data, db=[], wal=[]):
+def ensure_disjoint_device_lists(data: List[str],
+ db: Optional[List[str]] = None,
+ wal: Optional[List[str]] = None) -> None:
+ if db is None:
+ db = []
+ if wal is None:
+ wal = []
# check that all device lists are disjoint with each other
if not all([set(data).isdisjoint(set(db)),
set(data).isdisjoint(set(wal)),
raise Exception('Device lists are not disjoint')
-def separate_devices_from_lvs(devices):
+def separate_devices_from_lvs(devices: List[device.Device]) -> Tuple[List[device.Device], List[device.Device]]:
phys = []
lvm = []
for d in devices:
return phys, lvm
-def get_physical_osds(devices, args):
- '''
- Goes through passed physical devices and assigns OSDs
- '''
- data_slots = args.osds_per_device
- if args.data_slots:
- data_slots = max(args.data_slots, args.osds_per_device)
- rel_data_size = args.data_allocate_fraction / data_slots
- mlogger.debug('relative data size: {}'.format(rel_data_size))
- ret = []
- for dev in devices:
- if dev.available_lvm:
- dev_size = dev.vg_size[0]
- abs_size = disk.Size(b=int(dev_size * rel_data_size))
- free_size = dev.vg_free[0]
- for _ in range(args.osds_per_device):
- if abs_size > free_size:
- break
- free_size -= abs_size.b
- osd_id = None
- if args.osd_ids:
- osd_id = args.osd_ids.pop()
- ret.append(Batch.OSD(dev.path,
- rel_data_size,
- abs_size,
- args.osds_per_device,
- osd_id,
- 'dmcrypt' if args.dmcrypt else None,
- dev.symlink))
- return ret
-
-
-def get_lvm_osds(lvs, args):
- '''
- Goes through passed LVs and assigns planned osds
- '''
- ret = []
- for lv in lvs:
- if lv.used_by_ceph:
- continue
- osd_id = None
- if args.osd_ids:
- osd_id = args.osd_ids.pop()
- osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
- 100.0,
- disk.Size(b=int(lv.lvs[0].lv_size)),
- 1,
- osd_id,
- 'dmcrypt' if args.dmcrypt else None)
- ret.append(osd)
- return ret
-
-
-def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, args):
+def get_physical_fast_allocs(devices: List[device.Device], type_: str, fast_slots_per_device: int, new_osds: int, args: argparse.Namespace) -> List[Tuple[str, float, disk.Size, int]]:
requested_slots = getattr(args, '{}_slots'.format(type_))
if not requested_slots or requested_slots < fast_slots_per_device:
if requested_slots:
get_size_fct = getattr(prepare, 'get_{}_size'.format(type_))
requested_size = get_size_fct(lv_format=False)
- ret = []
+ ret: List[Tuple[str, float, disk.Size, int]] = []
vg_device_map = group_devices_by_vg(devices)
for vg_name, vg_devices in vg_device_map.items():
for dev in vg_devices:
ret.append((dev.path, relative_size, abs_size, requested_slots))
return ret
-def group_devices_by_vg(devices):
- result = dict()
+def group_devices_by_vg(devices: List[device.Device]) -> Dict[str, Any]:
+ result: Dict[str, Any] = dict()
result['unused_devices'] = []
for dev in devices:
if len(dev.vgs) > 0:
result['unused_devices'].append(dev)
return result
-def get_lvm_fast_allocs(lvs):
+def get_lvm_fast_allocs(lvs: List[device.Device]) -> List[Tuple[str, float, disk.Size, int]]:
return [("{}/{}".format(d.vg_name, d.lv_name), 100.0,
disk.Size(b=int(d.lvs[0].lv_size)), 1) for d in lvs if not
d.journal_used_by_ceph]
ceph-volume lvm batch --report [DEVICE...]
""")
- def __init__(self, argv):
+ def __init__(self, argv: List[str]) -> None:
parser = argparse.ArgumentParser(
prog='ceph-volume lvm batch',
formatter_class=argparse.RawDescriptionHelpFormatter,
for dev_list in ['', 'db_', 'wal_']:
setattr(self, '{}usable'.format(dev_list), [])
- def report(self, plan):
+ def report(self, plan: List["OSD"]) -> None:
report = self._create_report(plan)
print(report)
- def _create_report(self, plan):
+ def _create_report(self, plan: List["OSD"]) -> str:
+ result: str = ''
if self.args.format == 'pretty':
- report = ''
- report += templates.total_osds.format(total_osds=len(plan))
+ result += templates.total_osds.format(total_osds=len(plan))
- report += templates.osd_component_titles
+ result += templates.osd_component_titles
for osd in plan:
- report += templates.osd_header
- report += osd.report()
- return report
+ result += templates.osd_header
+ result += osd.report()
else:
json_report = []
for osd in plan:
json_report.append(osd.report_json())
if self.args.format == 'json':
- return json.dumps(json_report)
+ result = json.dumps(json_report)
elif self.args.format == 'json-pretty':
- return json.dumps(json_report, indent=4,
+ result = json.dumps(json_report, indent=4,
sort_keys=True)
+ return result
- def _check_slot_args(self):
+ def _check_slot_args(self) -> None:
'''
checking if -slots args are consistent with other arguments
'''
if self.args.data_slots < self.args.osds_per_device:
raise ValueError('data_slots is smaller then osds_per_device')
- def _sort_rotational_disks(self):
+ def _sort_rotational_disks(self) -> None:
'''
Helper for legacy auto behaviour.
Sorts drives into rotating and non-rotating, the latter being used for
self.args.db_devices = ssd
@decorators.needs_root
- def main(self):
+ def main(self) -> None:
if not self.args.devices:
- return self.parser.print_help()
+ self.parser.print_help()
+ raise SystemExit(0)
if (self.args.auto and not self.args.db_devices and not
self.args.wal_devices):
if self.args.report:
self.report(plan)
- return 0
+ return
if not self.args.yes:
self.report(plan)
self._execute(plan)
- def _execute(self, plan):
+ def _execute(self, plan: List["OSD"]) -> None:
defaults = common.get_default_args()
global_args = [
'bluestore',
c = Create([], args=argparse.Namespace(**args))
c.create()
- def get_deployment_layout(self):
+ def get_deployment_layout(self) -> List["OSD"]:
'''
The methods here are mostly just organization, error reporting and
setting up of (default) args. The heavy lifting code for the deployment
osd.add_very_fast_device(*very_fast_allocations.pop())
return plan
- def fast_allocations(self, devices, requested_osds, new_osds, type_):
- ret = []
+ def fast_allocations(self, devices: List[device.Device], requested_osds: int, new_osds: int, type_: str) -> List[Tuple[str, float, disk.Size, int]]:
+ ret: List[Tuple[str, float, disk.Size, int]] = []
if not devices:
return ret
phys_devs, lvm_devs = separate_devices_from_lvs(devices)
'type_'])
def __init__(self,
- data_path,
- rel_size,
- abs_size,
- slots,
- id_,
- encryption,
- symlink=None):
+ data_path: str,
+ rel_size: float,
+ abs_size: disk.Size,
+ slots: int,
+ id_: Optional[str] = None,
+ encryption: Optional[str] = None,
+ symlink: Optional[str] = None) -> None:
self.id_ = id_
self.data = self.VolSpec(path=data_path,
rel_size=rel_size,
abs_size=abs_size,
slots=slots,
type_='data')
- self.fast = None
- self.very_fast = None
- self.encryption = encryption
- self.symlink = symlink
+ self.fast: Optional[Any] = None
+ self.very_fast: Optional[Any] = None
+ self.encryption: Optional[str] = encryption
+ self.symlink: Optional[str] = symlink
- def add_fast_device(self, path, rel_size, abs_size, slots, type_):
+ def add_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int, type_: str) -> None:
self.fast = self.VolSpec(path=path,
- rel_size=rel_size,
- abs_size=abs_size,
- slots=slots,
- type_=type_)
+ rel_size=rel_size,
+ abs_size=abs_size,
+ slots=slots,
+ type_=type_)
- def add_very_fast_device(self, path, rel_size, abs_size, slots):
+ def add_very_fast_device(self, path: str, rel_size: float, abs_size: disk.Size, slots: int) -> None:
self.very_fast = self.VolSpec(path=path,
- rel_size=rel_size,
- abs_size=abs_size,
- slots=slots,
- type_='block_wal')
+ rel_size=rel_size,
+ abs_size=abs_size,
+ slots=slots,
+ type_='block_wal')
- def _get_osd_plan(self):
+ def _get_osd_plan(self) -> Dict[str, Any]:
plan = {
'data': self.data.path,
'data_size': self.data.abs_size,
plan.update({'osd_id': self.id_})
return plan
- def get_args(self, defaults):
+ def get_args(self, defaults: Dict[str, Any]) -> Dict[str, Any]:
my_defaults = defaults.copy()
my_defaults.update(self._get_osd_plan())
return my_defaults
- def report(self):
+ def report(self) -> str:
report = ''
if self.id_:
report += templates.osd_reused_id.format(
percent=self.very_fast.rel_size)
return report
- def report_json(self):
+ def report_json(self) -> Dict[str, Any]:
# cast all values to string so that the report can be dumped in to
# json.dumps
return {k: str(v) for k, v in self._get_osd_plan().items()}
+
+def get_physical_osds(devices: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]:
+ '''
+ Goes through passed physical devices and assigns OSDs
+ '''
+ data_slots = args.osds_per_device
+ if args.data_slots:
+ data_slots = max(args.data_slots, args.osds_per_device)
+ rel_data_size = args.data_allocate_fraction / data_slots
+ mlogger.debug('relative data size: {}'.format(rel_data_size))
+ ret = []
+ for dev in devices:
+ if dev.available_lvm:
+ dev_size = dev.vg_size[0]
+ abs_size = disk.Size(b=int(dev_size * rel_data_size))
+ free_size = dev.vg_free[0]
+ for _ in range(args.osds_per_device):
+ if abs_size > free_size:
+ break
+ free_size -= abs_size.b
+ osd_id = None
+ if args.osd_ids:
+ osd_id = args.osd_ids.pop()
+ ret.append(Batch.OSD(dev.path,
+ rel_data_size,
+ abs_size,
+ args.osds_per_device,
+ osd_id,
+ 'dmcrypt' if args.dmcrypt else None,
+ dev.symlink))
+ return ret
+
+def get_lvm_osds(lvs: List[device.Device], args: argparse.Namespace) -> List[Batch.OSD]:
+ '''
+ Goes through passed LVs and assigns planned osds
+ '''
+ ret = []
+ for lv in lvs:
+ if lv.used_by_ceph:
+ continue
+ osd_id = None
+ if args.osd_ids:
+ osd_id = args.osd_ids.pop()
+ osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
+ 100.0,
+ disk.Size(b=int(lv.lvs[0].lv_size)),
+ 1,
+ osd_id,
+ 'dmcrypt' if args.dmcrypt else None)
+ ret.append(osd)
+ return ret
\ No newline at end of file