from itertools import repeat
from math import floor
from ceph_volume import process, util
-from ceph_volume.exceptions import (
- MultipleLVsError, SizeAllocationError
-)
+from ceph_volume.exceptions import SizeAllocationError
logger = logging.getLogger(__name__)
process.call(['lvchange', '-an', self.lv_path])
-class Volumes(list):
- """
- A list of all known (logical) volumes for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self):
- self._populate()
-
- def _populate(self):
- # get all the lvs in the current system
- for lv_item in get_api_lvs():
- self.append(Volume(**lv_item))
-
- def _purge(self):
- """
- Delete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
- """
- filtered = [i for i in self]
- if lv_name:
- filtered = [i for i in filtered if i.lv_name == lv_name]
-
- if vg_name:
- filtered = [i for i in filtered if i.vg_name == vg_name]
-
- if lv_uuid:
- filtered = [i for i in filtered if i.lv_uuid == lv_uuid]
-
- if lv_path:
- filtered = [i for i in filtered if i.lv_path == lv_path]
-
- # at this point, `filtered` has either all the volumes in self or is an
- # actual filtered list if any filters were applied
- if lv_tags:
- tag_filtered = []
- for volume in filtered:
- # all the tags we got need to match on the volume
- matches = all(volume.tags.get(k) == str(v) for k, v in lv_tags.items())
- if matches:
- tag_filtered.append(volume)
- return tag_filtered
-
- return filtered
-
- def filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- Filter out volumes on top level attributes like ``lv_name`` or by
- ``lv_tags`` where a dict is required. For example, to find a volume
- that has an OSD ID of 0, the filter would look like::
-
- lv_tags={'ceph.osd_id': '0'}
-
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- raise TypeError('.filter() requires lv_name, vg_name, lv_path, lv_uuid, or tags (none given)')
- # first find the filtered volumes with the values in self
- filtered_volumes = self._filter(
- lv_name=lv_name,
- vg_name=vg_name,
- lv_path=lv_path,
- lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
- # then purge everything
- self._purge()
- # and add the filtered items
- self.extend(filtered_volumes)
-
- def get(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple LVs are matched
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple lvs being found, since a lot of metadata
- is shared between lvs of a distinct OSD.
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- return None
- lvs = self._filter(
- lv_name=lv_name,
- vg_name=vg_name,
- lv_path=lv_path,
- lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
- if not lvs:
- return None
- if len(lvs) > 1:
- raise MultipleLVsError(lv_name, lv_path)
- return lvs[0]
-
-
def create_lv(name_prefix,
uuid,
vg=None,
]
process.run(command)
- lv = get_lv(lv_name=name, vg_name=vg.vg_name)
+ lv = get_first_lv(filters={'lv_name': name, 'vg_name': vg.vg_name})
if tags is None:
tags = {
return True
-def is_lv(dev, lvs=None):
- """
- Boolean to detect if a device is an LV or not.
- """
- splitname = dmsetup_splitname(dev)
- # Allowing to optionally pass `lvs` can help reduce repetitive checks for
- # multiple devices at once.
- if lvs is None or len(lvs) == 0:
- lvs = Volumes()
-
- if splitname.get('LV_NAME'):
- lvs.filter(lv_name=splitname['LV_NAME'], vg_name=splitname['VG_NAME'])
- return len(lvs) > 0
- return False
-
def get_lv_by_name(name):
stdout, stderr, returncode = process.call(
['lvs', '--noheadings', '-o', LV_FIELDS, '-S',
lvs = _output_parser(stdout, LV_FIELDS)
return [Volume(**lv) for lv in lvs]
-def get_lv(lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None, lvs=None):
- """
- Return a matching lv for the current system, requiring ``lv_name``,
- ``vg_name``, ``lv_path`` or ``tags``. Raises an error if more than one lv
- is found.
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple lvs being found, since a lot of metadata
- is shared between lvs of a distinct OSD.
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- return None
- if lvs is None:
- lvs = Volumes()
- return lvs.get(
- lv_name=lv_name, vg_name=vg_name, lv_path=lv_path, lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
-
-
-def get_lv_from_argument(argument):
- """
- Helper proxy function that consumes a possible logical volume passed in from the CLI
- in the form of `vg/lv`, but with some validation so that an argument that is a full
- path to a device can be ignored
- """
- if argument.startswith('/'):
- lv = get_lv(lv_path=argument)
- return lv
- try:
- vg_name, lv_name = argument.split('/')
- except (ValueError, AttributeError):
- return None
- return get_lv(lv_name=lv_name, vg_name=vg_name)
-
-
def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
"""
Create multiple Logical Volumes from a Volume Group by calculating the
def get_osd_device_path(osd_lvs, device_type, dmcrypt_secret=None):
"""
- ``device_type`` can be one of ``db``, ``wal`` or ``block`` so that
- we can query ``lvs`` (a ``Volumes`` object) and fallback to querying the uuid
- if that is not present.
+ ``device_type`` can be one of ``db``, ``wal`` or ``block`` so that we can
+ query LVs on the system and fall back to querying the uuid if that is not
+ present.
- Return a path if possible, failing to do that a ``None``, since some of these devices
- are optional
+ Return a path if possible; failing that, return ``None``, since some of
+ these devices are optional.
"""
osd_block_lv = None
for lv in osd_lvs:
return 'This command needs to be executed with sudo or as root'
-class MultipleLVsError(Exception):
-
- def __init__(self, lv_name, lv_path):
- self.lv_name = lv_name
- self.lv_path = lv_path
-
- def __str__(self):
- msg = "Got more than 1 result looking for %s with path: %s" % (self.lv_name, self.lv_path)
- return msg
-
-
class SizeAllocationError(Exception):
def __init__(self, requested, available):
assert result[1]['lv_name'] == 'ceph_lv'
-@pytest.fixture
-def volumes(monkeypatch):
- monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
- volumes = api.Volumes()
- volumes._purge()
- # also patch api.Volumes so that when it is called, it will use the newly
- # created fixture, with whatever the test method wants to append to it
- monkeypatch.setattr(api, 'Volumes', lambda: volumes)
- return volumes
-
-
class TestVolume(object):
def test_is_ceph_device(self):
api.Volume(lv_name='', lv_tags='')
-class TestVolumes(object):
-
- def test_volume_get_has_no_volumes(self, volumes):
- assert volumes.get() is None
-
- def test_volume_get_filtered_has_no_volumes(self, volumes):
- assert volumes.get(lv_name='ceph') is None
-
- def test_volume_has_multiple_matches(self, volumes):
- volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
- volumes.append(volume1)
- volumes.append(volume2)
- with pytest.raises(exceptions.MultipleLVsError):
- volumes.get(lv_name='foo')
-
- def test_as_dict_infers_type_from_tags(self, volumes):
- lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
- volumes.append(osd)
- result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
- assert result['type'] == 'data'
-
- def test_as_dict_populates_path_from_lv_api(self, volumes):
- lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
- volumes.append(osd)
- result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
- assert result['path'] == '/dev/vg/lv'
-
- def test_find_the_correct_one(self, volumes):
- volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
- volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
- volumes.append(volume1)
- volumes.append(volume2)
- assert volumes.get(lv_name='volume1') == volume1
-
- def test_filter_by_tag(self, volumes):
- lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
- journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
- volumes.append(osd)
- volumes.append(journal)
- volumes.filter(lv_tags={'ceph.type': 'data'})
- assert len(volumes) == 1
- assert volumes[0].lv_name == 'volume1'
-
- def test_filter_by_tag_does_not_match_one(self, volumes):
- lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
- journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
- volumes.append(osd)
- volumes.append(journal)
- # note the different osd_id!
- volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
- assert volumes == []
-
- def test_filter_by_vg_name(self, volumes):
- lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
- osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
- journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
- volumes.append(osd)
- volumes.append(journal)
- volumes.filter(vg_name='ceph_vg')
- assert len(volumes) == 1
- assert volumes[0].lv_name == 'volume1'
-
- def test_filter_by_lv_path(self, volumes):
- osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
- journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
- volumes.append(osd)
- volumes.append(journal)
- volumes.filter(lv_path='/dev/volume1')
- assert len(volumes) == 1
- assert volumes[0].lv_name == 'volume1'
-
- def test_filter_by_lv_uuid(self, volumes):
- osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
- journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
- volumes.append(osd)
- volumes.append(journal)
- volumes.filter(lv_uuid='1111')
- assert len(volumes) == 1
- assert volumes[0].lv_name == 'volume1'
-
- def test_filter_by_lv_uuid_nothing_found(self, volumes):
- osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
- journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
- volumes.append(osd)
- volumes.append(journal)
- volumes.filter(lv_uuid='22222')
- assert volumes == []
-
- def test_filter_requires_params(self, volumes):
- with pytest.raises(TypeError):
- volumes.filter()
-
-
class TestVolumeGroup(object):
def test_volume_group_no_empty_name(self):
self.vg.sizing(size=2048)
-class TestGetLVFromArgument(object):
-
- def setup(self):
- self.foo_volume = api.Volume(
- lv_name='foo', lv_path='/path/to/lv',
- vg_name='foo_group', lv_tags=''
- )
-
- def test_non_absolute_path_is_not_valid(self, volumes):
- volumes.append(self.foo_volume)
- assert api.get_lv_from_argument('foo') is None
-
- def test_too_many_slashes_is_invalid(self, volumes):
- volumes.append(self.foo_volume)
- assert api.get_lv_from_argument('path/to/lv') is None
-
- def test_absolute_path_is_not_lv(self, volumes):
- volumes.append(self.foo_volume)
- assert api.get_lv_from_argument('/path') is None
-
- def test_absolute_path_is_lv(self, volumes):
- volumes.append(self.foo_volume)
- assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
-
-
class TestRemoveLV(object):
def test_removes_lv(self, monkeypatch):
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_uses_size(self, m_get_lv, m_call, m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_uses_size(self, m_get_first_lv, m_call, m_run, monkeypatch):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group, size=5368709120, tags={'ceph.type': 'data'})
expected = ['lvcreate', '--yes', '-l', '1280', '-n', 'foo-0', 'foo_group']
m_run.assert_called_with(expected)
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_uses_extents(self, m_get_lv, m_call, m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_uses_extents(self, m_get_first_lv, m_call, m_run, monkeypatch):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group, extents='50', tags={'ceph.type': 'data'})
expected = ['lvcreate', '--yes', '-l', '50', '-n', 'foo-0', 'foo_group']
m_run.assert_called_with(expected)
(3, 33),])
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_uses_slots(self, m_get_lv, m_call, m_run, monkeypatch, test_input, expected):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_uses_slots(self, m_get_first_lv, m_call, m_run, monkeypatch, test_input, expected):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group, slots=test_input, tags={'ceph.type': 'data'})
expected = ['lvcreate', '--yes', '-l', str(expected), '-n', 'foo-0', 'foo_group']
m_run.assert_called_with(expected)
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_uses_all(self, m_get_lv, m_call, m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_uses_all(self, m_get_first_lv, m_call, m_run, monkeypatch):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group, tags={'ceph.type': 'data'})
expected = ['lvcreate', '--yes', '-l', '100%FREE', '-n', 'foo-0', 'foo_group']
m_run.assert_called_with(expected)
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
@patch('ceph_volume.api.lvm.Volume.set_tags')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_calls_to_set_tags_default(self, m_get_lv, m_set_tags, m_call, m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_calls_to_set_tags_default(self, m_get_first_lv, m_set_tags, m_call, m_run, monkeypatch):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group)
tags = {
"ceph.osd_id": "null",
@patch('ceph_volume.api.lvm.process.run')
@patch('ceph_volume.api.lvm.process.call')
@patch('ceph_volume.api.lvm.Volume.set_tags')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_calls_to_set_tags_arg(self, m_get_lv, m_set_tags, m_call, m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_calls_to_set_tags_arg(self, m_get_first_lv, m_set_tags, m_call, m_run, monkeypatch):
+ m_get_first_lv.return_value = self.foo_volume
api.create_lv('foo', 0, vg=self.foo_group, tags={'ceph.type': 'data'})
tags = {
"ceph.type": "data",
@patch('ceph_volume.api.lvm.process.call')
@patch('ceph_volume.api.lvm.get_device_vgs')
@patch('ceph_volume.api.lvm.create_vg')
- @patch('ceph_volume.api.lvm.get_lv')
- def test_create_vg(self, m_get_lv, m_create_vg, m_get_device_vgs, m_call,
+ @patch('ceph_volume.api.lvm.get_first_lv')
+ def test_create_vg(self, m_get_first_lv, m_create_vg, m_get_device_vgs, m_call,
m_run, monkeypatch):
- m_get_lv.return_value = self.foo_volume
+ m_get_first_lv.return_value = self.foo_volume
m_get_device_vgs.return_value = []
api.create_lv('foo', 0, device='dev/foo', size='5G', tags={'ceph.type': 'data'})
m_create_vg.assert_called_with('dev/foo', name_prefix='ceph')
assert '/dev/mapper' not in result['VG_NAME']
-class TestIsLV(object):
-
- def test_is_not_an_lv(self, monkeypatch):
- monkeypatch.setattr(api.process, 'call', lambda x, **kw: ('', '', 0))
- monkeypatch.setattr(api, 'dmsetup_splitname', lambda x, **kw: {})
- assert api.is_lv('/dev/sda1', lvs=[]) is False
-
- def test_lvs_not_found(self, monkeypatch, volumes):
- CephVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
- volumes.append(CephVolume)
- splitname = {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
- monkeypatch.setattr(api, 'dmsetup_splitname', lambda x, **kw: splitname)
- assert api.is_lv('/dev/sda1', lvs=volumes) is False
-
- def test_is_lv(self, monkeypatch, volumes):
- CephVolume = api.Volume(
- vg_name='ceph', lv_name='data',
- lv_path='/dev/vg/foo', lv_tags="ceph.type=data"
- )
- volumes.append(CephVolume)
- splitname = {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
- monkeypatch.setattr(api, 'dmsetup_splitname', lambda x, **kw: splitname)
- assert api.is_lv('/dev/sda1', lvs=volumes) is True
-
class TestGetDeviceVgs(object):
@patch('ceph_volume.process.call')
from mock.mock import patch, PropertyMock
from ceph_volume.util import disk
from ceph_volume.util.constants import ceph_disk_guids
-from ceph_volume.api import lvm as lvm_api
from ceph_volume import conf, configuration
return apply
-@pytest.fixture
-def volumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- volumes = lvm_api.Volumes()
- volumes._purge()
- return volumes
-
-
@pytest.fixture
def is_root(monkeypatch):
"""
monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: {})
-@pytest.fixture
-def disable_lvm_queries(monkeypatch):
- '''
- This speeds up calls to Device and Disk
- '''
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: None)
-
-
@pytest.fixture(params=[
'', 'ceph data', 'ceph journal', 'ceph block',
'ceph block.wal', 'ceph block.db', 'ceph lockbox'])
if not devices:
monkeypatch.setattr("ceph_volume.util.device.lvm.get_first_lv", lambda filters: lv)
else:
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
monkeypatch.setattr("ceph_volume.util.device.lvm.get_device_lvs",
lambda path: [lv])
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: lv)
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", lambda path: lsblk)
monkeypatch.setattr("ceph_volume.util.device.disk.blkid", lambda path: blkid)
monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: udevadm)
@patch('ceph_volume.devices.lvm.strategies.strategies.MixedStrategy.get_common_vg')
def test_ssd_is_lvm_member_doesnt_fail(self,
patched_get_common_vg,
- volumes,
fakedevice,
factory,
conf_ceph):
assert result['journal']['human_readable_size'] == '5.00 GB'
@patch('ceph_volume.api.lvm.get_device_vgs')
- def test_no_common_vg(self, patched_get_device_vgs, volumes, fakedevice, factory, conf_ceph):
+ def test_no_common_vg(self, patched_get_device_vgs, fakedevice, factory, conf_ceph):
patched_get_device_vgs.side_effect = lambda x: [lvm.VolumeGroup(vg_name='{}'.format(x[-1]), vg_tags='')]
ssd1 = fakedevice(
used_by_ceph=False, is_lvm_member=True, rotational=False, sys_api=dict(size=6073740000)
# test the negative side effect with an actual functional run, so we must
# setup a perfect scenario for this test to check it can really work
# with/without osd_id
- def test_no_osd_id_matches_fsid(self, is_root, volumes, monkeypatch, capture):
+ def test_no_osd_id_matches_fsid(self, is_root, monkeypatch, capture):
FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
lv_tags="ceph.osd_fsid=1234")
+ volumes = []
volumes.append(FooVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: volumes)
monkeypatch.setattr(activate, 'activate_filestore', capture)
activate.Activate([]).activate(args)
assert capture.calls[0]['args'][0] == [FooVolume]
- def test_no_osd_id_matches_fsid_bluestore(self, is_root, volumes, monkeypatch, capture):
+ def test_no_osd_id_matches_fsid_bluestore(self, is_root, monkeypatch, capture):
FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
lv_tags="ceph.osd_fsid=1234")
+ volumes = []
volumes.append(FooVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: volumes)
monkeypatch.setattr(activate, 'activate_bluestore', capture)
activate.Activate([]).activate(args)
assert capture.calls[0]['args'][0] == [FooVolume]
- def test_no_osd_id_no_matching_fsid(self, is_root, volumes, monkeypatch, capture):
+ def test_no_osd_id_no_matching_fsid(self, is_root, monkeypatch, capture):
FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
lv_tags="ceph.osd_fsid=1111")
+ volumes = []
volumes.append(FooVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: [])
monkeypatch.setattr(api, 'get_first_lv', lambda **kwargs: [])
with pytest.raises(RuntimeError):
activate.Activate([]).activate(args)
- def test_filestore_no_systemd(self, is_root, volumes, monkeypatch, capture):
+ def test_filestore_no_systemd(self, is_root, monkeypatch, capture):
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
fake_enable = Capture()
fake_start_osd = Capture()
lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
"journal,ceph.journal_uuid=000,ceph.type=data," + \
"ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
volumes.append(JournalVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls == []
assert fake_start_osd.calls == []
- def test_filestore_no_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ def test_filestore_no_systemd_autodetect(self, is_root, monkeypatch, capture):
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
fake_enable = Capture()
fake_start_osd = Capture()
lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
"journal,ceph.journal_uuid=000,ceph.type=data," + \
"ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
volumes.append(JournalVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls == []
assert fake_start_osd.calls == []
- def test_filestore_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ def test_filestore_systemd_autodetect(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
"journal,ceph.journal_uuid=000,ceph.type=data," + \
"ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
volumes.append(JournalVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls != []
assert fake_start_osd.calls != []
- def test_filestore_systemd(self, is_root, volumes, monkeypatch, capture):
+ def test_filestore_systemd(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
"journal,ceph.journal_uuid=000,ceph.type=data," + \
"ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
volumes.append(JournalVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls != []
assert fake_start_osd.calls != []
- def test_bluestore_no_systemd(self, is_root, volumes, monkeypatch, capture):
+ def test_bluestore_no_systemd(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.util.system.path_is_mounted', lambda *a, **kw: True)
lv_path='/dev/vg/data',
lv_tags="ceph.cluster_name=ceph,,ceph.journal_uuid=000," + \
"ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls == []
assert fake_start_osd.calls == []
- def test_bluestore_systemd(self, is_root, volumes, monkeypatch, capture):
+ def test_bluestore_systemd(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.util.system.path_is_mounted', lambda *a, **kw: True)
lv_path='/dev/vg/data',
lv_tags="ceph.cluster_name=ceph,,ceph.journal_uuid=000," + \
"ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls != []
assert fake_start_osd.calls != []
- def test_bluestore_no_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ def test_bluestore_no_systemd_autodetect(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.util.system.path_is_mounted', lambda *a, **kw: True)
lv_path='/dev/vg/data',
lv_tags="ceph.cluster_name=ceph,,ceph.block_uuid=000," + \
"ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
assert fake_enable.calls == []
assert fake_start_osd.calls == []
- def test_bluestore_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ def test_bluestore_systemd_autodetect(self, is_root, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
monkeypatch.setattr('ceph_volume.util.system.path_is_mounted',
lv_path='/dev/vg/data',
lv_tags="ceph.cluster_name=ceph,,ceph.journal_uuid=000," + \
"ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes = []
volumes.append(DataVolume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
class TestList(object):
- def test_empty_full_json_zero_exit_status(self, is_root, volumes,
- factory, capsys):
+ def test_empty_full_json_zero_exit_status(self, is_root,factory,capsys):
args = factory(format='json', device=None)
lvm.listing.List([]).list(args)
stdout, stderr = capsys.readouterr()
assert stdout == '{}\n'
- def test_empty_device_json_zero_exit_status(self, is_root, volumes,
- factory, capsys):
+ def test_empty_device_json_zero_exit_status(self, is_root,factory,capsys):
args = factory(format='json', device='/dev/sda1')
lvm.listing.List([]).list(args)
stdout, stderr = capsys.readouterr()
assert stdout == '{}\n'
- def test_empty_full_zero_exit_status(self, is_root, volumes, factory):
+ def test_empty_full_zero_exit_status(self, is_root, factory):
args = factory(format='pretty', device=None)
with pytest.raises(SystemExit):
lvm.listing.List([]).list(args)
- def test_empty_device_zero_exit_status(self, is_root, volumes, factory):
+ def test_empty_device_zero_exit_status(self, is_root, factory):
args = factory(format='pretty', device='/dev/sda1')
with pytest.raises(SystemExit):
lvm.listing.List([]).list(args)
class TestFullReport(object):
- def test_no_ceph_lvs(self, volumes, monkeypatch):
+ def test_no_ceph_lvs(self, monkeypatch):
# ceph lvs are detected by looking into its tags
osd = api.Volume(lv_name='volume1', lv_path='/dev/VolGroup/lv',
lv_tags={})
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
result = lvm.listing.List([]).full_report()
assert result == {}
- def test_ceph_data_lv_reported(self, volumes, monkeypatch):
+ def test_ceph_data_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000",
vg_name='VolGroup', lv_uuid="aaaa")
osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags,
lv_path='/dev/VolGroup/lv', vg_name='VolGroup')
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(lvm.listing.api, 'get_first_pv', lambda **kwargs: pv)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
result = lvm.listing.List([]).full_report()
assert result['0'][0]['name'] == 'volume1'
- def test_ceph_journal_lv_reported(self, volumes, monkeypatch):
+ def test_ceph_journal_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
journal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000",
journal = api.Volume(
lv_name='journal', lv_uuid='x', lv_tags=journal_tags,
lv_path='/dev/VolGroup/journal', vg_name='VolGroup')
+ volumes = []
volumes.append(osd)
volumes.append(journal)
monkeypatch.setattr(lvm.listing.api,'get_first_pv',lambda **kwargs:pv)
assert result['0'][0]['name'] == 'volume1'
assert result['0'][1]['name'] == 'journal'
- def test_ceph_wal_lv_reported(self, volumes, monkeypatch):
+ def test_ceph_wal_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
wal_tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags,
lv_path='/dev/VolGroup/lv', vg_name='VolGroup')
wal = api.Volume(lv_name='wal', lv_uuid='x', lv_tags=wal_tags,
lv_path='/dev/VolGroup/wal', vg_name='VolGroup')
+ volumes = []
volumes.append(osd)
volumes.append(wal)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
class TestSingleReport(object):
- def test_not_a_ceph_lv(self, volumes, monkeypatch):
+ def test_not_a_ceph_lv(self, monkeypatch):
# ceph lvs are detected by looking into its tags
lv = api.Volume(lv_name='lv', lv_tags={}, lv_path='/dev/VolGroup/lv',
vg_name='VolGroup')
result = lvm.listing.List([]).single_report('VolGroup/lv')
assert result == {}
- def test_report_a_ceph_lv(self, volumes, monkeypatch):
+ def test_report_a_ceph_lv(self, monkeypatch):
# ceph lvs are detected by looking into its tags
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(lv)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
assert result['0'][0]['type'] == 'journal'
assert result['0'][0]['path'] == '/dev/sda1'
- def test_report_a_ceph_lv_with_devices(self, volumes, monkeypatch):
+ def test_report_a_ceph_lv_with_devices(self, monkeypatch):
pvolumes = []
tags = 'ceph.osd_id=0,ceph.type=data'
pvolumes.append(pv1)
pvolumes.append(pv2)
+
+ volumes = []
lv = api.Volume(lv_name='lv', vg_name='VolGroup',lv_uuid='aaaa',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
volumes.append(lv)
assert result['0'][0]['path'] == '/dev/VolGroup/lv'
assert result['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1']
- def test_report_a_ceph_lv_with_no_matching_devices(self, volumes,
- monkeypatch):
+ def test_report_a_ceph_lv_with_no_matching_devices(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.type=data'
lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(lv)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
import pytest
from copy import deepcopy
from mock.mock import patch, call
+from ceph_volume import process
from ceph_volume.api import lvm as api
from ceph_volume.devices.lvm import zap
class TestFindAssociatedDevices(object):
- def test_no_lvs_found_that_match_id(self, volumes, monkeypatch, device_info):
+ def test_no_lvs_found_that_match_id(self, monkeypatch, device_info):
tags = 'ceph.osd_id=9,ceph.journal_uuid=x,ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
lv_tags=tags, lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
with pytest.raises(RuntimeError):
zap.find_associated_devices(osd_id=10)
- def test_no_lvs_found_that_match_fsid(self, volumes, monkeypatch, device_info):
+ def test_no_lvs_found_that_match_fsid(self, monkeypatch, device_info):
tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags,
vg_name='vg', lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
with pytest.raises(RuntimeError):
zap.find_associated_devices(osd_fsid='aaaa-lkjh')
- def test_no_lvs_found_that_match_id_fsid(self, volumes, monkeypatch, device_info):
+ def test_no_lvs_found_that_match_id_fsid(self, monkeypatch, device_info):
tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
lv_tags=tags, lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
with pytest.raises(RuntimeError):
zap.find_associated_devices(osd_id='9', osd_fsid='aaaa-lkjh')
- def test_no_ceph_lvs_found(self, volumes, monkeypatch):
+ def test_no_ceph_lvs_found(self, monkeypatch):
osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags='',
lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(osd)
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
with pytest.raises(RuntimeError):
zap.find_associated_devices(osd_id=100)
- def test_lv_is_matched_id(self, volumes, monkeypatch):
+ def test_lv_is_matched_id(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(osd)
- monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs:
- deepcopy(volumes))
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_id='0')
assert result[0].abspath == '/dev/VolGroup/lv'
- def test_lv_is_matched_fsid(self, volumes, monkeypatch):
+ def test_lv_is_matched_fsid(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(osd)
- monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs:
- deepcopy(volumes))
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: deepcopy(volumes))
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_fsid='asdf-lkjh')
assert result[0].abspath == '/dev/VolGroup/lv'
- def test_lv_is_matched_id_fsid(self, volumes, monkeypatch):
+ def test_lv_is_matched_id_fsid(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(osd)
- monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs:
- deepcopy(volumes))
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_id='0', osd_fsid='asdf-lkjh')
assert result[0].abspath == '/dev/VolGroup/lv'
class TestEnsureAssociatedLVs(object):
- def test_nothing_is_found(self, volumes):
+ def test_nothing_is_found(self):
+ volumes = []
result = zap.ensure_associated_lvs(volumes)
assert result == []
- def test_data_is_found(self, volumes):
+ def test_data_is_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
osd = api.Volume(
lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/data', lv_tags=tags)
+ volumes = []
volumes.append(osd)
result = zap.ensure_associated_lvs(volumes)
assert result == ['/dev/VolGroup/data']
- def test_block_is_found(self, volumes):
+ def test_block_is_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
osd = api.Volume(
lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
+ volumes = []
volumes.append(osd)
result = zap.ensure_associated_lvs(volumes)
assert result == ['/dev/VolGroup/block']
out, err = capsys.readouterr()
assert "Zapping successful for OSD: 1" in err
- def test_block_and_partition_are_found(self, volumes, monkeypatch):
+ def test_block_and_partition_are_found(self, monkeypatch):
monkeypatch.setattr(zap.disk, 'get_device_from_partuuid', lambda x: '/dev/sdb1')
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
osd = api.Volume(
lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
+ volumes = []
volumes.append(osd)
result = zap.ensure_associated_lvs(volumes)
assert '/dev/sdb1' in result
assert '/dev/VolGroup/block' in result
- def test_journal_is_found(self, volumes):
+ def test_journal_is_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
osd = api.Volume(
lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
volumes.append(osd)
result = zap.ensure_associated_lvs(volumes)
assert result == ['/dev/VolGroup/lv']
- def test_multiple_journals_are_found(self, volumes):
+ def test_multiple_journals_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
+ volumes = []
for i in range(3):
osd = api.Volume(
lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
- def test_multiple_dbs_are_found(self, volumes):
+ def test_multiple_dbs_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=db'
+ volumes = []
for i in range(3):
osd = api.Volume(
lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
- def test_multiple_wals_are_found(self, volumes):
+ def test_multiple_wals_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=wal'
+ volumes = []
for i in range(3):
osd = api.Volume(
lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
- def test_multiple_backing_devs_are_found(self, volumes):
+ def test_multiple_backing_devs_are_found(self):
+ volumes = []
for _type in ['journal', 'db', 'wal']:
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=%s' % _type
osd = api.Volume(
class TestDevice(object):
- def test_sys_api(self, volumes, monkeypatch, device_info):
+ def test_sys_api(self, monkeypatch, device_info):
volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(volume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
deepcopy(volumes))
assert disk.sys_api
assert "foo" in disk.sys_api
- def test_lvm_size(self, volumes, monkeypatch, device_info):
+ def test_lvm_size(self, monkeypatch, device_info):
volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
volumes.append(volume)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
deepcopy(volumes))
disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
- def test_lvm_size_rounds_down(self, device_info, volumes):
+ def test_lvm_size_rounds_down(self, device_info):
# 5.5GB in size
data = {"/dev/sda": {"size": "5905580032"}}
lsblk = {"TYPE": "disk"}
assert not disk.is_mapper
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert "Used by ceph-disk" in disk.rejected_reasons
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert "Has BlueStore device label" in disk.rejected_reasons
@pytest.mark.usefixtures("device_info_not_ceph_disk_member",
- "disable_lvm_queries",
"disable_kernel_queries")
def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert not disk.available_raw
@pytest.mark.parametrize("ceph_type", ["data", "block"])
- def test_used_by_ceph(self, device_info, volumes,
+ def test_used_by_ceph(self, device_info,
monkeypatch, ceph_type):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
"lv_uuid": "0000", "lv_tags":
"ceph.osd_id=0,ceph.type="+ceph_type}
+ volumes = []
lv = api.Volume(**lv_data)
volumes.append(lv)
monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
assert disk.partlabel == 'ceph data'
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_member_lsblk(self, patch_bluestore_label, device_info):
lsblk = {"TYPE": "disk", "PARTLABEL": "ceph"}
device_info(lsblk=lsblk)
ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_type_lsblk(self, device_info, ceph_partlabel):
disk = device.CephDiskDevice(device.Device("/dev/sda"))