disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lvm_size_rounds_down(self, fake_call, device_info):
# 5.5GB in size
data = {"/dev/sda": {"size": "5905580032"}}
disk = device.Device("vg/lv")
assert disk.is_lv
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_vgs_is_empty(self, fake_call, device_info, monkeypatch):
BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
pv_tags={})
disk = device.Device("/dev/nvme0n1")
assert disk.vgs == []
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_vgs_is_not_empty(self, fake_call, device_info, monkeypatch):
vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=6,
vg_extent_size=1073741824)
disk = device.Device("/dev/nvme0n1")
assert len(disk.vgs) == 1
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_device(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert disk.is_device is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_loop_device_is_not_device(self, fake_call, device_info):
data = {"/dev/loop0": {"foo": "bar"}}
lsblk = {"TYPE": "loop"}
disk = device.Device("/dev/loop0")
assert disk.is_device is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_loop_device_is_device(self, fake_call, device_info):
data = {"/dev/loop0": {"foo": "bar"}}
lsblk = {"TYPE": "loop"}
assert disk.is_device is True
del os.environ["CEPH_VOLUME_ALLOW_LOOP_DEVICES"]
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_rotational(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "1"}}
lsblk = {"TYPE": "device", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert disk.rotational
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_not_rotational(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert not disk.rotational
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_rotational_lsblk(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert disk.rotational
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_not_rotational_lsblk(self, fake_call, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert not disk.rotational
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_device_is_rotational_defaults_true(self, fake_call, device_info):
# rotational will default true if no info from sys_api or lsblk is found
data = {"/dev/sda": {"foo": "bar"}}
disk = device.Device("/dev/sda")
assert disk.rotational
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_disk_is_device(self, fake_call, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "disk", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert disk.is_device is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_partition(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
disk = device.Device("/dev/sda1")
assert disk.is_partition
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_mpath_device_is_device(self, fake_call, device_info):
data = {"/dev/foo": {"foo": "bar"}}
lsblk = {"TYPE": "mpath", "NAME": "foo"}
disk = device.Device("/dev/foo")
assert disk.is_device is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_not_lvm_member(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
disk = device.Device("/dev/sda1")
assert not disk.is_lvm_member
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_lvm_member(self, fake_call, device_info):
data = {"/dev/sda1": {"foo": "bar"}}
lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
disk = device.Device("/dev/sda1")
assert not disk.is_lvm_member
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_mapper_device(self, fake_call, device_info):
lsblk = {"TYPE": "lvm", "NAME": "foo"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/mapper/foo")
assert disk.is_mapper
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_dm_is_mapper_device(self, fake_call, device_info):
lsblk = {"TYPE": "lvm", "NAME": "dm-4"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/dm-4")
assert disk.is_mapper
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_not_mapper_device(self, fake_call, device_info):
lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert not disk.available
assert "Used by ceph-disk" in disk.rejected_reasons
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_removable_device(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 1}}
lsblk = {"TYPE": "disk", "NAME": "sdb"}
disk = device.Device("/dev/sdb")
assert not disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_device_with_gpt_headers(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
lsblk = {"TYPE": "disk", "NAME": "sdb"}
disk = device.Device("/dev/sdb")
assert not disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_accept_non_removable_device(self, fake_call, device_info):
data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
lsblk = {"TYPE": "disk", "NAME": "sdb"}
disk = device.Device("/dev/sdb")
assert disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_not_acceptable_device(self, fake_call, device_info):
data = {"/dev/dm-0": {"foo": "bar"}}
lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
disk = device.Device("/dev/cdrom")
assert not disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_smaller_than_5gb(self, fake_call, device_info):
data = {"/dev/sda": {"size": 5368709119}}
lsblk = {"TYPE": "disk", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert not disk.available, 'too small device is available'
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_accept_non_readonly_device(self, fake_call, device_info):
data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
lsblk = {"TYPE": "disk", "NAME": "sda"}
disk = device.Device("/dev/sda")
assert disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
patch_bluestore_label.return_value = True
lsblk = {"TYPE": "disk", "NAME": "sda"}
assert not disk.available
assert "Has BlueStore device label" in disk.rejected_reasons
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
patch_bluestore_label.side_effect = OSError('test failure')
lsblk = {"TYPE": "disk", "NAME": "sda"}
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"device_info_not_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_existing_vg_available(self, fake_call, monkeypatch, device_info):
vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1536,
vg_extent_size=4194304)
assert not disk.available
assert not disk.available_raw
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_existing_vg_too_small(self, fake_call, monkeypatch, device_info):
vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=4,
vg_extent_size=1073741824)
assert not disk.available
assert not disk.available_raw
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_multiple_existing_vgs(self, fake_call, monkeypatch, device_info):
vg1 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1000,
vg_extent_size=4194304)
disk = device.Device("/dev/sda")
assert disk.used_by_ceph
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_not_used_by_ceph(self, fake_call, device_info, monkeypatch):
FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
pvolumes = []
disk = device.Device("/dev/sda")
assert not disk.used_by_ceph
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_get_device_id(self, fake_call, device_info):
udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
lsblk = {"TYPE": "disk", "NAME": "sda"}
class TestDeviceEncryption(object):
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'NAME': 'sda', 'PKNAME': 'sda'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partition_is_encrypted_lsblk(self, fake_call, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'PKNAME': 'sda'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partition_is_not_encrypted_blkid(self, fake_call, device_info):
lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
blkid = {'TYPE': 'ceph data'}
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partition_is_encrypted_blkid(self, fake_call, device_info):
lsblk = {'TYPE': 'part', 'NAME': 'sda' ,'PKNAME': 'sda'}
blkid = {'TYPE': 'crypto_LUKS'}
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch):
status = {'type': 'LUKS1'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch):
status = {'type': 'LUKS2'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch):
status = {'type': 'PLAIN'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch):
monkeypatch.setattr(device, 'encryption_status', lambda x: {})
lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_encrypted_blkid(self, fake_call, device_info):
lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'crypto_LUKS'}
disk.lv_api = {}
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'xfs'}
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
disk.lv_api = {}
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
disk.lv_api = factory(encrypted=True)
assert disk.is_encrypted is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
"/dev/sdd": {"removable": 1}, # invalid
}
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_valid_before_invalid(self, fake_call, device_info):
lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
assert sda < sdb
assert sdb > sda
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_valid_alphabetical_ordering(self, fake_call, device_info):
lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
lsblk_sdc = {"NAME": "sdc", "TYPE": "disk"}
assert sda < sdc
assert sdc > sda
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_invalid_alphabetical_ordering(self, fake_call, device_info):
lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
lsblk_sdd = {"NAME": "sdd", "TYPE": "disk"}
class TestCephDiskDevice(object):
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partlabel_lsblk(self, fake_call, device_info):
lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""}
device_info(lsblk=lsblk)
assert disk.partlabel == ''
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_partlabel_blkid(self, fake_call, device_info):
lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"}
blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"blkid_ceph_disk_member",
"disable_kernel_queries")
- def test_is_member_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_member_blkid(self, fake_call, monkeypatch):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info):
lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"}
device_info(lsblk=lsblk)
assert disk.is_member is True
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_unknown_type(self, fake_call, device_info):
lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"}
device_info(lsblk=lsblk)
ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"blkid_ceph_disk_member",
"disable_kernel_queries")
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"lsblk_ceph_disk_member",
"disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_type_lsblk(self, fake_call, device_info, ceph_partlabel):
disk = device.CephDiskDevice(device.Device("/dev/sda"))