from ceph_volume import process, util
from ceph_volume.exceptions import (
MultipleLVsError, MultipleVGsError,
- MultiplePVsError, SizeAllocationError
+ SizeAllocationError
)
logger = logging.getLogger(__name__)
self.set_tag(k, v)
# after setting all the tags, refresh them for the current object, use the
# pv_* identifiers to filter because those shouldn't change
- pv_object = get_pv(pv_name=self.pv_name, pv_uuid=self.pv_uuid)
+ pv_object = self.get_first_pv(filter={'pv_name': self.pv_name,
+ 'pv_uuid': self.pv_uuid})
self.tags = pv_object.tags
def set_tag(self, key, value):
)
-class PVolumes(list):
- """
- A list of all known (physical) volumes for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self, populate=True):
- if populate:
- self._populate()
-
- def _populate(self):
- # get all the pvs in the current system
- for pv_item in get_api_pvs():
- self.append(PVolume(**pv_item))
-
- def _purge(self):
- """
- Deplete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
- """
- filtered = [i for i in self]
- if pv_name:
- filtered = [i for i in filtered if i.pv_name == pv_name]
-
- if pv_uuid:
- filtered = [i for i in filtered if i.pv_uuid == pv_uuid]
-
- # at this point, `filtered` has either all the physical volumes in self
- # or is an actual filtered list if any filters were applied
- if pv_tags:
- tag_filtered = []
- for pvolume in filtered:
- matches = all(pvolume.tags.get(k) == str(v) for k, v in pv_tags.items())
- if matches:
- tag_filtered.append(pvolume)
- # return the tag_filtered pvolumes here, the `filtered` list is no
- # longer usable
- return tag_filtered
-
- return filtered
-
- def filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- Filter out volumes on top level attributes like ``pv_name`` or by
- ``pv_tags`` where a dict is required. For example, to find a physical
- volume that has an OSD ID of 0, the filter would look like::
-
- pv_tags={'ceph.osd_id': '0'}
-
- """
- if not any([pv_name, pv_uuid, pv_tags]):
- raise TypeError('.filter() requires pv_name, pv_uuid, or pv_tags'
- '(none given)')
-
- filtered_pvs = PVolumes(populate=False)
- filtered_pvs.extend(self._filter(pv_name, pv_uuid, pv_tags))
- return filtered_pvs
-
- def get(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple pvs are matched
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple pvs being found, since a lot of metadata
- is shared between pvs of a distinct OSD.
- """
- if not any([pv_name, pv_uuid, pv_tags]):
- return None
- pvs = self._filter(
- pv_name=pv_name,
- pv_uuid=pv_uuid,
- pv_tags=pv_tags
- )
- if not pvs:
- return None
- if len(pvs) > 1 and pv_tags:
- raise MultiplePVsError(pv_name)
- return pvs[0]
-
-
def create_pv(device):
"""
Create a physical volume from a device, useful when devices need to be later mapped
)
-def get_pv(pv_name=None, pv_uuid=None, pv_tags=None, pvs=None):
- """
- Return a matching pv (physical volume) for the current system, requiring
- ``pv_name``, ``pv_uuid``, or ``pv_tags``. Raises an error if more than one
- pv is found.
- """
- if not any([pv_name, pv_uuid, pv_tags]):
- return None
- if pvs is None or len(pvs) == 0:
- pvs = PVolumes()
-
- return pvs.get(pv_name=pv_name, pv_uuid=pv_uuid, pv_tags=pv_tags)
-
-
################################
#
# Code for LVM Volume Groups
return 'This command needs to be executed with sudo or as root'
-class MultiplePVsError(Exception):
-
- def __init__(self, pv_name):
- self.pv_name = pv_name
-
- def __str__(self):
- msg = "Got more than 1 result looking for physical volume: %s" % self.pv_name
- return msg
-
-
class MultipleLVsError(Exception):
def __init__(self, lv_name, lv_path):
return vgs
-class TestGetLV(object):
-
- def test_nothing_is_passed_in(self):
- # so we return a None
- assert api.get_lv() is None
-
- def test_single_lv_is_matched(self, volumes, monkeypatch):
- FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
- volumes.append(FooVolume)
- monkeypatch.setattr(api, 'Volumes', lambda: volumes)
- assert api.get_lv(lv_name='foo') == FooVolume
-
- def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
- FooVolume = api.Volume(
- lv_name='foo', lv_path='/dev/vg/foo',
- lv_uuid='1111', lv_tags="ceph.type=data")
- volumes.append(FooVolume)
- monkeypatch.setattr(api, 'Volumes', lambda: volumes)
- assert api.get_lv(lv_uuid='1111') == FooVolume
-
-
-class TestGetPV(object):
-
- def test_nothing_is_passed_in(self):
- # so we return a None
- assert api.get_pv() is None
-
- def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- assert api.get_pv(pv_uuid='foo') is None
-
- def test_single_pv_is_matched(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- assert api.get_pv(pv_uuid='0000') == FooPVolume
-
- def test_multiple_pvs_is_matched_by_uuid(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, lv_uuid="0000000")
- BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
- pvolumes.append(FooPVolume)
- pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- assert api.get_pv(pv_uuid='0000') == FooPVolume
-
- def test_multiple_pvs_is_matched_by_name(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, lv_uuid="0000000")
- BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
- pvolumes.append(FooPVolume)
- pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- assert api.get_pv(pv_name='/dev/sda') == FooPVolume
-
- def test_multiple_pvs_is_matched_by_tags(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(vg_name="vg1", pv_name='/dev/sdc', pv_uuid="1000", pv_tags="ceph.foo=bar", lv_uuid="0000000")
- BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags="ceph.foo=bar")
- pvolumes.append(FooPVolume)
- pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- with pytest.raises(exceptions.MultiplePVsError):
- api.get_pv(pv_tags={"ceph.foo": "bar"})
-
- def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(
- pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- assert api.get_pv(pv_uuid='1111') == FooPVolume
-
- def test_vg_name_is_set(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(
- pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
- pv = api.get_pv(pv_name="/dev/vg/foo")
- assert pv.vg_name == "vg"
-
-
-class TestPVolumes(object):
-
- def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
- pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
- FooPVolume = api.PVolume(
- pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags=pv_tags, vg_name='vg')
- pvolumes.append(FooPVolume)
- assert pvolumes.filter(pv_tags={'ceph.type': 'journal',
- 'ceph.osd_id': '2'}) == []
-
- def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
- pv_tags = "ceph.type=journal,ceph.osd_id=1"
- FooPVolume = api.PVolume(
- pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags=pv_tags, vg_name="vg")
- pvolumes.append(FooPVolume)
- assert pvolumes.filter(pv_tags={'ceph.type': 'journal',
- 'ceph.osd_id': '1'}) == [FooPVolume]
-
-
class TestGetVG(object):
def test_nothing_is_passed_in(self):
return apply
-# TODO: allow init-ing pvolumes to list we want
-@pytest.fixture
-def pvolumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- pvolumes = lvm_api.PVolumes()
- pvolumes._purge()
- return pvolumes
-
-@pytest.fixture
-def pvolumes_empty(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- pvolumes = lvm_api.PVolumes(populate=False)
- return pvolumes
-
-
-
@pytest.fixture
def is_root(monkeypatch):
"""
result = lvm.listing.List([]).full_report()
assert result == {}
- def test_ceph_data_lv_reported(self, pvolumes, volumes, monkeypatch):
+ def test_ceph_data_lv_reported(self, volumes, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000",
vg_name='VolGroup', lv_uuid="aaaa")
osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags,
lv_path='/dev/VolGroup/lv', vg_name='VolGroup')
- pvolumes.append(pv)
volumes.append(osd)
- monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs:
- pvolumes)
+ monkeypatch.setattr(lvm.listing.api, 'get_first_pv', lambda **kwargs: pv)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
result = lvm.listing.List([]).full_report()
assert result['0'][0]['name'] == 'volume1'
- def test_ceph_journal_lv_reported(self, pvolumes, volumes, monkeypatch):
+ def test_ceph_journal_lv_reported(self, volumes, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
journal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000",
journal = api.Volume(
lv_name='journal', lv_uuid='x', lv_tags=journal_tags,
lv_path='/dev/VolGroup/journal', vg_name='VolGroup')
- pvolumes.append(pv)
volumes.append(osd)
volumes.append(journal)
- monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs:
- pvolumes)
+ monkeypatch.setattr(lvm.listing.api,'get_first_pv',lambda **kwargs:pv)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
result = lvm.listing.List([]).single_report('VolGroup/lv')
assert result == {}
- def test_report_a_ceph_lv(self, pvolumes, volumes, monkeypatch):
+ def test_report_a_ceph_lv(self, volumes, monkeypatch):
# ceph lvs are detected by looking into its tags
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
volumes.append(lv)
- monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs:
- pvolumes)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
volumes)
assert result['0'][0]['type'] == 'journal'
assert result['0'][0]['path'] == '/dev/sda1'
- def test_report_a_ceph_lv_with_devices(self, volumes, pvolumes, monkeypatch):
+ def test_report_a_ceph_lv_with_devices(self, volumes, monkeypatch):
+ pvolumes = []
+
tags = 'ceph.osd_id=0,ceph.type=data'
pv1 = api.PVolume(vg_name="VolGroup", pv_name='/dev/sda1',
pv_uuid='', pv_tags={}, lv_uuid="aaaa")
pv2 = api.PVolume(vg_name="VolGroup", pv_name='/dev/sdb1',
pv_uuid='', pv_tags={}, lv_uuid="aaaa")
- lv = api.Volume(lv_name='lv', vg_name='VolGroup',lv_uuid='aaaa',
- lv_path='/dev/VolGroup/lv', lv_tags=tags)
pvolumes.append(pv1)
pvolumes.append(pv2)
+
+ lv = api.Volume(lv_name='lv', vg_name='VolGroup',lv_uuid='aaaa',
+ lv_path='/dev/VolGroup/lv', lv_tags=tags)
volumes.append(lv)
+
monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs:
pvolumes)
monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs:
disk = device.Device("vg/lv")
assert disk.is_lv
- def test_vgs_is_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_vgs_is_empty(self, device_info, monkeypatch):
BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
pv_tags={})
+ pvolumes = []
pvolumes.append(BarPVolume)
lsblk = {"TYPE": "disk"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
assert len(disk.vgs) == 1
- def test_device_is_device(self, device_info, pvolumes):
+ def test_device_is_device(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
- def test_device_is_rotational(self, device_info, pvolumes):
+ def test_device_is_rotational(self, device_info):
data = {"/dev/sda": {"rotational": "1"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_device_is_not_rotational(self, device_info, pvolumes):
+ def test_device_is_not_rotational(self, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
- def test_device_is_rotational_lsblk(self, device_info, pvolumes):
+ def test_device_is_rotational_lsblk(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "ROTA": "1"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
+ def test_device_is_not_rotational_lsblk(self, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device", "ROTA": "0"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
- def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
+ def test_device_is_rotational_defaults_true(self, device_info):
# rotational will default true if no info from sys_api or lsblk is found
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "foo": "bar"}
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_disk_is_device(self, device_info, pvolumes):
+ def test_disk_is_device(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "disk"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
- def test_is_partition(self, device_info, pvolumes):
+ def test_is_partition(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_partition
- def test_is_not_lvm_memeber(self, device_info, pvolumes):
+ def test_is_not_acceptable_device(self, device_info):
+ data = {"/dev/dm-0": {"foo": "bar"}}
+ lsblk = {"TYPE": "mpath"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/dm-0")
+ assert not disk.is_device
+
+ def test_is_not_lvm_memeber(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.is_lvm_member
- def test_is_lvm_memeber(self, device_info, pvolumes):
+ def test_is_lvm_memeber(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
assert not disk.available_raw
@pytest.mark.parametrize("ceph_type", ["data", "block"])
- def test_used_by_ceph(self, device_info, volumes, pvolumes, pvolumes_empty,
+ def test_used_by_ceph(self, device_info, volumes,
monkeypatch, ceph_type):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
pvolumes.append(FooPVolume)
lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
"lv_uuid": "0000", "lv_tags":
"ceph.osd_id=0,ceph.type="+ceph_type}
lv = api.Volume(**lv_data)
volumes.append(lv)
- monkeypatch.setattr(api, 'get_pvs', lambda **kwargs:
- deepcopy(pvolumes))
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
deepcopy(volumes))
disk = device.Device("/dev/sda")
assert disk.used_by_ceph
- def test_not_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_not_used_by_ceph(self, device_info, monkeypatch):
FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
pvolumes.append(FooPVolume)
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
- monkeypatch.setattr(api, 'get_pvs', lambda **kwargs:
- deepcopy(pvolumes))
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
device_info(devices=data, lsblk=lsblk, lv=lv_data)
disk = device.Device("/dev/sda")
class TestDeviceEncryption(object):
- def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
+ def test_partition_is_not_encrypted_lsblk(self, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
- def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
+ def test_partition_is_encrypted_lsblk(self, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
- def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
+ def test_partition_is_not_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'part'}
blkid = {'TYPE': 'ceph data'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
- def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
+ def test_partition_is_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'part'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
status = {'type': 'LUKS1'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
status = {'type': 'LUKS2'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
status = {'type': 'PLAIN'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
monkeypatch.setattr(device, 'encryption_status', lambda x: {})
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is False
- def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
+ def test_lv_is_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'lvm'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = {}
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_blkid(self, factory, device_info):
lsblk = {'TYPE': 'lvm'}
blkid = {'TYPE': 'xfs'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
- def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
+ def test_lv_is_encrypted_lsblk(self, device_info):
lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = {}
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
- def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
+ def test_lv_is_encrypted_lvm_api(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=True)
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)