return volumes
+@pytest.fixture
+def pvolumes(monkeypatch):
+ monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
+ pvolumes = api.PVolumes()
+ pvolumes._purge()
+ return pvolumes
+
+
@pytest.fixture
def volume_groups(monkeypatch):
monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
monkeypatch.setattr(api, 'Volumes', lambda: volumes)
assert api.get_lv(lv_name='foo') == FooVolume
+ def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
+ FooVolume = api.Volume(
+ lv_name='foo', lv_path='/dev/vg/foo',
+ lv_uuid='1111', lv_tags="ceph.type=data")
+ volumes.append(FooVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ assert api.get_lv(lv_uuid='1111') == FooVolume
+
+
+class TestGetPV(object):
+
+ def test_nothing_is_passed_in(self):
+ # so we return a None
+ assert api.get_pv() is None
+
+ def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ assert api.get_pv(pv_uuid='foo') is None
+
+ def test_single_pv_is_matched(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ assert api.get_pv(pv_uuid='0000') == FooPVolume
+
+ def test_single_pv_is_matched_by_uuid(self, volumes, monkeypatch):
+ FooVolume = api.Volume(
+ lv_name='foo', lv_path='/dev/vg/foo',
+ lv_uuid='1111', lv_tags="ceph.type=data")
+ volumes.append(FooVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ assert api.get_lv(lv_uuid='1111') == FooVolume
class TestGetVG(object):
assert len(volumes) == 1
assert volumes[0].lv_name == 'volume1'
+ def test_filter_by_lv_uuid(self, volumes):
+ osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
+ journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
+ volumes.append(osd)
+ volumes.append(journal)
+ volumes.filter(lv_uuid='1111')
+ assert len(volumes) == 1
+ assert volumes[0].lv_name == 'volume1'
+
+ def test_filter_by_lv_uuid_nothing_found(self, volumes):
+ osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
+ journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
+ volumes.append(osd)
+ volumes.append(journal)
+ volumes.filter(lv_uuid='22222')
+ assert volumes == []
+
def test_filter_requires_params(self, volumes):
with pytest.raises(TypeError):
volumes.filter()