monkeypatch.setattr("ceph_volume.sys_info.devices", {})
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda: devices)
if not devices:
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_first_lv", lambda filters: lv)
else:
monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
monkeypatch.setattr("ceph_volume.util.device.lvm.get_device_lvs",
import pytest
+from copy import deepcopy
from ceph_volume.util import device
from ceph_volume.api import lvm as api
class TestDevice(object):
- def test_sys_api(self, device_info):
+ def test_sys_api(self, volumes, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "disk"}
device_info(devices=data,lsblk=lsblk)
assert disk.sys_api
assert "foo" in disk.sys_api
- def test_lvm_size(self, device_info):
+ def test_lvm_size(self, volumes, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
# 5GB in size
data = {"/dev/sda": {"size": "5368709120"}}
lsblk = {"TYPE": "disk"}
disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
- def test_lvm_size_rounds_down(self, device_info):
+ def test_lvm_size_rounds_down(self, device_info, volumes):
# 5.5GB in size
data = {"/dev/sda": {"size": "5905580032"}}
lsblk = {"TYPE": "disk"}
self.sys_api = part
break
- # start with lvm since it can use an absolute or relative path
- lv = lvm.get_lv_from_argument(self.path)
+ # if the path is not absolute, we have 'vg/lv', let's use LV name
+ # to get the LV.
+ if self.path[0] == '/':
+ lv = lvm.get_first_lv(filters={'lv_path': self.path})
+ else:
+ vgname, lvname = self.path.split('/')
+ lv = lvm.get_first_lv(filters={'lv_name': lvname,
+ 'vg_name': vgname})
if lv:
self.lv_api = lv
self.lvs = [lv]