journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
volume_groups.append(osd)
volume_groups.append(journal)
- volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
+ volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
assert len(volume_groups) == 1
assert volume_groups[0].vg_name == 'volume1'
vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
volume_groups.append(osd)
- volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
+ volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
assert volume_groups == []
def test_filter_by_vg_name(self, volume_groups):
journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
volume_groups.append(osd)
volume_groups.append(journal)
- volume_groups.filter(vg_name='ceph_vg')
+ volume_groups = volume_groups.filter(vg_name='ceph_vg')
assert len(volume_groups) == 1
assert volume_groups[0].vg_name == 'ceph_vg'
def test_filter_requires_params(self, volume_groups):
with pytest.raises(TypeError):
- volume_groups.filter()
+ volume_groups = volume_groups.filter()
class TestVolumeGroupFree(object):
vgs._purge()
return vgs
+def volume_groups_empty(monkeypatch):
+ monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
+ vgs = lvm_api.VolumeGroups(populate=False)
+ return vgs
@pytest.fixture
def stub_vgs(monkeypatch, volume_groups):