return dev
return mock_lv
-def mock_device():
+def mock_device(name='foo',
+ vg_name='vg_foo',
+ vg_size=None,
+ lv_name='lv_foo',
+ lv_size=None,
+ path='foo',
+ lv_path='',
+ number_lvs=0):
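+ # autospec'd Device mock: defaults to a 20GiB VG/LV and can be pre-populated with `number_lvs` LVs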
dev = create_autospec(device.Device)
- dev.path = '/dev/foo'
- dev.vg_name = 'vg_foo'
- dev.lv_name = 'lv_foo'
+ if vg_size is None:
+ dev.vg_size = [21474836480]
+ else:
+ dev.vg_size = vg_size
+ if lv_size is None:
+ lv_size = dev.vg_size
+ dev.lv_size = lv_size
+ dev.path = f'/dev/{path}'
+ dev.vg_name = f'{vg_name}'
+ dev.lv_name = f'{lv_name}'
+ dev.lv_path = lv_path if lv_path else f'/dev/{dev.vg_name}/{dev.lv_name}'
dev.symlink = None
dev.vgs = [lvm.VolumeGroup(vg_name=dev.vg_name, lv_name=dev.lv_name)]
dev.available_lvm = True
- dev.vg_size = [21474836480]
dev.vg_free = dev.vg_size
dev.lvs = []
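+ # attach the requested number of pre-existing LVs so callers can simulate partially used devices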
+ for n in range(0, number_lvs):
+ dev.lvs.append(lvm.Volume(vg_name=f'{dev.vg_name}{n}',
+ lv_name=f'{dev.lv_name}-{n}',
+ lv_path=f'{dev.lv_path}-{n}',
+ lv_size=dev.lv_size,
+ lv_tags=''))
+ dev.is_device = True
return dev
@pytest.fixture(params=range(1,4))
ret.append(dev)
return ret
+@pytest.fixture(params=range(2,5))
+def mock_devices_available_multi_pvs_per_vg(request):
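+ # 2 to 4 PVs sharing the single VG 'vg_foo'; every PV already carries one LV, the last one two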
+ ret = []
+ number_lvs = 1
+ for n in range(0, request.param):
+ if n == request.param - 1:
+ number_lvs = 2
+ dev = mock_device(path=f'foo{n}',
+ vg_name='vg_foo',
+ lv_name=f'lv_foo{n}',
+ lv_size=[21474836480],
+ number_lvs=number_lvs)
+ # after v15.2.8 a single VG is created for each PV; this fixture mimics an older deployment where one VG spans multiple PVs
+ dev.vgs = [lvm.VolumeGroup(vg_name=dev.vg_name,
+ pv_name=dev.path,
+ pv_count=request.param)]
+ ret.append(dev)
+ return ret
+
@pytest.fixture
def mock_device_generator():
return mock_device
from argparse import ArgumentError
from mock import MagicMock, patch
-from ceph_volume.api import lvm
from ceph_volume.devices.lvm import batch
from ceph_volume.util import arg_validators
for (_, _, slot_size, _) in fasts:
assert slot_size == expected_slot_size
- def test_get_physical_fast_allocs_abs_size_multi_pvs_per_vg(self, factory,
- conf_ceph_stub,
- mock_devices_available):
+ def test_get_physical_fast_allocs_abs_size_multi_pvs_per_vg(self,
+ factory,
+ conf_ceph_stub,
+ mock_device_generator,
+ mock_devices_available_multi_pvs_per_vg):
conf_ceph_stub('[global]\nfsid=asdf-lkjh')
- args = factory(block_db_slots=None, get_block_db_size=None)
- dev_size = 21474836480
- num_devices = len(mock_devices_available)
+ data_devices = []
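+ # two more data devices than fast PVs, so two new OSDs must be allocated alongside the existing ones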
+ for i in range(len(mock_devices_available_multi_pvs_per_vg)+2):
+ data_devices.append(mock_device_generator(name='data',
+ vg_name=f'vg_foo_data{i}',
+ lv_name=f'lv_foo_data{i}'))
+ args = factory(block_db_slots=None,
+ block_db_size=None,
+ devices=[dev.lv_path for dev in data_devices])
+ dev_size = 53687091200
+ num_devices = len(mock_devices_available_multi_pvs_per_vg)
vg_size = dev_size * num_devices
- vg_name = 'vg_foo'
- for dev in mock_devices_available:
- dev.vg_name = vg_name
- dev.vg_size = [vg_size]
- dev.vg_free = dev.vg_size
- dev.vgs = [lvm.VolumeGroup(vg_name=dev.vg_name, lv_name=dev.lv_name)]
- slots_per_device = 2
- slots_per_vg = slots_per_device * num_devices
- fasts = batch.get_physical_fast_allocs(mock_devices_available,
- 'block_db', slots_per_device, 2, args)
- expected_slot_size = int(vg_size / slots_per_vg)
+ vg_free = vg_size
+ for dev in mock_devices_available_multi_pvs_per_vg:
+ for lv in dev.lvs:
+ vg_free -= lv.lv_size[0]
+ dev.vg_size = [vg_size] # override the `vg_size` set in mock_device(), since this is a single VG spanning multiple PVs
+ for dev in mock_devices_available_multi_pvs_per_vg:
+ dev.vg_free = [vg_free] # override the `vg_free` set in mock_device(), since this is a single VG spanning multiple PVs
+ b = batch.Batch([])
+ b.args = args
+ new_osds = len(data_devices) - len(mock_devices_available_multi_pvs_per_vg)
+ fasts = b.fast_allocations(mock_devices_available_multi_pvs_per_vg,
+ len(data_devices),
+ new_osds,
+ 'block_db')
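+ # the shared VG is expected to be split evenly across all data devices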
+ expected_slot_size = int(vg_size / len(data_devices))
for (_, _, slot_size, _) in fasts:
assert slot_size == expected_slot_size
- def test_batch_fast_allocations_one_block_db_length(self, factory, conf_ceph_stub,
- mock_lv_device_generator):
+ def test_batch_fast_allocations_one_block_db_length(self,
+ factory, conf_ceph_stub,
+ mock_device_generator):
conf_ceph_stub('[global]\nfsid=asdf-lkjh')
b = batch.Batch([])
- db_lv_devices = [mock_lv_device_generator()]
- fast = b.fast_allocations(db_lv_devices, 1, 0, 'block_db')
+ db_device = [mock_device_generator()]
+ fast = b.fast_allocations(db_device, 1, 1, 'block_db')
assert len(fast) == 1
@pytest.mark.parametrize('occupied_prior', range(7))
mock_device_generator):
conf_ceph_stub('[global]\nfsid=asdf-lkjh')
occupied_prior = min(occupied_prior, slots)
- devs = [mock_device_generator() for _ in range(num_devs)]
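+ # `devs` mock the data devices, `fast_devs` the devices that will receive the block_db allocations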
+ devs = [mock_device_generator(lv_name=f'foo{n}') for n in range(slots)]
+ dev_paths = [dev.path for dev in devs]
+ fast_devs = [mock_device_generator(lv_name=f'ssd{n}') for n in range(num_devs)]
already_assigned = 0
while already_assigned < occupied_prior:
dev_i = random.randint(0, num_devs - 1)
- dev = devs[dev_i]
+ dev = fast_devs[dev_i]
if len(dev.lvs) < occupied_prior:
dev.lvs.append('foo')
dev.path = '/dev/bar'
- already_assigned = sum([len(d.lvs) for d in devs])
- args = factory(block_db_slots=None, get_block_db_size=None)
- expected_num_osds = max(len(devs) * slots - occupied_prior, 0)
- fast = batch.get_physical_fast_allocs(devs,
+ already_assigned = sum([len(dev.lvs) for dev in fast_devs])
+ args = factory(block_db_slots=None, get_block_db_size=None, devices=dev_paths)
+ expected_num_osds = max(len(fast_devs) * slots - occupied_prior, 0)
+ fast = batch.get_physical_fast_allocs(fast_devs,
'block_db', slots,
expected_num_osds, args)
assert len(fast) == expected_num_osds
- expected_assignment_on_used_devices = sum([slots - len(d.lvs) for d in devs if len(d.lvs) > 0])
+ expected_assignment_on_used_devices = sum([slots - len(d.lvs) for d in fast_devs if len(d.lvs) > 0])
assert len([f for f in fast if f[0] == '/dev/bar']) == expected_assignment_on_used_devices
assert len([f for f in fast if f[0] != '/dev/bar']) == expected_num_osds - expected_assignment_on_used_devices
import pytest
from ceph_volume.devices import lvm
from ceph_volume.api import lvm as api
+from mock import patch, Mock
# TODO: add tests for following commands -
# ceph-volume list
stdout, stderr = capsys.readouterr()
assert stdout == '{}\n'
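+ # stub process.call so the listing tests never run real lvm commands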
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_empty_device_json_zero_exit_status(self, is_root,factory,capsys):
args = factory(format='json', device='/dev/sda1')
lvm.listing.List([]).list(args)
with pytest.raises(SystemExit):
lvm.listing.List([]).list(args)
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_empty_device_zero_exit_status(self, is_root, factory):
args = factory(format='pretty', device='/dev/sda1')
with pytest.raises(SystemExit):
class TestFullReport(object):
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_no_ceph_lvs(self, monkeypatch):
# ceph lvs are detected by looking into its tags
osd = api.Volume(lv_name='volume1', lv_path='/dev/VolGroup/lv',
result = lvm.listing.List([]).full_report()
assert result == {}
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_ceph_data_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000",
result = lvm.listing.List([]).full_report()
assert result['0'][0]['name'] == 'volume1'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_ceph_journal_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
journal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
assert result['0'][0]['name'] == 'volume1'
assert result['0'][1]['name'] == 'journal'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_ceph_wal_lv_reported(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
wal_tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
assert result['0'][0]['name'] == 'volume1'
assert result['0'][1]['name'] == 'wal'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
@pytest.mark.parametrize('type_', ['journal', 'db', 'wal'])
def test_physical_2nd_device_gets_reported(self, type_, monkeypatch):
tags = ('ceph.osd_id=0,ceph.{t}_uuid=x,ceph.type=data,'
class TestSingleReport(object):
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_not_a_ceph_lv(self, monkeypatch):
# ceph lvs are detected by looking into its tags
lv = api.Volume(lv_name='lv', lv_tags={}, lv_path='/dev/VolGroup/lv',
result = lvm.listing.List([]).single_report('VolGroup/lv')
assert result == {}
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_a_ceph_lv(self, monkeypatch):
# ceph lvs are detected by looking into its tags
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
assert result['0'][0]['path'] == '/dev/VolGroup/lv'
assert result['0'][0]['devices'] == []
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_a_ceph_journal_device(self, monkeypatch):
# ceph lvs are detected by looking into its tags
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,' + \
assert result['0'][0]['path'] == '/dev/VolGroup/lv'
assert result['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1']
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_by_osd_id_for_just_block_dev(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=block'
lvs = [ api.Volume(lv_name='lv1', lv_tags=tags, lv_path='/dev/vg/lv1',
assert result['0'][0]['lv_path'] == '/dev/vg/lv1'
assert result['0'][0]['vg_name'] == 'vg'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_by_osd_id_for_just_data_dev(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
lvs = [ api.Volume(lv_name='lv1', lv_tags=tags, lv_path='/dev/vg/lv1',
assert result['0'][0]['lv_path'] == '/dev/vg/lv1'
assert result['0'][0]['vg_name'] == 'vg'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_by_osd_id_for_just_block_wal_and_db_dev(self, monkeypatch):
tags1 = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=block'
tags2 = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=wal'
assert result['0'][2]['lv_path'] == '/dev/vg/lv3'
assert result['0'][2]['vg_name'] == 'vg'
-
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_by_osd_id_for_data_and_journal_dev(self, monkeypatch):
tags1 = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
tags2 = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
assert result['0'][1]['lv_path'] == '/dev/vg/lv2'
assert result['0'][1]['vg_name'] == 'vg'
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_by_nonexistent_osd_id(self, monkeypatch):
lv = api.Volume(lv_name='lv', lv_tags={}, lv_path='/dev/VolGroup/lv',
vg_name='VolGroup')
result = lvm.listing.List([]).single_report('1')
assert result == {}
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_report_a_ceph_lv_with_no_matching_devices(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.type=data'
lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa',
import pytest
-from mock.mock import patch
+from mock.mock import patch, Mock
from ceph_volume import process
from ceph_volume.api import lvm as api
from ceph_volume.devices.lvm import migrate
from ceph_volume.util.device import Device
from ceph_volume.util import system
from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.devices.lvm.zap import Zap
+
class TestGetClusterName(object):
expected = 'This command needs to be executed with sudo or as root'
assert expected in str(error.value)
- @patch('os.getuid')
- def test_newdb_not_target_lvm(self, m_getuid, capsys):
- m_getuid.return_value = 0
+ @patch('ceph_volume.api.lvm.get_lv_by_fullname', Mock(return_value=None))
+ def test_newdb_not_target_lvm(self, is_root, capsys):
with pytest.raises(SystemExit) as error:
migrate.NewDB(argv=[
'--osd-id', '1',
assert not stderr
- @patch('os.getuid')
- def test_migrate_data_db_to_new_db(self, m_getuid, monkeypatch):
- m_getuid.return_value = 0
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
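+ # migrating away from the old db volume should trigger a single zap call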
+ m_zap.assert_called_once()
+ @patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, monkeypatch):
+ def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
+ m_zap.assert_called_once()
+
def test_migrate_data_db_to_new_db_active_systemd(self, is_root, monkeypatch, capsys):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
assert '--> OSD is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_migrate_data_db_to_new_db_no_systemd(self, is_root, monkeypatch):
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db_no_systemd(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
- @patch('os.getuid')
- def test_migrate_data_db_to_new_db_skip_wal(self, m_getuid, monkeypatch):
- m_getuid.return_value = 0
+ m_zap.assert_called_once()
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db_skip_wal(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
- @patch('os.getuid')
- def test_migrate_data_db_wal_to_new_db(self, m_getuid, monkeypatch):
- m_getuid.return_value = 0
+ m_zap.assert_called_once()
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_wal_to_new_db(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
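+ # both source volumes (db and wal) are expected to be zapped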
+ assert len(m_zap.mock_calls) == 2
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, monkeypatch):
+ def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ assert len(m_zap.mock_calls) == 2
+
@patch('os.getuid')
def test_dont_migrate_data_db_wal_to_new_data(self,
m_getuid,
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block']
- @patch('os.getuid')
+ @patch.object(Zap, 'main')
def test_migrate_data_wal_to_db(self,
- m_getuid,
+ m_zap,
+ is_root,
monkeypatch,
capsys):
- m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_wal_to_db(self,
- m_getuid,
- monkeypatch,
- capsys):
+ m_getuid,
+ m_zap,
+ monkeypatch,
+ capsys):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_data_wal_to_db_encrypted(self,
m_getuid,
+ m_zap,
monkeypatch,
capsys):
m_getuid.return_value = 0
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
def test_migrate_data_wal_to_db_active_systemd(self, is_root, monkeypatch, capsys):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
assert '--> OSD is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_migrate_data_wal_to_db_no_systemd(self, is_root, monkeypatch):
+ @patch.object(Zap, 'main')
+ def test_migrate_data_wal_to_db_no_systemd(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+
+ m_zap.assert_called_once()
import os
import pytest
from copy import deepcopy
-from mock.mock import patch, call
+from mock.mock import patch, call, Mock
from ceph_volume import process
from ceph_volume.api import lvm as api
from ceph_volume.devices.lvm import zap
class TestEnsureAssociatedLVs(object):
+ @patch('ceph_volume.devices.lvm.zap.api', Mock(return_value=[]))
def test_nothing_is_found(self):
volumes = []
result = zap.ensure_associated_lvs(volumes)
out, err = capsys.readouterr()
assert "Zapping successful for OSD: 1" in err
- def test_block_and_partition_are_found(self, monkeypatch):
- monkeypatch.setattr(zap.disk, 'get_device_from_partuuid', lambda x: '/dev/sdb1')
- tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
- osd = api.Volume(
- lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
- volumes = []
- volumes.append(osd)
- result = zap.ensure_associated_lvs(volumes)
- assert '/dev/sdb1' in result
- assert '/dev/VolGroup/block' in result
-
def test_journal_is_found(self, fake_call):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
osd = api.Volume(
result = zap.ensure_associated_lvs(volumes)
assert result == ['/dev/VolGroup/lv']
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_multiple_journals_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
volumes = []
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_multiple_dbs_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=db'
volumes = []
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_multiple_wals_are_found(self):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=wal'
volumes = []
assert '/dev/VolGroup/lv1' in result
assert '/dev/VolGroup/lv2' in result
+ @patch('ceph_volume.api.lvm.process.call', Mock(return_value=('', '', 0)))
def test_multiple_backing_devs_are_found(self):
volumes = []
for _type in ['journal', 'db', 'wal']:
def test_ensure_associated_lvs(self, m_get_lvs):
zap.ensure_associated_lvs([], lv_tags={'ceph.osd_id': '1'})
calls = [
- call(tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}),
call(tags={'ceph.type': 'db', 'ceph.osd_id': '1'}),
call(tags={'ceph.type': 'wal', 'ceph.osd_id': '1'})
]
m_get_lvs.assert_has_calls(calls, any_order=True)
-
class TestWipeFs(object):
def setup_method(self):
"/dev/sdb3": {},
"/dev/sdc": {},
"/dev/sdd": {},
+ "/dev/sde": {},
+ "/dev/sde1": {},
"/dev/mapper/ceph--osd--block--1": {},
"/dev/mapper/ceph--osd--block--2": {},
}
def _lsblk_all_devices(abspath=True):
return [
- {"NAME": "/dev/sda", "KNAME": "/dev/sda", "PKNAME": ""},
- {"NAME": "/dev/sda1", "KNAME": "/dev/sda1", "PKNAME": "/dev/sda"},
- {"NAME": "/dev/sda2", "KNAME": "/dev/sda2", "PKNAME": "/dev/sda"},
- {"NAME": "/dev/sda3", "KNAME": "/dev/sda3", "PKNAME": "/dev/sda"},
- {"NAME": "/dev/sdb", "KNAME": "/dev/sdb", "PKNAME": ""},
- {"NAME": "/dev/sdb2", "KNAME": "/dev/sdb2", "PKNAME": "/dev/sdb"},
- {"NAME": "/dev/sdb3", "KNAME": "/dev/sdb3", "PKNAME": "/dev/sdb"},
- {"NAME": "/dev/sdc", "KNAME": "/dev/sdc", "PKNAME": ""},
- {"NAME": "/dev/sdd", "KNAME": "/dev/sdd", "PKNAME": ""},
- {"NAME": "/dev/mapper/ceph--osd--block--1", "KNAME": "/dev/mapper/ceph--osd--block--1", "PKNAME": "/dev/sdd"},
- {"NAME": "/dev/mapper/ceph--osd--block--2", "KNAME": "/dev/mapper/ceph--osd--block--2", "PKNAME": "/dev/sdd"},
+ {"NAME": "/dev/sda", "KNAME": "/dev/sda", "PKNAME": "", "TYPE": "disk"},
+ {"NAME": "/dev/sda1", "KNAME": "/dev/sda1", "PKNAME": "/dev/sda", "TYPE": "part"},
+ {"NAME": "/dev/sda2", "KNAME": "/dev/sda2", "PKNAME": "/dev/sda", "TYPE": "part"},
+ {"NAME": "/dev/sda3", "KNAME": "/dev/sda3", "PKNAME": "/dev/sda", "TYPE": "part"},
+ {"NAME": "/dev/sdb", "KNAME": "/dev/sdb", "PKNAME": "", "TYPE": "disk"},
+ {"NAME": "/dev/sdb2", "KNAME": "/dev/sdb2", "PKNAME": "/dev/sdb", "TYPE": "part"},
+ {"NAME": "/dev/sdb3", "KNAME": "/dev/sdb3", "PKNAME": "/dev/sdb", "TYPE": "part"},
+ {"NAME": "/dev/sdc", "KNAME": "/dev/sdc", "PKNAME": "", "TYPE": "disk"},
+ {"NAME": "/dev/sdd", "KNAME": "/dev/sdd", "PKNAME": "", "TYPE": "disk"},
+ {"NAME": "/dev/sde", "KNAME": "/dev/sde", "PKNAME": "", "TYPE": "disk"},
+ {"NAME": "/dev/sde1", "KNAME": "/dev/sde1", "PKNAME": "/dev/sde", "TYPE": "part"},
+ {"NAME": "/dev/mapper/ceph--osd--block--1", "KNAME": "/dev/mapper/ceph--osd--block--1", "PKNAME": "/dev/sdd", "TYPE": "lvm"},
+ {"NAME": "/dev/mapper/ceph--osd--block--2", "KNAME": "/dev/mapper/ceph--osd--block--2", "PKNAME": "/dev/sdd", "TYPE": "lvm"},
]
# dummy lsblk output for device with optional parent output
}
}'''
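+# dummy bluestore label output for /dev/sde1, a partition holding a standalone OSD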
+def _bluestore_tool_label_output_sde1():
+ return '''{
+ "/dev/sde1": {
+ "osd_uuid": "sde1-uuid",
+ "size": 214747316224,
+ "btime": "2023-07-26T13:20:19.509457+0000",
+ "description": "main",
+ "bfm_blocks": "268435456",
+ "bfm_blocks_per_key": "128",
+ "bfm_bytes_per_block": "4096",
+ "bfm_size": "214747316224",
+ "bluefs": "1",
+ "ceph_fsid": "sde1-fsid",
+ "kv_backend": "rocksdb",
+ "magic": "ceph osd volume v026",
+ "mkfs_done": "yes",
+ "osd_key": "AQCSHcFkUeLIMBAAjKqANkXafjvVISkXt6FGCA==",
+ "ready": "ready",
+ "require_osd_release": "16",
+ "whoami": "1"
+ }
+}'''
+
def _bluestore_tool_label_output_dm_okay():
return '''{
"/dev/mapper/ceph--osd--block--1": {
return _lsblk_output(dev, parent="/dev/sdb"), '', 0
if dev == "/dev/sda" or dev == "/dev/sdb" or dev == "/dev/sdc" or dev == "/dev/sdd":
return _lsblk_output(dev), '', 0
+ if dev == "/dev/sde1":
+ return _lsblk_output(dev, parent="/dev/sde"), '', 0
if "mapper" in dev:
return _lsblk_output(dev, parent="/dev/sdd"), '', 0
pytest.fail('dev {} needs behavior specified for it'.format(dev))
if "/dev/sdb2" in command:
# sdb2 is a phantom atari partition that appears to have some valid bluestore info
return _bluestore_tool_label_output_sdb2(), '', 0
+ if "/dev/sde1" in command:
+ return _bluestore_tool_label_output_sde1(), '', 0
if "/dev/mapper/ceph--osd--block--1" in command:
# dm device 1 is a valid bluestore OSD (the other is corrupted/invalid)
return _bluestore_tool_label_output_dm_okay(), '', 0
return False # empty disk
if disk_path == "/dev/sdd":
return False # has LVM subdevices
+ if disk_path == "/dev/sde":
+ return False # has partitions, so it should not be treated as an OSD itself
+ if disk_path == "/dev/sde1":
+ return True # is a valid OSD
if disk_path == "/dev/mapper/ceph--osd--block--1":
return True # good OSD
if disk_path == "/dev/mapper/ceph--osd--block--2":
return False # corrupted
pytest.fail('device {} needs behavior specified for it'.format(disk_path))
+
class TestList(object):
@patch('ceph_volume.util.device.disk.get_devices')
assert sdb['device'] == '/dev/sdb'
assert sdb['ceph_fsid'] == 'sdb-fsid'
assert sdb['type'] == 'bluestore'
-
lvm1 = result['lvm-1-uuid']
assert lvm1['osd_uuid'] == 'lvm-1-uuid'
assert lvm1['osd_id'] == 2
assert lvm1['device'] == '/dev/mapper/ceph--osd--block--1'
assert lvm1['ceph_fsid'] == 'lvm-1-fsid'
assert lvm1['type'] == 'bluestore'
+ sde1 = result['sde1-uuid']
+ assert sde1['osd_uuid'] == 'sde1-uuid'
+ assert sde1['osd_id'] == 1
+ assert sde1['device'] == '/dev/sde1'
+ assert sde1['ceph_fsid'] == 'sde1-fsid'
+ assert sde1['type'] == 'bluestore'
@patch('ceph_volume.util.device.disk.get_devices')
@patch('ceph_volume.util.disk.has_bluestore_label')
patched_get_devices.side_effect = _devices_side_effect
result = raw.list.List([]).generate()
- assert len(result) == 3
+ assert len(result) == 2
assert 'sdb-uuid' in result
import pytest
from ceph_volume.devices import raw
-from mock.mock import patch
+from mock.mock import patch, MagicMock
class TestRaw(object):
assert 'Path to bluestore block.wal block device' in stdout
assert 'Enable device encryption via dm-crypt' in stdout
+ @patch('ceph_volume.util.prepare.create_key', return_value='fake-secret')
+ @patch('ceph_volume.util.arg_validators.set_dmcrypt_no_workqueue', return_value=MagicMock())
@patch('ceph_volume.util.arg_validators.ValidRawDevice.__call__')
- def test_prepare_dmcrypt_no_secret_passed(self, m_valid_device, capsys):
+ def test_prepare_dmcrypt_no_secret_passed(self,
+ m_valid_device,
+ m_set_dmcrypt_no_workqueue,
+ m_create_key,
+ capsys):
m_valid_device.return_value = '/dev/foo'
with pytest.raises(SystemExit):
raw.prepare.Prepare(argv=['--bluestore', '--data', '/dev/foo', '--dmcrypt']).main()
import os
import pytest
from ceph_volume.devices.simple import activate
+from mock.mock import patch
class TestActivate(object):
- def test_no_data_uuid(self, factory, is_root, monkeypatch, capture, fake_filesystem):
+ @patch('ceph_volume.decorators.os.getuid', return_value=0)
+ def test_no_data_uuid(self, m_getuid, factory, capture, fake_filesystem):
fake_filesystem.create_file('/tmp/json-config', contents='{}')
args = factory(osd_id='0', osd_fsid='1234', json_config='/tmp/json-config')
with pytest.raises(RuntimeError):
with pytest.raises(exceptions.SuperUserError):
self.validator('')
- def test_path_is_not_a_directory(self, is_root, monkeypatch, fake_filesystem):
+ def test_path_is_not_a_directory(self, monkeypatch, fake_filesystem):
fake_file = fake_filesystem.create_file('/tmp/foo')
+ monkeypatch.setattr('ceph_volume.decorators.os.getuid', lambda : 0)
monkeypatch.setattr(arg_validators.disk, 'is_partition', lambda x: False)
- validator = arg_validators.OSDPath()
with pytest.raises(argparse.ArgumentError):
- validator(fake_file.path)
+ self.validator(fake_file.path)
def test_files_are_missing(self, is_root, tmpdir, monkeypatch):
tmppath = str(tmpdir)
result = disk.get_devices(_sys_block_path=str(tmpdir))
assert result == {}
- def test_sda_block_is_found(self, patched_get_block_devs_sysfs, fake_filesystem):
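+ # patch udevadm_property so get_devices() does not run udevadm on the test host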
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_sda_block_is_found(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
result = disk.get_devices()
assert len(result.keys()) == 1
assert result[sda_path]['human_readable_size'] == '0.00 B'
assert result[sda_path]['model'] == ''
assert result[sda_path]['partitions'] == {}
- def test_sda_size(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_sda_size(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/size', contents = '1024')
result = disk.get_devices()
assert list(result.keys()) == [sda_path]
assert result[sda_path]['human_readable_size'] == '512.00 KB'
- def test_sda_sectorsize_fallsback(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_sda_sectorsize_fallsback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
# if no sectorsize, it will use queue/hw_sector_size
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/hw_sector_size', contents = '1024')
result = disk.get_devices()
assert list(result.keys()) == [sda_path]
assert result[sda_path]['sectorsize'] == '1024'
- def test_sda_sectorsize_from_logical_block(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_sda_sectorsize_from_logical_block(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents = '99')
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- def test_sda_sectorsize_does_not_fallback(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_sda_sectorsize_does_not_fallback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents = '99')
fake_filesystem.create_file('/sys/block/sda/queue/hw_sector_size', contents = '1024')
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- def test_is_rotational(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_is_rotational(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/rotational', contents = '1')
result = disk.get_devices()
assert result[sda_path]['rotational'] == '1'
result = disk.get_devices()
assert rbd_path not in result
- def test_actuator_device(self, patched_get_block_devs_sysfs, fake_filesystem):
+ @patch('ceph_volume.util.disk.udevadm_property')
+ def test_actuator_device(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
fake_actuator_nb = 2
- patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
+ patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
for actuator in range(0, fake_actuator_nb):
fake_filesystem.create_dir(f'/sys/block/sda/queue/independent_access_ranges/{actuator}')
result = disk.get_devices()
def test_device_path_is_a_path(self, fake_filesystem):
device_path = '/var/lib/ceph/osd/ceph-0'
fake_filesystem.create_dir(device_path)
- assert not disk.has_bluestore_label(device_path)
\ No newline at end of file
+ assert not disk.has_bluestore_label(device_path)
class TestLuksOpen(object):
+ @patch('ceph_volume.util.encryption.bypass_workqueue', return_value=False)
@patch('ceph_volume.util.encryption.process.call')
- def test_luks_open_command_with_default_size(self, m_call, conf_ceph_stub):
+ def test_luks_open_command_with_default_size(self, m_call, m_bypass_workqueue, conf_ceph_stub):
conf_ceph_stub('[global]\nfsid=abcd')
expected = [
'cryptsetup',
encryption.luks_open('abcd', '/dev/foo', '/dev/bar')
assert m_call.call_args[0][0] == expected
+ @patch('ceph_volume.util.encryption.bypass_workqueue', return_value=False)
@patch('ceph_volume.util.encryption.process.call')
- def test_luks_open_command_with_custom_size(self, m_call, conf_ceph_stub):
+ def test_luks_open_command_with_custom_size(self, m_call, m_bypass_workqueue, conf_ceph_stub):
conf_ceph_stub('[global]\nfsid=abcd\n[osd]\nosd_dmcrypt_key_size=256')
expected = [
'cryptsetup',
from ceph_volume.util.device import Device
from ceph_volume.util import disk
from ceph_volume.util.encryption import set_dmcrypt_no_workqueue
-from ceph_volume import process, conf
def valid_osd_id(val):
return str(int(val))