monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
assert api.get_pv(pv_uuid='0000') == FooPVolume
- def test_single_pv_is_matched_by_uuid(self, volumes, monkeypatch):
- FooVolume = api.Volume(
- lv_name='foo', lv_path='/dev/vg/foo',
- lv_uuid='1111', lv_tags="ceph.type=data")
- volumes.append(FooVolume)
- monkeypatch.setattr(api, 'Volumes', lambda: volumes)
- assert api.get_lv(lv_uuid='1111') == FooVolume
+ def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(
+ pv_name='/dev/vg/foo',
+ pv_uuid='1111', pv_tags="ceph.type=data")
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ assert api.get_pv(pv_uuid='1111') == FooPVolume
+
+
+class TestPVolumes(object):
+
+ def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
+ pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
+ FooPVolume = api.PVolume(
+ pv_name='/dev/vg/foo',
+ pv_uuid='1111', pv_tags=pv_tags)
+ pvolumes.append(FooPVolume)
+ pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
+ assert pvolumes == []
+
+ def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
+ pv_tags = "ceph.type=journal,ceph.osd_id=1"
+ FooPVolume = api.PVolume(
+ pv_name='/dev/vg/foo',
+ pv_uuid='1111', pv_tags=pv_tags)
+ pvolumes.append(FooPVolume)
+ pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
+ assert pvolumes == [FooPVolume]
+
class TestGetVG(object):