lv = Factory(**lv) if lv else None
monkeypatch.setattr("ceph_volume.sys_info.devices", {})
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda: devices)
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ if not devices:
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ else:
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: lv)
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", lambda path: lsblk)
monkeypatch.setattr("ceph_volume.util.device.disk.blkid", lambda path: blkid)
return apply
disk = device.Device("/dev/sda")
assert disk.pvs_api
+ @pytest.mark.parametrize("ceph_type", ["data", "block"])
+ def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "part"}
+ lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
+ device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ disk = device.Device("/dev/sda")
+ assert disk.used_by_ceph
+
+ def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "part"}
+ lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
+ device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ disk = device.Device("/dev/sda")
+ assert not disk.used_by_ceph
+
ceph_partlabels = [
'ceph data', 'ceph journal', 'ceph block',