disk = device.Device("/dev/nvme0n1")
assert disk.vgs == []
- def test_vgs_is_not_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
- BarPVolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1', pv_uuid="0000", pv_tags={})
- pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
+ def test_vgs_is_not_empty(self, device_info, monkeypatch):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
lsblk = {"TYPE": "disk"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member is False
- def test_pv_api(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
- data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "part"}
+ def test_existing_vg_available(self, monkeypatch, device_info):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
device_info(devices=data, lsblk=lsblk)
- disk = device.Device("/dev/sda")
- assert disk.pvs_api
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ def test_existing_vg_too_small(self, monkeypatch, device_info):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/nvme0n1")
+ assert not disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ def test_multiple_existing_vgs(self, monkeypatch, device_info):
+ vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+ vg_extent_size=1073741824)
+ vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
@pytest.mark.parametrize("ceph_type", ["data", "block"])
def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type):
lsblk = {"TYPE": "part"}
lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
disk = device.Device("/dev/sda")
assert disk.used_by_ceph
# LVs can have a vg/lv path, while disks will have /dev/sda
self.abspath = path
self.lv_api = None
+ self.vgs = []
self.lvs = []
self.vg_name = None
self.lv_name = None
- self.pvs_api = []
self.disk_api = {}
self.blkid_api = {}
self.sys_api = {}
self._exists = None
self._is_lvm_member = None
self._parse()
- self.available, self.rejected_reasons = self._check_reject_reasons()
+
+ self.available_lvm, self.rejected_reasons_lvm = self._check_lvm_reject_reasons()
+ self.available_raw, self.rejected_reasons_raw = self._check_raw_reject_reasons()
+ self.available = self.available_lvm and self.available_raw
+ self.rejected_reasons = list(set(self.rejected_reasons_lvm +
+ self.rejected_reasons_raw))
+
self.device_id = self._get_device_id()
def __lt__(self, other):
# here, because most likely, we need to use VGs from this PV.
self._is_lvm_member = False
for path in self._get_pv_paths():
- # check if there was a pv created with the
- # name of device
- pvs = lvm.PVolumes().filter(pv_name=path)
- has_vgs = [pv.vg_name for pv in pvs if pv.vg_name]
- if has_vgs:
- self.vgs = list(set(has_vgs))
+ vgs = lvm.get_device_vgs(path)
+ if vgs:
+ self.vgs.extend(vgs)
# a pv can only be in one vg, so this should be safe
- self.vg_name = has_vgs[0]
+ # FIXME: While the above assumption holds, sda1 and sda2
+ # can each host a PV and VG. I think the vg_name property is
+ # actually unused (not 100% sure) and can simply be removed
+ self.vg_name = vgs[0]
self._is_lvm_member = True
- self.pvs_api = pvs
- for pv in pvs:
- if pv.vg_name and pv.lv_uuid:
- lv = lvm.get_lv(vg_name=pv.vg_name, lv_uuid=pv.lv_uuid)
- if lv:
- self.lvs.append(lv)
- else:
- self.vgs = []
+
+ self.lvs.extend(lvm.get_device_lvs(path))
return self._is_lvm_member
def _get_pv_paths(self):
if lv.tags.get("ceph.type") in ["data", "block"]]
return any(osd_ids)
- def _check_reject_reasons(self):
- """
- This checks a number of potential reject reasons for a drive and
- returns a tuple (boolean, list). The first element denotes whether a
- drive is available or not, the second element lists reasons in case a
- drive is not available.
- """
+
+ def _check_generic_reject_reasons(self):
reasons = [
('removable', 1, 'removable'),
('ro', 1, 'read-only'),
rejected.append("Used by ceph-disk")
if self.has_bluestore_label:
rejected.append('Has BlueStore device label')
+ return rejected
+
+ def _check_lvm_reject_reasons(self):
+ rejected = self._check_generic_reject_reasons()
+ available_vgs = [vg for vg in self.vgs if vg.free >= 5368709120]
+ if self.vgs and not available_vgs:
+ rejected.append('Insufficient space (<5GB) on vgs')
+
+ return len(rejected) == 0, rejected
+
+ def _check_raw_reject_reasons(self):
+ rejected = self._check_generic_reject_reasons()
+ if len(self.vgs) > 0:
+ rejected.append('LVM detected')
return len(rejected) == 0, rejected