This speeds up calls to Device and Disk
'''
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda device='': {})
- monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: {})
@pytest.fixture(params=[
yield p
@pytest.fixture
-def device_info(monkeypatch, patch_bluestore_label):
- def apply(devices=None, lsblk=None, lv=None, blkid=None, udevadm=None,
- has_bluestore_label=False):
+def patch_udevdata(monkeypatch):
+ fake_udevdata = MagicMock()
+ fake_udevdata.environment = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
+ monkeypatch.setattr("ceph_volume.util.disk.UdevData", lambda path: fake_udevdata)
+ yield
+
+@pytest.fixture
+def device_info(monkeypatch, patch_bluestore_label, patch_udevdata):
+ def apply(devices=None, lsblk=None, lv=None, blkid=None):
if devices:
for dev in devices.keys():
devices[dev]['device_nodes'] = [os.path.basename(dev)]
        else:
            devices = {}
lsblk = lsblk if lsblk else {}
blkid = blkid if blkid else {}
- udevadm = udevadm if udevadm else {}
lv = Factory(**lv) if lv else None
monkeypatch.setattr("ceph_volume.sys_info.devices", {})
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda device='': devices)
lambda path: [lv])
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", lambda path: lsblk)
monkeypatch.setattr("ceph_volume.util.device.disk.blkid", lambda path: blkid)
- monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: udevadm)
return apply
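A minimal usage sketch of the reworked fixture wiring (it mirrors test_get_device_id further down): the udevadm keyword argument is gone, and the ID_VENDOR/ID_MODEL/ID_SCSI_SERIAL values now come from the patched UdevData in patch_udevdata, which device_info pulls in automatically.

def test_device_id_from_udev(fake_call, device_info):
    # only lsblk is faked here; udev properties are supplied by patch_udevdata
    device_info(lsblk={"TYPE": "disk", "NAME": "sda"})
    dev = device.Device("/dev/sda")
    assert dev._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'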
@pytest.fixture(params=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.999, 1.0])
@pytest.fixture
def fake_filesystem(fs):
-
+ fs.create_dir('/dev')
fs.create_dir('/sys/block/sda/slaves')
fs.create_dir('/sys/block/sda/queue')
fs.create_dir('/sys/block/rbd0')
fs.create_dir('/var/log/ceph')
fs.create_dir('/tmp/osdpath')
+ fs.create_file('/sys/block/sda/dev', contents='8:0')
+ fs.create_dir('/run/udev/data')
+ fs.create_file('/run/udev/data/b8:0', contents="""
+P:8:0
+E:DEVNAME=/dev/sda
+E:DEVTYPE=disk
+E:ID_MODEL=
+E:ID_SERIAL=
+E:ID_VENDOR=
+""".strip())
+
yield fs
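For reference, a rough sketch of the lookup path this fake filesystem supports: /sys/block/sda/dev maps the disk to its udev database entry under /run/udev/data, whose E: records UdevData exposes as a key/value environment (simplified; the real class also tracks symlinks, tags, and the device id).

def read_udev_environment(disk_name: str) -> dict:
    # e.g. 'sda': /sys/block/sda/dev holds 'MAJOR:MINOR' (here '8:0'),
    # which names the udev database entry /run/udev/data/b8:0
    with open(f'/sys/block/{disk_name}/dev') as f:
        major, minor = f.read().strip().split(':')
    env = {}
    with open(f'/run/udev/data/b{major}:{minor}') as f:
        for line in f:
            if line.startswith('E:') and '=' in line:
                key, value = line[2:].rstrip('\n').split('=', 1)
                env[key] = value
    return env

# against the fixture above: read_udev_environment('sda')['DEVNAME'] == '/dev/sda'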
@pytest.fixture
'NAME="/dev/sdz" KNAME="/dev/sdz" PKNAME="" PARTLABEL=""']
blkid_output = ['/dev/ceph-1172bba3-3e0e-45e5-ace6-31ae8401221f/osd-block-5050a85c-d1a7-4d66-b4ba-2e9b1a2970ae: TYPE="ceph_bluestore" USAGE="other"']
-
-udevadm_property = '''DEVNAME=/dev/sdb
-DEVTYPE=disk
-ID_ATA=1
-ID_BUS=ata
-ID_MODEL=SK_hynix_SC311_SATA_512GB
-ID_PART_TABLE_TYPE=gpt
-ID_PART_TABLE_UUID=c8f91d57-b26c-4de1-8884-0c9541da288c
-ID_PATH=pci-0000:00:17.0-ata-3
-ID_PATH_TAG=pci-0000_00_17_0-ata-3
-ID_REVISION=70000P10
-ID_SERIAL=SK_hynix_SC311_SATA_512GB_MS83N71801150416A
-TAGS=:systemd:
-USEC_INITIALIZED=16117769'''.split('\n')
\ No newline at end of file
p = kwargs['filters']['lv_path']
return self.mock_single_volumes[p]
- def test_lv_is_matched_id(self, monkeypatch):
+ def test_lv_is_matched_id(self, monkeypatch, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
lv_path='/dev/VolGroup/lv1', lv_tags=tags)
assert result[0][0].lvs == [vol]
assert result[0][1] == 'block'
- def test_lv_is_matched_id2(self, monkeypatch):
+ def test_lv_is_matched_id2(self, monkeypatch, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
lv_path='/dev/VolGroup/lv1', lv_tags=tags)
else:
assert False
- def test_lv_is_matched_id3(self, monkeypatch):
+ def test_lv_is_matched_id3(self, monkeypatch, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
lv_path='/dev/VolGroup/lv1', lv_tags=tags)
        self.mock_process_input.append(args[0])
return ('', '', 0)
- def test_init(self, monkeypatch):
+ def test_init(self, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234'
assert 3 == len(t.old_wal_tags)
assert 'wal' == t.old_wal_tags['ceph.type']
- def test_update_tags_when_lv_create(self, monkeypatch):
+ def test_update_tags_when_lv_create(self, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.journal_uuid=x,' \
'--addtag', 'ceph.wal_device=/dev/VolGroup/lv_target',
'/dev/VolGroup/lv2'] == self.mock_process_input[2]
- def test_remove_lvs(self, monkeypatch):
+ def test_remove_lvs(self, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.journal_uuid=x,' \
'--deltag', 'ceph.wal_device=aaaaa',
'/dev/VolGroup/lv2'] == self.mock_process_input[2]
- def test_replace_lvs(self, monkeypatch):
+ def test_replace_lvs(self, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
'--addtag', 'ceph.db_device=/dev/VolGroup/lv_target',
'/dev/VolGroup/lv_target'].sort()
- def test_undo(self, monkeypatch):
+ def test_undo(self, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234'
expected = 'Target Logical Volume is already used by ceph: vgname/new_db'
assert expected in stderr
- def test_newdb(self, is_root, monkeypatch, capsys):
+ def test_newdb(self, is_root, monkeypatch, capsys, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
assert '--> OSD ID is running, stop it with: systemctl stop ceph-osd@1' == stderr.rstrip()
assert not stdout
- def test_newdb_no_systemd(self, is_root, monkeypatch):
+ def test_newdb_no_systemd(self, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
'--dev-target', '/dev/VolGroup/target_volume',
'--command', 'bluefs-bdev-new-db']
- def test_newwal(self, is_root, monkeypatch, capsys):
+ def test_newwal(self, is_root, monkeypatch, capsys, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234'
assert '--> OSD ID is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_newwal_no_systemd(self, is_root, monkeypatch):
+ def test_newwal_no_systemd(self, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234'
'--command', 'bluefs-bdev-new-wal']
@patch('os.getuid')
- def test_newwal_encrypted(self, m_getuid, monkeypatch, capsys):
+ def test_newwal_encrypted(self, m_getuid, monkeypatch, capsys, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
def mock_dmcrypt_close(self, *args, **kwargs):
self.mock_dmcrypt_close_uuid.append(kwargs['mapping'])
- def test_get_source_devices(self, monkeypatch):
+ def test_get_source_devices(self, monkeypatch, patch_udevdata):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234'
@patch.object(Zap, 'main')
- def test_migrate_data_db_to_new_db(self, m_zap, is_root, monkeypatch):
+ def test_migrate_data_db_to_new_db(self, m_zap, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
@patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
+ def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
m_zap.assert_called_once()
- def test_migrate_data_db_to_new_db_active_systemd(self, is_root, monkeypatch, capsys):
+ def test_migrate_data_db_to_new_db_active_systemd(self, is_root, monkeypatch, capsys, patch_udevdata):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
assert not stdout
@patch.object(Zap, 'main')
- def test_migrate_data_db_to_new_db_no_systemd(self, m_zap, is_root, monkeypatch):
+ def test_migrate_data_db_to_new_db_no_systemd(self, m_zap, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
m_zap.assert_called_once()
@patch.object(Zap, 'main')
- def test_migrate_data_db_to_new_db_skip_wal(self, m_zap, is_root, monkeypatch):
+ def test_migrate_data_db_to_new_db_skip_wal(self, m_zap, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
m_zap.assert_called_once()
@patch.object(Zap, 'main')
- def test_migrate_data_db_wal_to_new_db(self, m_zap, is_root, monkeypatch):
+ def test_migrate_data_db_wal_to_new_db(self, m_zap, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
@patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
+ def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
def test_dont_migrate_data_db_wal_to_new_data(self,
m_getuid,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
def test_dont_migrate_db_to_wal(self,
is_root,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
def test_migrate_data_db_to_db(self,
is_root,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block']
- def test_migrate_data_db_to_db_active_systemd(self, is_root, monkeypatch, capsys):
+ def test_migrate_data_db_to_db_active_systemd(self, is_root, monkeypatch, capsys, patch_udevdata):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
assert '--> OSD is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_migrate_data_db_to_db_no_systemd(self, is_root, monkeypatch):
+ def test_migrate_data_db_to_db_no_systemd(self, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
m_zap,
is_root,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
m_getuid,
m_zap,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
m_getuid,
m_zap,
monkeypatch,
- capsys):
+ capsys,
+ patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
m_zap.assert_called_once()
- def test_migrate_data_wal_to_db_active_systemd(self, is_root, monkeypatch, capsys):
+ def test_migrate_data_wal_to_db_active_systemd(self, is_root, monkeypatch, capsys, patch_udevdata):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
assert not stdout
@patch.object(Zap, 'main')
- def test_migrate_data_wal_to_db_no_systemd(self, m_zap, is_root, monkeypatch):
+ def test_migrate_data_wal_to_db_no_systemd(self, m_zap, is_root, monkeypatch, patch_udevdata):
monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
def process_call(command, **kw):
result: Tuple[List[str], List[str], int] = ''
- if 'udevadm' in command:
- result = data_zap.udevadm_property, [], 0
if 'ceph-bluestore-tool' in command:
result = data_zap.ceph_bluestore_tool_output, [], 0
if 'is-active' in command:
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
@patch('ceph_volume.process.call', Mock(side_effect=process_call))
- def test_lv_is_matched_id(self, mock_zap, monkeypatch, is_root):
+ def test_lv_is_matched_id(self, mock_zap, monkeypatch, is_root, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
lv_path='/dev/VolGroup/lv', lv_tags=tags)
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
@patch('ceph_volume.devices.raw.list.List.filter_lvm_osd_devices', Mock(return_value='/dev/sdb'))
@patch('ceph_volume.process.call', Mock(side_effect=process_call))
- def test_raw_is_matched_id(self, mock_zap, monkeypatch, is_root):
+ def test_raw_is_matched_id(self, mock_zap, monkeypatch, is_root, patch_udevdata):
volumes = []
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
mock_zap.assert_called_once()
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
- def test_lv_is_matched_fsid(self, mock_zap, monkeypatch, is_root):
+ def test_lv_is_matched_fsid(self, mock_zap, monkeypatch, is_root, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
@patch('ceph_volume.devices.raw.list.List.filter_lvm_osd_devices', Mock(return_value='/dev/sdb'))
@patch('ceph_volume.process.call', Mock(side_effect=process_call))
- def test_raw_is_matched_fsid(self, mock_zap, monkeypatch, is_root):
+ def test_raw_is_matched_fsid(self, mock_zap, monkeypatch, is_root, patch_udevdata):
volumes = []
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
        mock_zap.assert_called_once()
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
- def test_lv_is_matched_id_fsid(self, mock_zap, monkeypatch, is_root):
+ def test_lv_is_matched_id_fsid(self, mock_zap, monkeypatch, is_root, patch_udevdata):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
'ceph.type=data'
osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
@patch('ceph_volume.devices.raw.list.List.filter_lvm_osd_devices', Mock(return_value='/dev/sdb'))
@patch('ceph_volume.process.call', Mock(side_effect=process_call))
- def test_raw_is_matched_id_fsid(self, mock_zap, monkeypatch, is_root):
+ def test_raw_is_matched_id_fsid(self, mock_zap, monkeypatch, is_root, patch_udevdata):
volumes = []
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
@patch('ceph_volume.devices.lvm.zap.Zap.zap')
@patch('ceph_volume.devices.raw.list.List.filter_lvm_osd_devices', Mock(side_effect=['/dev/vdx', '/dev/vdy', '/dev/vdz', None]))
@patch('ceph_volume.process.call', Mock(side_effect=process_call))
- def test_raw_multiple_devices(self, mock_zap, monkeypatch, is_root):
+ def test_raw_multiple_devices(self, mock_zap, monkeypatch, is_root, patch_udevdata):
volumes = []
monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
z = zap.Zap(['--osd-id', '5'])
@pytest.mark.parametrize("encrypted", ["ceph.encrypted=0", "ceph.encrypted=1"])
def test__activate(self,
m_success, m_create_osd_path,
- monkeypatch, fake_run, fake_call, encrypted, conf_ceph_stub):
+ monkeypatch, fake_run, fake_call, encrypted, conf_ceph_stub, patch_udevdata):
conf_ceph_stub('[global]\nfsid=asdf-lkjh')
monkeypatch.setattr(system, 'chown', lambda path: 0)
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
@pytest.fixture
-@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def device_report_keys(device_info):
device_info(devices={
# example output of disk.get_devices()
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
+ def test_is_ceph_disk_lsblk(self, patch_udevdata, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
"lsblk_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
+ def test_is_ceph_disk_blkid(self, patch_udevdata, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
+ def test_is_ceph_disk_member_not_available_lsblk(self, patch_udevdata, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert not disk.available
"lsblk_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
+ def test_is_ceph_disk_member_not_available_blkid(self, patch_udevdata, fake_call, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert not disk.available
"device_info_not_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
+ def test_is_not_ceph_disk_member_lsblk(self, patch_udevdata, fake_call, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member is False
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_get_device_id(self, fake_call, device_info):
- udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
lsblk = {"TYPE": "disk", "NAME": "sda"}
- device_info(udevadm=udev,lsblk=lsblk)
+ device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
"blkid_ceph_disk_member",
"disable_kernel_queries")
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_is_member_blkid(self, fake_call, monkeypatch):
+ def test_is_member_blkid(self, patch_udevdata, fake_call, monkeypatch):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True
assert result['UUID'] == '62416664-cbaf-40bd-9689-10bd337379c3'
assert result['TYPE'] == 'xfs'
-class TestUdevadmProperty(object):
-
- def test_good_output(self, stub_call):
- output = """ID_MODEL=SK_hynix_SC311_SATA_512GB
-ID_PART_TABLE_TYPE=gpt
-ID_SERIAL_SHORT=MS83N71801150416A""".split()
- stub_call((output, [], 0))
- result = disk.udevadm_property('dev/sda')
- assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
- assert result['ID_PART_TABLE_TYPE'] == 'gpt'
- assert result['ID_SERIAL_SHORT'] == 'MS83N71801150416A'
-
- def test_property_filter(self, stub_call):
- output = """ID_MODEL=SK_hynix_SC311_SATA_512GB
-ID_PART_TABLE_TYPE=gpt
-ID_SERIAL_SHORT=MS83N71801150416A""".split()
- stub_call((output, [], 0))
- result = disk.udevadm_property('dev/sda', ['ID_MODEL',
- 'ID_SERIAL_SHORT'])
- assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
- assert 'ID_PART_TABLE_TYPE' not in result
-
- def test_fail_on_broken_output(self, stub_call):
- output = ["ID_MODEL:SK_hynix_SC311_SATA_512GB"]
- stub_call((output, [], 0))
- with pytest.raises(ValueError):
- disk.udevadm_property('dev/sda')
-
class TestDeviceFamily(object):
result = disk.get_devices(_sys_block_path=str(tmpdir))
assert result == {}
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_sda_block_is_found(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_sda_block_is_found(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
result = disk.get_devices()
assert result[sda_path]['model'] == ''
assert result[sda_path]['partitions'] == {}
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_sda_size(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_sda_size(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/size', contents = '1024')
assert list(result.keys()) == [sda_path]
assert result[sda_path]['human_readable_size'] == '512.00 KB'
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_sda_sectorsize_fallsback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_sda_sectorsize_fallsback(self, patched_get_block_devs_sysfs, fake_filesystem):
# if no sectorsize, it will use queue/hw_sector_size
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
assert list(result.keys()) == [sda_path]
assert result[sda_path]['sectorsize'] == '1024'
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_sda_sectorsize_from_logical_block(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_sda_sectorsize_from_logical_block(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents = '99')
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_sda_sectorsize_does_not_fallback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_sda_sectorsize_does_not_fallback(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents = '99')
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_is_rotational(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_is_rotational(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
fake_filesystem.create_file('/sys/block/sda/queue/rotational', contents = '1')
result = disk.get_devices()
assert result[sda_path]['rotational'] == '1'
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_is_ceph_rbd(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_is_ceph_rbd(self, patched_get_block_devs_sysfs, fake_filesystem):
rbd_path = '/dev/rbd0'
patched_get_block_devs_sysfs.return_value = [[rbd_path, rbd_path, 'disk', rbd_path]]
result = disk.get_devices()
assert rbd_path not in result
- @patch('ceph_volume.util.disk.udevadm_property')
- def test_actuator_device(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
+ def test_actuator_device(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
fake_actuator_nb = 2
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk', sda_path]]
# -*- coding: utf-8 -*-
import logging
import os
+import re
from functools import total_ordering
from ceph_volume import sys_info, allow_loop_devices, BEING_REPLACED_HEADER
from ceph_volume.api import lvm
from ceph_volume.util.constants import ceph_disk_guids
from typing import Any, Dict, List, Tuple, Optional, Union
-
logger = logging.getLogger(__name__)
-
report_template = """
{dev:<25} {size:<12} {device_nodes:<15} {rot!s:<7} {available!s:<9} {model}"""
Please keep this implementation in sync with get_device_id() in
src/common/blkdev.cc
"""
- props = ['ID_VENDOR', 'ID_MODEL', 'ID_MODEL_ENC', 'ID_SERIAL_SHORT', 'ID_SERIAL',
- 'ID_SCSI_SERIAL']
- p = disk.udevadm_property(self.path, props)
- if p.get('ID_MODEL','').startswith('LVM PV '):
- p['ID_MODEL'] = p.get('ID_MODEL_ENC', '').replace('\\x20', ' ').strip()
- if 'ID_VENDOR' in p and 'ID_MODEL' in p and 'ID_SCSI_SERIAL' in p:
- dev_id = '_'.join([p['ID_VENDOR'], p['ID_MODEL'],
- p['ID_SCSI_SERIAL']])
- elif 'ID_MODEL' in p and 'ID_SERIAL_SHORT' in p:
- dev_id = '_'.join([p['ID_MODEL'], p['ID_SERIAL_SHORT']])
- elif 'ID_SERIAL' in p:
- dev_id = p['ID_SERIAL']
+
+ udev_data = disk.UdevData(self.path)
+ env = udev_data.environment
+ parts: list[str] = []
+ model = env.get('ID_MODEL', '')
+ if model.startswith('LVM PV '):
+ model = env.get('ID_MODEL_ENC', '').replace('\\x20', ' ').strip()
+
+ if 'ID_VENDOR' in env and 'ID_SCSI_SERIAL' in env:
+ parts = [env['ID_VENDOR'], model, env['ID_SCSI_SERIAL']]
+ elif 'ID_SERIAL_SHORT' in env:
+ parts = [model, env['ID_SERIAL_SHORT']]
+ elif 'ID_SERIAL' in env:
+ dev_id = env['ID_SERIAL']
if dev_id.startswith('MTFD'):
- # Micron NVMes hide the vendor
dev_id = 'Micron_' + dev_id
+ parts = [dev_id]
else:
# the else branch should fallback to using sysfs and ioctl to
# retrieve device_id on FreeBSD. Still figuring out if/how the
# python ioctl implementation does that on FreeBSD
dev_id = ''
+
+ dev_id = '_'.join(parts)
dev_id = dev_id.replace(' ', '_')
- while '__' in dev_id:
- dev_id = dev_id.replace('__', '_')
+ dev_id = re.sub(r'_+', '_', dev_id)
+
return dev_id
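    # Worked examples of the branches above, for reference (the first mirrors
    # test_get_device_id, the second reuses the sample values from the removed
    # udevadm_property test data; the last serial is made up):
    #   ID_VENDOR + ID_SCSI_SERIAL present:
    #     {'ID_VENDOR': 'ID_VENDOR', 'ID_MODEL': 'ID_MODEL', 'ID_SCSI_SERIAL': 'ID_SCSI_SERIAL'}
    #       -> 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
    #   ID_SERIAL_SHORT present:
    #     {'ID_MODEL': 'SK_hynix_SC311_SATA_512GB', 'ID_SERIAL_SHORT': 'MS83N71801150416A'}
    #       -> 'SK_hynix_SC311_SATA_512GB_MS83N71801150416A'
    #   ID_SERIAL only (Micron NVMes hide the vendor, hence the prefix):
    #     {'ID_SERIAL': 'MTFDEXAMPLE123'} -> 'Micron_MTFDEXAMPLE123'
    # Spaces are replaced with '_' and runs of '_' are collapsed at the end.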
def _set_lvm_membership(self) -> None:
# in the output of `udevadm info --query=property`.
# Probably not ideal and not the best fix but this allows to get around that issue.
# The idea is to make it retry multiple times before actually failing.
- for i in range(10):
- udev_info = udevadm_property(device.path)
- partition_number = udev_info.get('ID_PART_ENTRY_NUMBER')
+ partition_number = None
+ for _ in range(10):
+ udev_data = UdevData(device.path)
+ partition_number = udev_data.environment.get("ID_PART_ENTRY_NUMBER", None)
if partition_number:
break
time.sleep(0.2)
- if not partition_number:
+ if partition_number is None:
raise RuntimeError('Unable to detect the partition number for device: %s' % device.path)
process.run(
return devices
-def udevadm_property(device, properties=[]):
- """
- Query udevadm for information about device properties.
- Optionally pass a list of properties to return. A requested property might
- not be returned if not present.
-
- Expected output format::
- # udevadm info --query=property --name=/dev/sda :(
- DEVNAME=/dev/sda
- DEVTYPE=disk
- ID_ATA=1
- ID_BUS=ata
- ID_MODEL=SK_hynix_SC311_SATA_512GB
- ID_PART_TABLE_TYPE=gpt
- ID_PART_TABLE_UUID=c8f91d57-b26c-4de1-8884-0c9541da288c
- ID_PATH=pci-0000:00:17.0-ata-3
- ID_PATH_TAG=pci-0000_00_17_0-ata-3
- ID_REVISION=70000P10
- ID_SERIAL=SK_hynix_SC311_SATA_512GB_MS83N71801150416A
- TAGS=:systemd:
- USEC_INITIALIZED=16117769
- ...
- """
- out = _udevadm_info(device)
- ret = {}
- for line in out:
- p, v = line.split('=', 1)
- if not properties or p in properties:
- ret[p] = v
- return ret
-
-
-def _udevadm_info(device):
- """
- Call udevadm and return the output
- """
- cmd = ['udevadm', 'info', '--query=property', device]
- out, _err, _rc = process.call(cmd)
- return out
-
-
def lsblk(device, columns=None, abspath=False):
result = []
if not os.path.isdir(device):
metadata['parent'] = block[3]
# some facts from udevadm
- p = udevadm_property(sysdir)
- metadata['id_bus'] = p.get('ID_BUS', '')
+ udev_data = UdevData(sysdir)
+ metadata['id_bus'] = udev_data.environment.get("ID_BUS", "")
device_facts[diskname] = metadata
return device_facts
raise RuntimeError(f'{path} not found.')
self.path: str = path
self.realpath: str = os.path.realpath(self.path)
- self.stats: os.stat_result = os.stat(self.realpath)
- self.major: int = os.major(self.stats.st_rdev)
- self.minor: int = os.minor(self.stats.st_rdev)
+
+ if path.startswith("/sys/block/") and os.path.isdir(path):
+ dev_file = os.path.join(path, "dev")
+ if not os.path.exists(dev_file):
+ raise RuntimeError(f"{dev_file} not found.")
+ with open(dev_file) as f:
+ self.major, self.minor = map(int, f.read().strip().split(":"))
+ else:
+ self.stats: os.stat_result = os.stat(self.realpath)
+ self.major: int = os.major(self.stats.st_rdev)
+ self.minor: int = os.minor(self.stats.st_rdev)
+
self.udev_data_path: str = f'/run/udev/data/b{self.major}:{self.minor}'
self.symlinks: List[str] = []
self.id: str = ''
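A small sketch of the two construction paths the branch above enables, assuming /sys/block/sda/dev contains '8:0' (as created by the fake_filesystem fixture) and /dev/sda is the matching block node on a real system:

from ceph_volume.util.disk import UdevData

# device node: major/minor come from os.stat() on the resolved path
by_node = UdevData('/dev/sda')
# sysfs directory: major/minor are read from /sys/block/sda/dev instead,
# which is what get_devices() relies on under pyfakefs
by_sysfs = UdevData('/sys/block/sda')

# both resolve to the same udev database entry
assert by_node.udev_data_path == by_sysfs.udev_data_path == '/run/udev/data/b8:0'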