import json
from textwrap import dedent
from ceph_volume import terminal, decorators
-from ceph_volume.util import disk, prompt_bool
+from ceph_volume.util import prompt_bool
from ceph_volume.util import arg_validators
from . import strategies
_help = dedent("""
Automatically size devices ready for OSD provisioning based on default strategies.
- Detected devices:
- {detected_devices}
-
Usage:
ceph-volume lvm batch [DEVICE...]
parser = argparse.ArgumentParser(
prog='ceph-volume lvm batch',
formatter_class=argparse.RawDescriptionHelpFormatter,
- description=self.print_help(),
+ description=self._help,
)
parser.add_argument(
for dev_list in ['', 'db_', 'wal_', 'journal_']:
setattr(self, '{}usable'.format(dev_list), [])
- def get_devices(self):
- # remove devices with partitions
- devices = [(device, details) for device, details in
- disk.get_devices().items() if details.get('partitions') == {}]
- size_sort = lambda x: (x[0], x[1]['size'])
- return device_formatter(sorted(devices, key=size_sort))
-
- def print_help(self):
- return self._help.format(
- detected_devices=self.get_devices(),
- )
-
def report(self):
if self.args.format == 'pretty':
self.strategy.report_pretty(self.filtered_devices)
b = batch.Batch([])
b.main()
- def test_get_devices(self, monkeypatch):
- return_value = {
- '/dev/vdd': {
- 'removable': '0',
- 'vendor': '0x1af4',
- 'model': '',
- 'sas_address': '',
- 'sas_device_handle': '',
- 'sectors': 0,
- 'size': 21474836480.0,
- 'support_discard': '',
- 'partitions': {
- 'vdd1': {
- 'start': '2048',
- 'sectors': '41940959',
- 'sectorsize': 512,
- 'size': '20.00 GB'
- }
- },
- 'rotational': '1',
- 'scheduler_mode': 'mq-deadline',
- 'sectorsize': '512',
- 'human_readable_size': '20.00 GB',
- 'path': '/dev/vdd'
- },
- '/dev/vdf': {
- 'removable': '0',
- 'vendor': '0x1af4',
- 'model': '',
- 'sas_address': '',
- 'sas_device_handle': '',
- 'sectors': 0,
- 'size': 21474836480.0,
- 'support_discard': '',
- 'partitions': {},
- 'rotational': '1',
- 'scheduler_mode': 'mq-deadline',
- 'sectorsize': '512',
- 'human_readable_size': '20.00 GB',
- 'path': '/dev/vdf'
- }
- }
- monkeypatch.setattr('ceph_volume.devices.lvm.batch.disk.get_devices',
- lambda: return_value)
- b = batch.Batch([])
- result = b.get_devices().strip()
- assert result == '* /dev/vdf 20.00 GB rotational'
-
def test_disjoint_device_lists(self, factory):
device1 = factory(used_by_ceph=False, available=True, abspath="/dev/sda")
device2 = factory(used_by_ceph=False, available=True, abspath="/dev/sdb")