from math import floor
from ceph_volume import process, util
from ceph_volume.exceptions import (
- MultipleLVsError, MultipleVGsError,
- SizeAllocationError
+ MultipleLVsError, SizeAllocationError
)
logger = logging.getLogger(__name__)
return int(int(self.vg_extent_count) / slots)
-class VolumeGroups(list):
- """
- A list of all known volume groups for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self, populate=True):
- if populate:
- self._populate()
-
- def _populate(self):
- # get all the vgs in the current system
- for vg_item in get_api_vgs():
- self.append(VolumeGroup(**vg_item))
-
- def _purge(self):
- """
- Deplete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, vg_name=None, vg_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
-
- .. note:: ``vg_tags`` is not yet implemented
- """
- filtered = [i for i in self]
- if vg_name:
- filtered = [i for i in filtered if i.vg_name == vg_name]
-
- # at this point, `filtered` has either all the volumes in self or is an
- # actual filtered list if any filters were applied
- if vg_tags:
- tag_filtered = []
- for volume in filtered:
- matches = all(volume.tags.get(k) == str(v) for k, v in vg_tags.items())
- if matches:
- tag_filtered.append(volume)
- return tag_filtered
-
- return filtered
-
- def filter(self, vg_name=None, vg_tags=None):
- """
- Filter out groups on top level attributes like ``vg_name`` or by
- ``vg_tags`` where a dict is required. For example, to find a Ceph group
- with dmcache as the type, the filter would look like::
-
- vg_tags={'ceph.type': 'dmcache'}
-
- .. warning:: These tags are not documented because they are currently
- unused, but are here to maintain API consistency
- """
- if not any([vg_name, vg_tags]):
- raise TypeError('.filter() requires vg_name or vg_tags (none given)')
-
- filtered_vgs = VolumeGroups(populate=False)
- filtered_vgs.extend(self._filter(vg_name, vg_tags))
- return filtered_vgs
-
- def get(self, vg_name=None, vg_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple VGs are matched
-
- It is useful to use ``tags`` when trying to find a specific volume group,
- but it can also lead to multiple vgs being found (although unlikely)
- """
- if not any([vg_name, vg_tags]):
- return None
- vgs = self._filter(
- vg_name=vg_name,
- vg_tags=vg_tags
- )
- if not vgs:
- return None
- if len(vgs) > 1:
- # this is probably never going to happen, but it is here to keep
- # the API code consistent
- raise MultipleVGsError(vg_name)
- return vgs[0]
-
-
def create_vg(devices, name=None, name_prefix=None):
"""
Create a Volume Group. Command looks like::
name] + devices
)
- vg = get_vg(vg_name=name)
- return vg
+ return get_first_vg(filters={'vg_name': name})
def extend_vg(vg, devices):
vg.name] + devices
)
- vg = get_vg(vg_name=vg.name)
- return vg
+ return get_first_vg(filters={'vg_name': vg.name})
def reduce_vg(vg, devices):
vg.name] + devices
)
- vg = get_vg(vg_name=vg.name)
- return vg
+ # Re-read the VG after vgreduce; get_first_vg takes the ``filters``
+ # keyword (dict of field -> value), matching create_vg/extend_vg above.
+ return get_first_vg(filters={'vg_name': vg.name})
def remove_vg(vg_name):
)
-def get_vg(vg_name=None, vg_tags=None, vgs=None):
- """
- Return a matching vg for the current system, requires ``vg_name`` or
- ``tags``. Raises an error if more than one vg is found.
-
- It is useful to use ``tags`` when trying to find a specific volume group,
- but it can also lead to multiple vgs being found.
- """
- if not any([vg_name, vg_tags]):
- return None
- if vgs is None or len(vgs) == 0:
- vgs = VolumeGroups()
-
- return vgs.get(vg_name=vg_name, vg_tags=vg_tags)
-
-
def get_device_vgs(device, name_prefix=''):
stdout, stderr, returncode = process.call(
['pvs'] + VG_CMD_OPTIONS + ['-o', VG_FIELDS, device],
return msg
-class MultipleVGsError(Exception):
-
- def __init__(self, vg_name):
- self.vg_name = vg_name
-
- def __str__(self):
- msg = "Got more than 1 result looking for volume group: %s" % self.vg_name
- return msg
-
-
class SizeAllocationError(Exception):
def __init__(self, requested, available):
return volumes
-@pytest.fixture
-def volume_groups(monkeypatch):
- monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
- vgs = api.VolumeGroups()
- vgs._purge()
- return vgs
-
-
-class TestGetVG(object):
-
- def test_nothing_is_passed_in(self):
- # so we return a None
- assert api.get_vg() is None
-
- def test_single_vg_is_matched(self, volume_groups, monkeypatch):
- FooVG = api.VolumeGroup(vg_name='foo')
- volume_groups.append(FooVG)
- monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
- assert api.get_vg(vg_name='foo') == FooVG
-
-
class TestVolume(object):
def test_is_ceph_device(self):
api.VolumeGroup(vg_name='')
-class TestVolumeGroups(object):
-
- def test_volume_get_has_no_volume_groups(self, volume_groups):
- assert volume_groups.get() is None
-
- def test_volume_get_filtered_has_no_volumes(self, volume_groups):
- assert volume_groups.get(vg_name='ceph') is None
-
- def test_volume_has_multiple_matches(self, volume_groups):
- volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
- volume_groups.append(volume1)
- volume_groups.append(volume2)
- with pytest.raises(exceptions.MultipleVGsError):
- volume_groups.get(vg_name='foo')
-
- def test_find_the_correct_one(self, volume_groups):
- volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='')
- volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='')
- volume_groups.append(volume1)
- volume_groups.append(volume2)
- assert volume_groups.get(vg_name='volume1') == volume1
-
- def test_filter_by_tag(self, volume_groups):
- vg_tags = "ceph.group=dmcache"
- osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
- journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
- volume_groups.append(osd)
- volume_groups.append(journal)
- volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
- assert len(volume_groups) == 1
- assert volume_groups[0].vg_name == 'volume1'
-
- def test_filter_by_tag_does_not_match_one(self, volume_groups):
- vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
- osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
- volume_groups.append(osd)
- volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
- assert volume_groups == []
-
- def test_filter_by_vg_name(self, volume_groups):
- vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
- journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
- volume_groups.append(osd)
- volume_groups.append(journal)
- volume_groups = volume_groups.filter(vg_name='ceph_vg')
- assert len(volume_groups) == 1
- assert volume_groups[0].vg_name == 'ceph_vg'
-
- def test_filter_requires_params(self, volume_groups):
- with pytest.raises(TypeError):
- volume_groups = volume_groups.filter()
-
-
class TestVolumeGroupFree(object):
def test_integer_gets_produced(self):
self.foo_volume = api.VolumeGroup(vg_name='foo', lv_tags='')
def test_uses_single_device_in_list(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.extend_vg(self.foo_volume, ['/dev/sda'])
expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
assert fake_run.calls[0]['args'][0] == expected
def test_uses_single_device(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.extend_vg(self.foo_volume, '/dev/sda')
expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
assert fake_run.calls[0]['args'][0] == expected
def test_uses_multiple_devices(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.extend_vg(self.foo_volume, ['/dev/sda', '/dev/sdb'])
expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda', '/dev/sdb']
assert fake_run.calls[0]['args'][0] == expected
self.foo_volume = api.VolumeGroup(vg_name='foo', lv_tags='')
def test_uses_single_device_in_list(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.reduce_vg(self.foo_volume, ['/dev/sda'])
expected = ['vgreduce', '--force', '--yes', 'foo', '/dev/sda']
assert fake_run.calls[0]['args'][0] == expected
def test_uses_single_device(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.reduce_vg(self.foo_volume, '/dev/sda')
expected = ['vgreduce', '--force', '--yes', 'foo', '/dev/sda']
assert fake_run.calls[0]['args'][0] == expected
def test_uses_multiple_devices(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.reduce_vg(self.foo_volume, ['/dev/sda', '/dev/sdb'])
expected = ['vgreduce', '--force', '--yes', 'foo', '/dev/sda', '/dev/sdb']
assert fake_run.calls[0]['args'][0] == expected
self.foo_volume = api.VolumeGroup(vg_name='foo', lv_tags='')
def test_no_name(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.create_vg('/dev/sda')
result = fake_run.calls[0]['args'][0]
assert '/dev/sda' in result
assert result[-2].startswith('ceph-')
def test_devices_list(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.create_vg(['/dev/sda', '/dev/sdb'], name='ceph')
result = fake_run.calls[0]['args'][0]
expected = ['vgcreate', '--force', '--yes', 'ceph', '/dev/sda', '/dev/sdb']
assert result == expected
def test_name_prefix(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.create_vg('/dev/sda', name_prefix='master')
result = fake_run.calls[0]['args'][0]
assert '/dev/sda' in result
assert result[-2].startswith('master-')
def test_specific_name(self, monkeypatch, fake_run):
- monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ monkeypatch.setattr(api, 'get_first_vg', lambda **kw: True)
api.create_vg('/dev/sda', name='master')
result = fake_run.calls[0]['args'][0]
assert '/dev/sda' in result
return volumes
-@pytest.fixture
-def volume_groups(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- vgs = lvm_api.VolumeGroups()
- vgs._purge()
- return vgs
-
-def volume_groups_empty(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- vgs = lvm_api.VolumeGroups(populate=False)
- return vgs
-
-@pytest.fixture
-def stub_vgs(monkeypatch, volume_groups):
- def apply(vgs):
- monkeypatch.setattr(lvm_api, 'get_api_vgs', lambda: vgs)
- return apply
-
-
@pytest.fixture
def is_root(monkeypatch):
"""
block_db_size=None, block_wal_size=None,
osd_ids=[])
monkeypatch.setattr(lvm, 'VolumeGroup', lambda x, **kw: [])
- monkeypatch.setattr(lvm, 'get_vgs', lambda: [])
bluestore.MixedType(args, [], [db_dev], [])
# uses a block.db size that has been configured via ceph.conf, instead of
# defaulting to 'as large as possible'
- def test_hdd_device_is_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_hdd_device_is_large_enough(self, fakedevice, factory, conf_ceph):
# 3GB block.db in ceph.conf
conf_ceph(get_safe=lambda *a: 3147483640)
args = factory(filtered_devices=[], osds_per_device=1,
assert osd['block.db']['path'] == 'vg: vg/lv'
assert osd['block.db']['percentage'] == 100
- def test_ssd_device_is_not_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_ssd_device_is_not_large_enough(self, fakedevice, factory, conf_ceph):
# 7GB block.db in ceph.conf
conf_ceph(get_safe=lambda *a: 7747483640)
args = factory(filtered_devices=[], osds_per_device=1,
expected = 'Not enough space in fast devices (5.66 GB) to create 1 x 7.22 GB block.db LV'
assert expected in str(error.value)
- def test_multi_hdd_device_is_not_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_multi_hdd_device_is_not_large_enough(self, fakedevice, factory, conf_ceph):
# 3GB block.db in ceph.conf
conf_ceph(get_safe=lambda *a: 3147483640)
args = factory(filtered_devices=[], osds_per_device=2,
class TestMixedTypeLargeAsPossible(object):
- def test_hdd_device_is_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_hdd_device_is_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: None)
args = factory(filtered_devices=[], osds_per_device=1,
block_db_size=None, block_wal_size=None,
# as large as possible
assert osd['block.db']['percentage'] == 100
- def test_multi_hdd_device_is_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_multi_hdd_device_is_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: None)
args = factory(filtered_devices=[], osds_per_device=2,
block_db_size=None, block_wal_size=None,
# as large as possible
assert osd['block.db']['percentage'] == 50
- def test_multi_hdd_device_is_not_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_multi_hdd_device_is_not_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: None)
args = factory(filtered_devices=[], osds_per_device=2,
block_db_size=None, block_wal_size=None,
class TestMixedTypeWithExplicitDevices(object):
- def test_multi_hdd_device_is_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_multi_hdd_device_is_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: None)
args = factory(filtered_devices=[], osds_per_device=2,
block_db_size=None, block_wal_size=None,
# as large as possible
assert osd['block.wal']['percentage'] == 50
- def test_wal_device_is_not_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_wal_device_is_not_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: None)
args = factory(filtered_devices=[], osds_per_device=2,
block_db_size=None, block_wal_size=None,
class TestMixedType(object):
- def test_minimum_size_is_not_met(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_minimum_size_is_not_met(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: '120')
args = factory(filtered_devices=[], osds_per_device=1,
journal_size=None, osd_ids=[])
msg = "journal sizes must be larger than 2GB, detected: 120.00 MB"
assert msg in str(error.value)
- def test_ssd_device_is_not_large_enough(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_ssd_device_is_not_large_enough(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: '7120')
args = factory(filtered_devices=[], osds_per_device=1,
journal_size=None, osd_ids=[])
msg = "Not enough space in fast devices (5.66 GB) to create 1 x 6.95 GB journal LV"
assert msg in str(error.value)
- def test_hdd_device_is_lvm_member_fails(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_hdd_device_is_lvm_member_fails(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: '5120')
args = factory(filtered_devices=[], osds_per_device=1,
journal_size=None, osd_ids=[])
filestore.MixedType.with_auto_devices(args, devices)
assert 'Could not find a common VG between devices' in str(error.value)
- def test_ssd_device_fails_multiple_osds(self, stub_vgs, fakedevice, factory, conf_ceph):
+ def test_ssd_device_fails_multiple_osds(self, fakedevice, factory, conf_ceph):
conf_ceph(get_safe=lambda *a: '15120')
args = factory(filtered_devices=[], osds_per_device=2,
journal_size=None, osd_ids=[])