pcall.return_value = ('', '', '')
vgs = api.get_device_lvs('/dev/foo')
assert vgs == []
+
+
+# NOTE: api.convert_filters_to_str() and api.convert_tags_to_str() should get
+# tested automatically while testing api.make_filters_lvmcmd_ready()
+class TestMakeFiltersLVMCMDReady(object):
+
+ def test_with_no_filters_and_no_tags(self):
+ retval = api.make_filters_lvmcmd_ready(None, None)
+
+ assert isinstance(retval, str)
+ assert retval == ''
+
+ def test_with_filters_and_no_tags(self):
+ filters = {'lv_name': 'lv1', 'lv_path': '/dev/sda'}
+
+ retval = api.make_filters_lvmcmd_ready(filters, None)
+
+ assert isinstance(retval, str)
+ for k, v in filters.items():
+ assert k in retval
+ assert v in retval
+
+ def test_with_no_filters_and_with_tags(self):
+ tags = {'ceph.type': 'data', 'ceph.osd_id': '0'}
+
+ retval = api.make_filters_lvmcmd_ready(None, tags)
+
+ assert isinstance(retval, str)
+ assert 'tags' in retval
+ for k, v in tags.items():
+ assert k in retval
+ assert v in retval
+ assert retval.find('tags') < retval.find(k) < retval.find(v)
+
+ def test_with_filters_and_tags(self):
+ filters = {'lv_name': 'lv1', 'lv_path': '/dev/sda'}
+ tags = {'ceph.type': 'data', 'ceph.osd_id': '0'}
+
+ retval = api.make_filters_lvmcmd_ready(filters, tags)
+
+ assert isinstance(retval, str)
+ for f, t in zip(filters.items(), tags.items()):
+ assert f[0] in retval
+ assert f[1] in retval
+ assert t[0] in retval
+ assert t[1] in retval
+ assert retval.find(f[0]) < retval.find(f[1]) < \
+ retval.find('tags') < retval.find(t[0]) < retval.find(t[1])
+
+
+class TestGetPVs(object):
+
+ def test_get_pvs(self, monkeypatch):
+ pv1 = api.PVolume(pv_name='/dev/sda', pv_uuid='0000', pv_tags={},
+ vg_name='vg1')
+ pv2 = api.PVolume(pv_name='/dev/sdb', pv_uuid='0001', pv_tags={},
+ vg_name='vg2')
+ pvs = [pv1, pv2]
+ stdout = ['{};{};{};{};;'.format(pv1.pv_name, pv1.pv_tags, pv1.pv_uuid, pv1.vg_name),
+ '{};{};{};{};;'.format(pv2.pv_name, pv2.pv_tags, pv2.pv_uuid, pv2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ pvs_ = api.get_pvs()
+ assert len(pvs_) == len(pvs)
+ for pv, pv_ in zip(pvs, pvs_):
+ assert pv_.pv_name == pv.pv_name
+
+ def test_get_pvs_single_pv(self, monkeypatch):
+ pv1 = api.PVolume(pv_name='/dev/sda', pv_uuid='0000', pv_tags={},
+ vg_name='vg1')
+ pvs = [pv1]
+ stdout = ['{};;;;;;'.format(pv1.pv_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ pvs_ = api.get_pvs()
+ assert len(pvs_) == 1
+ assert pvs_[0].pv_name == pvs[0].pv_name
+
+ def test_get_pvs_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ assert api.get_pvs() == []
+
+
+class TestGetVGs(object):
+
+ def test_get_vgs(self, monkeypatch):
+ vg1 = api.VolumeGroup(vg_name='vg1')
+ vg2 = api.VolumeGroup(vg_name='vg2')
+ vgs = [vg1, vg2]
+ stdout = ['{};;;;;;'.format(vg1.vg_name),
+ '{};;;;;;'.format(vg2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ vgs_ = api.get_vgs()
+ assert len(vgs_) == len(vgs)
+ for vg, vg_ in zip(vgs, vgs_):
+ assert vg_.vg_name == vg.vg_name
+
+ def test_get_vgs_single_vg(self, monkeypatch):
+ vg1 = api.VolumeGroup(vg_name='vg'); vgs = [vg1]
+ stdout = ['{};;;;;;'.format(vg1.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ vgs_ = api.get_vgs()
+ assert len(vgs_) == 1
+ assert vgs_[0].vg_name == vgs[0].vg_name
+
+ def test_get_vgs_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ assert api.get_vgs() == []
+
+
+class TestGetLVs(object):
+
+ def test_get_lvs(self, monkeypatch):
+ lv1 = api.Volume(lv_tags='ceph.type=data', lv_path='/dev/vg1/lv1',
+ lv_name='lv1', vg_name='vg1')
+ lv2 = api.Volume(lv_tags='ceph.type=data', lv_path='/dev/vg2/lv2',
+ lv_name='lv2', vg_name='vg2')
+ lvs = [lv1, lv2]
+ stdout = ['{};{};{};{}'.format(lv1.lv_tags, lv1.lv_path, lv1.lv_name,
+ lv1.vg_name),
+ '{};{};{};{}'.format(lv2.lv_tags, lv2.lv_path, lv2.lv_name,
+ lv2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ lvs_ = api.get_lvs()
+ assert len(lvs_) == len(lvs)
+ for lv, lv_ in zip(lvs, lvs_):
+ assert lv.__dict__ == lv_.__dict__
+
+ def test_get_lvs_single_lv(self, monkeypatch):
+ stdout = ['ceph.type=data;/dev/vg/lv;lv;vg']
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+ lvs = []
+ lvs.append((api.Volume(lv_tags='ceph.type=data',
+ lv_path='/dev/vg/lv',
+ lv_name='lv', vg_name='vg')))
+
+ lvs_ = api.get_lvs()
+ assert len(lvs_) == len(lvs)
+ assert lvs[0].__dict__ == lvs_[0].__dict__
+
+ def test_get_lvs_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ assert api.get_lvs() == []
+
+
+class TestGetFirstPV(object):
+
+ def test_get_first_pv(self, monkeypatch):
+ pv1 = api.PVolume(pv_name='/dev/sda', pv_uuid='0000', pv_tags={},
+ vg_name='vg1')
+ pv2 = api.PVolume(pv_name='/dev/sdb', pv_uuid='0001', pv_tags={},
+ vg_name='vg2')
+ stdout = ['{};{};{};{};;'.format(pv1.pv_name, pv1.pv_tags, pv1.pv_uuid, pv1.vg_name),
+ '{};{};{};{};;'.format(pv2.pv_name, pv2.pv_tags, pv2.pv_uuid, pv2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ pv_ = api.get_first_pv()
+ assert isinstance(pv_, api.PVolume)
+ assert pv_.pv_name == pv1.pv_name
+
+ def test_get_first_pv_single_pv(self, monkeypatch):
+ pv = api.PVolume(pv_name='/dev/sda', pv_uuid='0000', pv_tags={},
+ vg_name='vg1')
+ stdout = ['{};;;;;;'.format(pv.pv_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ pv_ = api.get_first_pv()
+ assert isinstance(pv_, api.PVolume)
+ assert pv_.pv_name == pv.pv_name
+
+ def test_get_first_pv_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ assert api.get_first_pv() == []
+
+
+class TestGetFirstVG(object):
+
+ def test_get_first_vg(self, monkeypatch):
+ vg1 = api.VolumeGroup(vg_name='vg1')
+ vg2 = api.VolumeGroup(vg_name='vg2')
+ stdout = ['{};;;;;;'.format(vg1.vg_name), '{};;;;;;'.format(vg2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ vg_ = api.get_first_vg()
+ assert isinstance(vg_, api.VolumeGroup)
+ assert vg_.vg_name == vg1.vg_name
+
+ def test_get_first_vg_single_vg(self, monkeypatch):
+ vg = api.VolumeGroup(vg_name='vg')
+ stdout = ['{};;;;;;'.format(vg.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ vg_ = api.get_first_vg()
+ assert isinstance(vg_, api.VolumeGroup)
+ assert vg_.vg_name == vg.vg_name
+
+ def test_get_first_vg_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ vg_ = api.get_first_vg()
+ assert vg_ == []
+
+
+class TestGetFirstLV(object):
+
+ def test_get_first_lv(self, monkeypatch):
+ lv1 = api.Volume(lv_tags='ceph.type=data', lv_path='/dev/vg1/lv1',
+ lv_name='lv1', vg_name='vg1')
+ lv2 = api.Volume(lv_tags='ceph.type=data', lv_path='/dev/vg2/lv2',
+ lv_name='lv2', vg_name='vg2')
+ stdout = ['{};{};{};{}'.format(lv1.lv_tags, lv1.lv_path, lv1.lv_name,
+ lv1.vg_name),
+ '{};{};{};{}'.format(lv2.lv_tags, lv2.lv_path, lv2.lv_name,
+ lv2.vg_name)]
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+
+ lv_ = api.get_first_lv()
+ assert isinstance(lv_, api.Volume)
+ assert lv_.lv_name == lv1.lv_name
+
+ def test_get_first_lv_single_lv(self, monkeypatch):
+ stdout = ['ceph.type=data;/dev/vg/lv;lv;vg']
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: (stdout, '', 0))
+ lv = api.Volume(lv_tags='ceph.type=data',
+ lv_path='/dev/vg/lv',
+ lv_name='lv', vg_name='vg')
+
+ lv_ = api.get_first_lv()
+ assert isinstance(lv_, api.Volume)
+ assert lv_.lv_name == lv.lv_name
+
+ def test_get_first_lv_empty(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x,**kw: ('', '', 0))
+ assert api.get_lvs() == []