return generate_file
+@pytest.fixture(params=[
+ 'ceph data', 'ceph journal', 'ceph block',
+ 'ceph block.wal', 'ceph block.db', 'ceph lockbox'])
+def ceph_partlabel(request):
+ return request.param
+
+
+@pytest.fixture
+def disable_kernel_queries(monkeypatch):
+ '''
+ This speeds up calls to Device and Disk
+ '''
+ monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda: {})
+ monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: {})
+
+
+@pytest.fixture
+def disable_lvm_queries(monkeypatch):
+ '''
+ This speeds up calls to Device and Disk
+ '''
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: None)
+
+
+@pytest.fixture
+def lsblk_ceph_disk_member(monkeypatch, request, ceph_partlabel):
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
+ lambda path: {'PARTLABEL': ceph_partlabel})
+
+
+@pytest.fixture
+def blkid_ceph_disk_member(monkeypatch, request, ceph_partlabel):
+ monkeypatch.setattr("ceph_volume.util.device.disk.blkid",
+ lambda path: {'PARTLABEL': ceph_partlabel})
+
+
+@pytest.fixture(params=[
+ ('gluster partition', 'gluster partition'),
+ # falls back to blkid
+ ('', 'gluster partition'),
+ ('gluster partition', ''),
+])
+def device_info_not_ceph_disk_member(monkeypatch, request):
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
+ lambda path: {'PARTLABEL': request.param[0]})
+ monkeypatch.setattr("ceph_volume.util.device.disk.blkid",
+ lambda path: {'PARTLABEL': request.param[1]})
+
+
@pytest.fixture
def device_info(monkeypatch):
def apply(devices=None, lsblk=None, lv=None, blkid=None, udevadm=None):
disk = device.Device("/dev/sda")
assert not disk.is_mapper
- def test_is_ceph_disk_member_lsblk(self, device_info):
- lsblk = {"PARTLABEL": "ceph data"}
- device_info(lsblk=lsblk)
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "disable_kernel_queries",
+ "disable_lvm_queries")
+ def test_is_ceph_disk_lsblk(self, monkeypatch):
+ monkeypatch.setattr("ceph_volume.util.device.disk.blkid",
+ lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
- def test_is_ceph_disk_member_not_available(self, device_info):
- lsblk = {"PARTLABEL": "ceph data"}
- device_info(lsblk=lsblk)
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "disable_kernel_queries",
+ "disable_lvm_queries")
+ def test_is_ceph_disk_blkid(self, monkeypatch):
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
+ lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
- assert not disk.available
- assert "Used by ceph-disk" in disk.rejected_reasons
- def test_is_not_ceph_disk_member_lsblk(self, device_info):
- lsblk = {"PARTLABEL": "gluster partition"}
- device_info(lsblk=lsblk)
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "disable_kernel_queries",
+ "disable_lvm_queries")
+ def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch):
+ monkeypatch.setattr("ceph_volume.util.device.disk.blkid",
+ lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
- assert disk.is_ceph_disk_member is False
+ assert disk.is_ceph_disk_member
+ assert not disk.available
+ assert "Used by ceph-disk" in disk.rejected_reasons
- def test_is_ceph_disk_member_blkid(self, device_info):
- # falls back to blkid
- lsblk = {"PARTLABEL": ""}
- blkid = {"PARTLABEL": "ceph data"}
- device_info(lsblk=lsblk, blkid=blkid)
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "disable_kernel_queries",
+ "disable_lvm_queries")
+ def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch):
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
+ lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
+ assert not disk.available
+ assert "Used by ceph-disk" in disk.rejected_reasons
- def test_is_not_ceph_disk_member_blkid(self, device_info):
- # falls back to blkid
- lsblk = {"PARTLABEL": ""}
- blkid = {"PARTLABEL": "gluster partition"}
- device_info(lsblk=lsblk, blkid=blkid)
+ @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
+ "disable_lvm_queries",
+ "disable_kernel_queries")
+ def test_is_not_ceph_disk_member_lsblk(self):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member is False
assert sdd > sdb
-ceph_partlabels = [
- 'ceph data', 'ceph journal', 'ceph block',
- 'ceph block.wal', 'ceph block.db', 'ceph lockbox'
-]
-
-
class TestCephDiskDevice(object):
def test_partlabel_lsblk(self, device_info):
assert disk.partlabel == 'ceph data'
- @pytest.mark.parametrize("label", ceph_partlabels)
- def test_is_member_blkid(self, device_info, label):
- lsblk = {"PARTLABEL": ""}
- blkid = {"PARTLABEL": label}
- device_info(lsblk=lsblk, blkid=blkid)
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "disable_kernel_queries",
+ "disable_lvm_queries")
+ def test_is_member_blkid(self, monkeypatch):
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
+ lambda path: {'PARTLABEL': ""})
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True
disk = device.Device("/dev/sda")
assert disk.available
- @pytest.mark.parametrize("label", ceph_partlabels)
- def test_is_member_lsblk(self, device_info, label):
- lsblk = {"PARTLABEL": label}
+ def test_is_member_lsblk(self, device_info, ceph_partlabel):
+ lsblk = {"PARTLABEL": ceph_partlabel}
device_info(lsblk=lsblk)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.type == 'unknown'
- @pytest.mark.parametrize("label", ceph_partlabels)
- def test_type_blkid(self, device_info, label):
- expected = label.split()[-1].split('.')[-1]
+ def test_type_blkid(self, device_info, ceph_partlabel):
+ expected = ceph_partlabel.split()[-1].split('.')[-1]
lsblk = {"PARTLABEL": ""}
- blkid = {"PARTLABEL": label}
+ blkid = {"PARTLABEL": ceph_partlabel}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.type == expected
- @pytest.mark.parametrize("label", ceph_partlabels)
- def test_type_lsblk(self, device_info, label):
- expected = label.split()[-1].split('.')[-1]
- lsblk = {"PARTLABEL": label}
+ def test_type_lsblk(self, device_info, ceph_partlabel):
+ expected = ceph_partlabel.split()[-1].split('.')[-1]
+ lsblk = {"PARTLABEL": ceph_partlabel}
blkid = {"PARTLABEL": ''}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.CephDiskDevice(device.Device("/dev/sda"))