deepcopy(volumes))
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "disk"}
- device_info(devices=data,lsblk=lsblk)
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.sys_api
assert "foo" in disk.sys_api
# 5GB in size
data = {"/dev/sda": {"size": "5368709120"}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
def test_lvm_size_rounds_down(self, fake_call, device_info):
# 5.5GB in size
data = {"/dev/sda": {"size": "5905580032"}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
def test_is_lv(self, fake_call, device_info):
data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
- lsblk = {"TYPE": "lvm"}
+ lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
device_info(lv=data,lsblk=lsblk)
disk = device.Device("vg/lv")
assert disk.is_lv
pv_tags={})
pvolumes = []
pvolumes.append(BarPVolume)
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
vg_extent_size=1073741824)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
assert len(disk.vgs) == 1
def test_device_is_device(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "device"}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
def test_device_is_rotational(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "1"}}
- lsblk = {"TYPE": "device"}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
def test_device_is_not_rotational(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "0"}}
- lsblk = {"TYPE": "device"}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
def test_device_is_rotational_lsblk(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "device", "ROTA": "1"}
+ lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
def test_device_is_not_rotational_lsblk(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "0"}}
- lsblk = {"TYPE": "device", "ROTA": "0"}
+ lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
def test_device_is_rotational_defaults_true(self, fake_call, device_info):
# rotational will default true if no info from sys_api or lsblk is found
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "device", "foo": "bar"}
+ lsblk = {"TYPE": "device", "foo": "bar", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
def test_disk_is_device(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
def test_is_partition(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
- lsblk = {"TYPE": "part", "PKNAME": "sda"}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda1")
assert disk.is_partition
def test_mpath_device_is_device(self, fake_call, device_info):
data = {"/dev/foo": {"foo": "bar"}}
- lsblk = {"TYPE": "mpath"}
+ lsblk = {"TYPE": "mpath", "NAME": "foo"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/foo")
assert disk.is_device is True
def test_is_not_lvm_member(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
- lsblk = {"TYPE": "part", "PKNAME": "sda"}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda1")
assert not disk.is_lvm_member
def test_is_lvm_member(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
- lsblk = {"TYPE": "part", "PKNAME": "sda"}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda1")
assert not disk.is_lvm_member
def test_is_mapper_device(self, fake_call, device_info):
- lsblk = {"TYPE": "lvm"}
+ lsblk = {"TYPE": "lvm", "NAME": "foo"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/mapper/foo")
assert disk.is_mapper
def test_dm_is_mapper_device(self, fake_call, device_info):
- lsblk = {"TYPE": "lvm"}
+ lsblk = {"TYPE": "lvm", "NAME": "dm-4"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/dm-4")
assert disk.is_mapper
def test_is_not_mapper_device(self, fake_call, device_info):
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.is_mapper
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "lsblk_ceph_disk_member",
"disable_kernel_queries")
def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert "Used by ceph-disk" in disk.rejected_reasons
@pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "lsblk_ceph_disk_member",
"disable_kernel_queries")
def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
def test_reject_removable_device(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 1}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sdb")
assert not disk.available
def test_reject_device_with_gpt_headers(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
blkid= {"PTTYPE": "gpt"}
device_info(
devices=data,
def test_accept_non_removable_device(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sdb")
assert disk.available
def test_reject_not_acceptable_device(self, fake_call, device_info):
data = {"/dev/dm-0": {"foo": "bar"}}
- lsblk = {"TYPE": "mpath"}
+ lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/dm-0")
assert not disk.available
def test_reject_readonly_device(self, fake_call, device_info):
data = {"/dev/cdrom": {"ro": 1}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "cdrom"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/cdrom")
assert not disk.available
def test_reject_smaller_than_5gb(self, fake_call, device_info):
data = {"/dev/sda": {"size": 5368709119}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.available, 'too small device is available'
def test_accept_non_readonly_device(self, fake_call, device_info):
data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(devices=data,lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.available
def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
patch_bluestore_label.return_value = True
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.available
def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
patch_bluestore_label.side_effect = OSError('test failure')
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.available
assert "Failed to determine if device is BlueStore" in disk.rejected_reasons
- @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "device_info_not_ceph_disk_member",
"disable_kernel_queries")
def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
disk = device.Device("/dev/sda")
vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
vg_extent_size=4194304)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
data = {"/dev/nvme0n1": {"size": "6442450944"}}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
vg_extent_size=1073741824)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
data = {"/dev/nvme0n1": {"size": "6442450944"}}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
vg_extent_size=4194304)
monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
data = {"/dev/nvme0n1": {"size": "6442450944"}}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
def test_used_by_ceph(self, fake_call, device_info,
monkeypatch, ceph_type):
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "part", "PKNAME": "sda"}
+ lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
lv_uuid="0000", pv_tags={}, vg_name="vg")
pvolumes = []
pvolumes = []
pvolumes.append(FooPVolume)
data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "part", "PKNAME": "sda"}
+ lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
def test_get_device_id(self, fake_call, device_info):
udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
- lsblk = {"TYPE": "disk"}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(udevadm=udev,lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
class TestDeviceEncryption(object):
def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info):
- lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'PKNAME': 'sda'}
+ lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'NAME': 'sda', 'PKNAME': 'sda'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
def test_partition_is_encrypted_lsblk(self, fake_call, device_info):
- lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'PKNAME': 'sda'}
+ lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'PKNAME': 'sda'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
def test_partition_is_not_encrypted_blkid(self, fake_call, device_info):
- lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
+ lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
blkid = {'TYPE': 'ceph data'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
def test_partition_is_encrypted_blkid(self, fake_call, device_info):
- lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
+ lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch):
status = {'type': 'LUKS1'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/mapper/uuid")
def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch):
status = {'type': 'LUKS2'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/mapper/uuid")
def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch):
status = {'type': 'PLAIN'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/mapper/uuid")
def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch):
monkeypatch.setattr(device, 'encryption_status', lambda x: {})
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is False
def test_lv_is_encrypted_blkid(self, fake_call, device_info):
- lsblk = {'TYPE': 'lvm'}
+ lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
- lsblk = {'TYPE': 'lvm'}
+ lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'xfs'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
- lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
- lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
}
def test_valid_before_invalid(self, fake_call, device_info):
- lsblk = {"TYPE": "disk"}
- device_info(devices=self.data,lsblk=lsblk)
+ lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
+ lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sda)
sda = device.Device("/dev/sda")
+ device_info(devices=self.data,lsblk=lsblk_sdb)
sdb = device.Device("/dev/sdb")
assert sda < sdb
assert sdb > sda
def test_valid_alphabetical_ordering(self, fake_call, device_info):
- lsblk = {"TYPE": "disk"}
- device_info(devices=self.data,lsblk=lsblk)
+ lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
+ lsblk_sdc = {"NAME": "sdc", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sda)
sda = device.Device("/dev/sda")
+ device_info(devices=self.data,lsblk=lsblk_sdc)
sdc = device.Device("/dev/sdc")
assert sda < sdc
assert sdc > sda
def test_invalid_alphabetical_ordering(self, fake_call, device_info):
- lsblk = {"TYPE": "disk"}
- device_info(devices=self.data,lsblk=lsblk)
+ lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
+ lsblk_sdd = {"NAME": "sdd", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sdb)
sdb = device.Device("/dev/sdb")
+ device_info(devices=self.data,lsblk=lsblk_sdd)
sdd = device.Device("/dev/sdd")
assert sdb < sdd
class TestCephDiskDevice(object):
def test_partlabel_lsblk(self, fake_call, device_info):
- lsblk = {"TYPE": "disk", "PARTLABEL": ""}
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""}
device_info(lsblk=lsblk)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.partlabel == ''
def test_partlabel_blkid(self, fake_call, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"}
blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
- device_info(blkid=blkid)
+ device_info(blkid=blkid, lsblk=lsblk)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.partlabel == 'ceph data'
- @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "blkid_ceph_disk_member",
"disable_kernel_queries")
def test_is_member_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info):
- lsblk = {"TYPE": "disk", "PARTLABEL": "ceph"}
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"}
device_info(lsblk=lsblk)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True
def test_unknown_type(self, fake_call, device_info):
- lsblk = {"TYPE": "disk", "PARTLABEL": "gluster"}
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"}
device_info(lsblk=lsblk)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
- @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "blkid_ceph_disk_member",
"disable_kernel_queries")
def test_type_blkid(self, monkeypatch, fake_call, device_info, ceph_partlabel):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
"""
def __init__(self, filter_for_batch=False, with_lsm=False):
+ lvs = lvm.get_lvs()
+ lsblk_all = disk.lsblk_all()
+ all_devices_vgs = lvm.get_all_devices_vgs()
if not sys_info.devices:
sys_info.devices = disk.get_devices()
- self.devices = [Device(k, with_lsm) for k in
- sys_info.devices.keys()]
+ self.devices = [Device(k,
+ with_lsm,
+ lvs=lvs,
+ lsblk_all=lsblk_all,
+ all_devices_vgs=all_devices_vgs) for k in
+ sys_info.devices.keys()]
if filter_for_batch:
self.devices = [d for d in self.devices if d.available_lvm_batch]
# unittests
lvs = []
- def __init__(self, path, with_lsm=False):
+ def __init__(self, path, with_lsm=False, lvs=None, lsblk_all=None, all_devices_vgs=None):
self.path = path
# LVs can have a vg/lv path, while disks will have /dev/sda
self.abspath = path
+ if not sys_info.devices:
+ sys_info.devices = disk.get_devices()
+ self.sys_api = sys_info.devices.get(self.abspath, {})
+ self.partitions = self._get_partitions()
self.lv_api = None
- self.lvs = []
+ self.lvs = [] if not lvs else lvs
+ self.lsblk_all = lsblk_all
+ self.all_devices_vgs = all_devices_vgs
self.vgs = []
self.vg_name = None
self.lv_name = None
self.disk_api = {}
- self.blkid_api = {}
- self.sys_api = {}
+ self.blkid_api = None
self._exists = None
self._is_lvm_member = None
self._parse()
def __hash__(self):
return hash(self.path)
+ def load_blkid_api(self):
+ if self.blkid_api is None:
+ self.blkid_api = disk.blkid(self.path)
+
def _parse(self):
- if not sys_info.devices:
- sys_info.devices = disk.get_devices()
- self.sys_api = sys_info.devices.get(self.abspath, {})
+ lv = None
if not self.sys_api:
# if no device was found check if we are a partition
partname = self.abspath.split('/')[-1]
self.sys_api = part
break
- # if the path is not absolute, we have 'vg/lv', let's use LV name
- # to get the LV.
- if self.path[0] == '/':
- lv = lvm.get_single_lv(filters={'lv_path': self.path})
+ if self.lvs:
+ for _lv in self.lvs:
+ # if the path is not absolute, we have 'vg/lv', let's use LV name
+ # to get the LV.
+ if self.path[0] == '/':
+ if _lv.lv_path == self.path:
+ lv = _lv
+ break
+ else:
+ vgname, lvname = self.path.split('/')
+ if _lv.lv_name == lvname and _lv.vg_name == vgname:
+ lv = _lv
+ break
else:
- vgname, lvname = self.path.split('/')
- lv = lvm.get_single_lv(filters={'lv_name': lvname,
- 'vg_name': vgname})
+ if self.path[0] == '/':
+ lv = lvm.get_single_lv(filters={'lv_path': self.path})
+ else:
+ vgname, lvname = self.path.split('/')
+ lv = lvm.get_single_lv(filters={'lv_name': lvname,
+ 'vg_name': vgname})
+
if lv:
self.lv_api = lv
self.lvs = [lv]
self.lv_name = lv.name
self.ceph_device = lvm.is_ceph_device(lv)
else:
- dev = disk.lsblk(self.path)
- self.blkid_api = disk.blkid(self.path)
+ if self.lsblk_all:
+ for dev in self.lsblk_all:
+ if dev['NAME'] == os.path.basename(self.path):
+ break
+ else:
+ dev = disk.lsblk(self.path)
self.disk_api = dev
device_type = dev.get('TYPE', '')
# always check is this is an lvm member
# VGs, should we consider it as part of LVM? We choose not to
# here, because most likely, we need to use VGs from this PV.
self._is_lvm_member = False
- for path in self._get_device_with_partitions_list():
- vgs = lvm.get_device_vgs(path)
+ device_to_check = [self.abspath]
+ device_to_check.extend(self.partitions)
+
+ # a pv can only be in one vg, so this should be safe
+ # FIXME: While the above assumption holds, sda1 and sda2
+ # can each host a PV and VG. I think the vg_name property is
+ # actually unused (not 100% sure) and can simply be removed
+ vgs = None
+ for path in device_to_check:
+ if self.all_devices_vgs:
+ for dev_vg in self.all_devices_vgs:
+ if dev_vg.pv_name == path:
+ vgs = [dev_vg]
+ else:
+ vgs = lvm.get_device_vgs(path)
if vgs:
self.vgs.extend(vgs)
- # a pv can only be in one vg, so this should be safe
- # FIXME: While the above assumption holds, sda1 and sda2
- # can each host a PV and VG. I think the vg_name property is
- # actually unused (not 100% sure) and can simply be removed
self.vg_name = vgs[0]
self._is_lvm_member = True
self.lvs.extend(lvm.get_device_lvs(path))
- return self._is_lvm_member
- def _get_device_with_partitions_list(self):
+ def _get_partitions(self):
"""
For block devices LVM can reside on the raw block device or on a
partition. Return a list of paths to be checked for a pv.
"""
- paths = [self.abspath]
+ partitions = []
path_dir = os.path.dirname(self.abspath)
- for part in self.sys_api.get('partitions', {}).keys():
- paths.append(os.path.join(path_dir, part))
- return paths
+ for partition in self.sys_api.get('partitions', {}).keys():
+ partitions.append(os.path.join(path_dir, partition))
+ return partitions
@property
def exists(self):
@property
def has_fs(self):
+ self.load_blkid_api()
return 'TYPE' in self.blkid_api
@property
def has_gpt_headers(self):
+ self.load_blkid_api()
return self.blkid_api.get("PTTYPE") == "gpt"
@property
@property
def is_ceph_disk_member(self):
- is_member = self.ceph_disk.is_member
+ def is_member(device):
+ return 'ceph' in device.get('PARTLABEL', '') or \
+ device.get('PARTTYPE', '') in ceph_disk_guids.keys()
+ # If we come from Devices(), self.lsblk_all is set already.
+ # Otherwise, we have to grab the data.
+ details = self.lsblk_all or disk.lsblk_all()
if self.sys_api.get("partitions"):
for part in self.sys_api.get("partitions").keys():
- part = Device("/dev/%s" % part)
- if part.is_ceph_disk_member:
- is_member = True
- break
- return is_member
+ for dev in details:
+ if dev['NAME'] == part:
+ return is_member(dev)
+ else:
+ return is_member(self.disk_api)
+ raise RuntimeError(f"Couldn't check if device {self.path} is a ceph-disk member.")
@property
def has_bluestore_label(self):
@property
def device_type(self):
+ self.load_blkid_api()
if self.disk_api:
return self.disk_api['TYPE']
elif self.blkid_api:
@property
def is_partition(self):
+ self.load_blkid_api()
if self.disk_api:
return self.disk_api['TYPE'] == 'part'
elif self.blkid_api:
@property
def is_device(self):
+ self.load_blkid_api()
api = None
if self.disk_api:
api = self.disk_api
Only correct for LVs, device mappers, and partitions. Will report a ``None``
for raw devices.
"""
+ self.load_blkid_api()
crypt_reports = [self.blkid_api.get('TYPE', ''), self.disk_api.get('FSTYPE', '')]
if self.is_lv:
# if disk APIs are reporting this is encrypted use that: