--- /dev/null
+# -*- coding: utf-8 -*-
+
+import pytest
+from ceph_volume.util.device import Devices
+from ceph_volume import sys_info
+
+@pytest.fixture
+def device_report_keys():
+ report = Devices().json_report()[0]
+ return list(report.keys())
+
+@pytest.fixture
+def device_sys_api_keys():
+ report = Devices().json_report()[0]
+ return list(report['sys_api'].keys())
+
+
+class TestInventory(object):
+
+ # populate sys_info with something; creating a Device instance will use
+ # this data
+ sys_info.devices = {
+ # example output of disk.get_devices()
+ '/dev/sdb': {'human_readable_size': '1.82 TB',
+ 'locked': 0,
+ 'model': 'PERC H700',
+ 'nr_requests': '128',
+ 'partitions': {},
+ 'path': '/dev/sdb',
+ 'removable': '0',
+ 'rev': '2.10',
+ 'ro': '0',
+ 'rotational': '1',
+ 'sas_address': '',
+ 'sas_device_handle': '',
+ 'scheduler_mode': 'cfq',
+ 'sectors': 0,
+ 'sectorsize': '512',
+ 'size': 1999844147200.0,
+ 'support_discard': '',
+ 'vendor': 'DELL'}
+ }
+
+ expected_keys = [
+ 'path',
+ 'rejected_reasons',
+ 'sys_api',
+ 'valid',
+ 'lvs',
+ ]
+
+ expected_sys_api_keys = [
+ 'human_readable_size',
+ 'locked',
+ 'model',
+ 'nr_requests',
+ 'partitions',
+ 'path',
+ 'removable',
+ 'rev',
+ 'ro',
+ 'rotational',
+ 'sas_address',
+ 'sas_device_handle',
+ 'scheduler_mode',
+ 'sectors',
+ 'sectorsize',
+ 'size',
+ 'support_discard',
+ 'vendor',
+ ]
+
+ def test_json_inventory_keys_unexpected(self, device_report_keys):
+ for k in device_report_keys:
+ assert k in self.expected_keys, "unexpected key {} in report".format(k)
+
+ def test_json_inventory_keys_missing(self, device_report_keys):
+ for k in self.expected_keys:
+ assert k in device_report_keys, "expected key {} in report".format(k)
+
+ def test_sys_api_keys_unexpected(self, device_sys_api_keys):
+ for k in device_sys_api_keys:
+ assert k in self.expected_sys_api_keys, "unexpected key {} in sys_api field".format(k)
+
+ def test_sys_api_keys_missing(self, device_sys_api_keys):
+ for k in self.expected_sys_api_keys:
+ assert k in device_sys_api_keys, "expected key {} in sys_api field".format(k)
+
disk = device.Device("/dev/sda")
assert not disk.used_by_ceph
+ disk1 = device.Device("/dev/sda")
+ disk2 = device.Device("/dev/sdb")
+ disk2._valid = False
+ disk3 = device.Device("/dev/sdc")
+ disk4 = device.Device("/dev/sdd")
+ disk4._valid = False
+
+ @pytest.mark.parametrize("diska, diskb", [
+ pytest.param(disk1, disk2, id="(_, valid) < (_, invalid)"),
+ pytest.param(disk1, disk3, id="(sda, valid) < (sdc, valid)"),
+ pytest.param(disk3, disk2, id="(sdc, valid) < (sdb, invalid)"),
+ pytest.param(disk2, disk4, id="(sdb, invalid) < (sdd, invalid)"),
+ ])
+ def test_ordering(self, diska, diskb):
+ assert diska < diskb and diskb > diska
ceph_partlabels = [
'ceph data', 'ceph journal', 'ceph block',
+# -*- coding: utf-8 -*-
+
import os
+from functools import total_ordering
from ceph_volume import sys_info
from ceph_volume.api import lvm
from ceph_volume.util import disk
+report_template = """
+{dev:<25} {size:<12} {rot!s:<7} {valid!s:<7} {model}"""
+
+
+class Devices(object):
+ """
+ A container for Device instances with reporting
+ """
+
+ def __init__(self, devices=None):
+ if not sys_info.devices:
+ sys_info.devices = disk.get_devices()
+ self.devices = [Device(k) for k in
+ sys_info.devices.keys()]
+
+ def pretty_report(self, all=True):
+ output = [
+ report_template.format(
+ dev='Device Path',
+ size='Size',
+ rot='rotates',
+ model='Model name',
+ valid='valid',
+ )]
+ for device in sorted(self.devices):
+ output.append(device.report())
+ return ''.join(output)
+
+ def json_report(self):
+ output = []
+ for device in sorted(self.devices):
+ output.append(device.json_report())
+ return output
+@total_ordering
class Device(object):
+ pretty_template = """
+ {attr:<25} {value}"""
+
+ report_fields = [
+ '_rejected_reasons',
+ '_valid',
+ 'path',
+ 'sys_api',
+ ]
+ pretty_report_sys_fields = [
+ 'human_readable_size',
+ 'model',
+ 'removable',
+ 'ro',
+ 'rotational',
+ 'sas_address',
+ 'scheduler_mode',
+ 'vendor',
+ ]
+
def __init__(self, path):
self.path = path
# LVs can have a vg/lv path, while disks will have /dev/sda
self._parse()
self.is_valid
+ def __lt__(self, other):
+ '''
+ Implementing this method and __eq__ allows the @total_ordering
+ decorator to turn the Device class into a totally ordered type.
+ This can slower then implementing all comparison operations.
+ This sorting should put valid devices before invalid devices and sort
+ on the path otherwise (str sorting).
+ '''
+ if self._valid == other._valid:
+ return self.path < other.path
+ return self._valid and not other._valid
+
+ def __eq__(self, other):
+ return self.path == other.path
+
def _parse(self):
+ if not sys_info.devices:
+ sys_info.devices = disk.get_devices()
+ self.sys_api = sys_info.devices.get(self.abspath, {})
+
# start with lvm since it can use an absolute or relative path
lv = lvm.get_lv_from_argument(self.path)
if lv:
if device_type in ['part', 'disk']:
self._set_lvm_membership()
- if not sys_info.devices:
- sys_info.devices = disk.get_devices()
- self.sys_api = sys_info.devices.get(self.abspath, {})
-
self.ceph_disk = CephDiskDevice(self)
def __repr__(self):
prefix = 'Raw Device'
return '<%s: %s>' % (prefix, self.abspath)
- def _set_lvm_membership(self):
- if self._is_lvm_member is None:
- # check if there was a pv created with the
- # name of device
- pvs = lvm.PVolumes()
- pvs.filter(pv_name=self.abspath)
- if not pvs:
- self._is_lvm_member = False
- return self._is_lvm_member
- has_vgs = [pv.vg_name for pv in pvs if pv.vg_name]
- if has_vgs:
- # a pv can only be in one vg, so this should be safe
- self.vg_name = has_vgs[0]
- self._is_lvm_member = True
- self.pvs_api = pvs
- for pv in pvs:
- if pv.vg_name and pv.lv_uuid:
- lv = lvm.get_lv(vg_name=pv.vg_name, lv_uuid=pv.lv_uuid)
- if lv:
- self.lvs.append(lv)
+ def pretty_report(self):
+ def format_value(v):
+ if isinstance(v, list):
+ return ', '.join(v)
else:
- # this is contentious, if a PV is recognized by LVM but has no
- # VGs, should we consider it as part of LVM? We choose not to
- # here, because most likely, we need to use VGs from this PV.
- self._is_lvm_member = False
+ return v
+ def format_key(k):
+ return k.strip('_').replace('_', ' ')
+ output = ['\n====== Device report {} ======\n'.format(self.path)]
+ output.extend(
+ [self.pretty_template.format(
+ attr=format_key(k),
+ value=format_value(v)) for k, v in vars(self).items() if k in
+ self.report_fields and k != 'disk_api' and k != 'sys_api'] )
+ output.extend(
+ [self.pretty_template.format(
+ attr=format_key(k),
+ value=format_value(v)) for k, v in self.sys_api.items() if k in
+ self.pretty_report_sys_fields])
+ for lv in self.lvs:
+ output.append("""
+ --- Logical Volume ---""")
+ output.extend(
+ [self.pretty_template.format(
+ attr=format_key(k),
+ value=format_value(v)) for k, v in lv.report().items()])
+ return ''.join(output)
+ def report(self):
+ return report_template.format(
+ dev=self.abspath,
+ size=self.size_human,
+ rot=self.rotational,
+ valid=self.is_valid,
+ model=self.model,
+ )
+
+ def json_report(self):
+ output = {k.strip('_'): v for k, v in vars(self).items() if k in
+ self.report_fields}
+ output['lvs'] = [lv.report() for lv in self.lvs]
+ return output
+
+ def _set_lvm_membership(self):
+ if self._is_lvm_member is None:
+ # this is contentious, if a PV is recognized by LVM but has no
+ # VGs, should we consider it as part of LVM? We choose not to
+ # here, because most likely, we need to use VGs from this PV.
+ self._is_lvm_member = False
+ for path in self._get_pv_paths():
+ # check if there was a pv created with the
+ # name of device
+ pvs = lvm.PVolumes()
+ pvs.filter(pv_name=path)
+ has_vgs = [pv.vg_name for pv in pvs if pv.vg_name]
+ if has_vgs:
+ # a pv can only be in one vg, so this should be safe
+ self.vg_name = has_vgs[0]
+ self._is_lvm_member = True
+ self.pvs_api = pvs
+ for pv in pvs:
+ if pv.vg_name and pv.lv_uuid:
+ lv = lvm.get_lv(vg_name=pv.vg_name, lv_uuid=pv.lv_uuid)
+ if lv:
+ self.lvs.append(lv)
return self._is_lvm_member
+ def _get_pv_paths(self):
+ """
+ For block devices LVM can reside on the raw block device or on a
+ partition. Return a list of paths to be checked for a pv.
+ """
+ paths = [self.abspath]
+ path_dir = os.path.dirname(self.abspath)
+ for part in self.sys_api.get('partitions', {}).keys():
+ paths.append(os.path.join(path_dir, part))
+ return paths
+
@property
def exists(self):
return os.path.exists(self.abspath)
@property
def rotational(self):
- if self.sys_api['rotational'] == '1':
- return True
- return False
+ return self.sys_api['rotational'] == '1'
+
+ @property
+ def model(self):
+ return self.sys_api['model']
+
+ @property
+ def size_human(self):
+ return self.sys_api['human_readable_size']
+
+ @property
+ def size(self):
+ return self.sys_api['size']
@property
def is_lvm_member(self):