seems useless to have both self.path and self.abspath attributes.
Signed-off-by: Guillaume Abrioux <gabrioux@redhat.com>
pv = api.get_single_pv(filters={'lv_uuid': lv.lv_uuid})
self.unmount_lv(lv)
- wipefs(device.abspath)
- zap_data(device.abspath)
+ wipefs(device.path)
+ zap_data(device.path)
if self.args.destroy:
lvs = api.get_lvs(filters={'vg_name': device.vg_name})
mlogger.info('More than 1 LV left in VG, will proceed to '
'destroy LV only')
mlogger.info('Removing LV because --destroy was given: %s',
- device.abspath)
- api.remove_lv(device.abspath)
+ device.path)
+ api.remove_lv(device.path)
elif lv:
# just remove all lvm metadata, leaving the LV around
lv.clear_tags()
if os.path.realpath(mapper_path) in holders:
self.dmcrypt_close(mapper_uuid)
- if system.device_is_mounted(device.abspath):
- mlogger.info("Unmounting %s", device.abspath)
- system.unmount(device.abspath)
+ if system.device_is_mounted(device.path):
+ mlogger.info("Unmounting %s", device.path)
+ system.unmount(device.path)
- wipefs(device.abspath)
- zap_data(device.abspath)
+ wipefs(device.path)
+ zap_data(device.path)
if self.args.destroy:
- mlogger.info("Destroying partition since --destroy was used: %s" % device.abspath)
+ mlogger.info("Destroying partition since --destroy was used: %s" % device.path)
disk.remove_partition(device)
def zap_lvm_member(self, device):
"""
for lv in device.lvs:
if lv.lv_name:
- mlogger.info('Zapping lvm member {}. lv_path is {}'.format(device.abspath, lv.lv_path))
+ mlogger.info('Zapping lvm member {}. lv_path is {}'.format(device.path, lv.lv_path))
self.zap_lv(Device(lv.lv_path))
else:
vg = api.get_single_vg(filters={'vg_name': lv.vg_name})
for part_name in device.sys_api.get('partitions', {}).keys():
self.zap_partition(Device('/dev/%s' % part_name))
- wipefs(device.abspath)
- zap_data(device.abspath)
+ wipefs(device.path)
+ zap_data(device.path)
@decorators.needs_root
def zap(self, devices=None):
devices = devices or self.args.devices
for device in devices:
- mlogger.info("Zapping: %s", device.abspath)
+ mlogger.info("Zapping: %s", device.path)
if device.is_mapper and not device.is_mpath:
terminal.error("Refusing to zap the mapper device: {}".format(device))
raise SystemExit(1)
result = migrate.find_associated_devices(osd_id='0', osd_fsid='1234')
assert len(result) == 1
- assert result[0][0].abspath == '/dev/VolGroup/lv1'
+ assert result[0][0].path == '/dev/VolGroup/lv1'
assert result[0][0].lvs == [vol]
assert result[0][1] == 'block'
assert len(result) == 2
for d in result:
if d[1] == 'block':
- assert d[0].abspath == '/dev/VolGroup/lv1'
+ assert d[0].path == '/dev/VolGroup/lv1'
assert d[0].lvs == [vol]
elif d[1] == 'wal':
- assert d[0].abspath == '/dev/VolGroup/lv2'
+ assert d[0].path == '/dev/VolGroup/lv2'
assert d[0].lvs == [vol2]
else:
assert False
assert len(result) == 3
for d in result:
if d[1] == 'block':
- assert d[0].abspath == '/dev/VolGroup/lv1'
+ assert d[0].path == '/dev/VolGroup/lv1'
assert d[0].lvs == [vol]
elif d[1] == 'wal':
- assert d[0].abspath == '/dev/VolGroup/lv2'
+ assert d[0].path == '/dev/VolGroup/lv2'
assert d[0].lvs == [vol2]
elif d[1] == 'db':
- assert d[0].abspath == '/dev/VolGroup/lv3'
+ assert d[0].path == '/dev/VolGroup/lv3'
assert d[0].lvs == [vol3]
else:
assert False
monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_id='0')
- assert result[0].abspath == '/dev/VolGroup/lv'
+ assert result[0].path == '/dev/VolGroup/lv'
def test_lv_is_matched_fsid(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_fsid='asdf-lkjh')
- assert result[0].abspath == '/dev/VolGroup/lv'
+ assert result[0].path == '/dev/VolGroup/lv'
def test_lv_is_matched_id_fsid(self, monkeypatch):
tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
result = zap.find_associated_devices(osd_id='0', osd_fsid='asdf-lkjh')
- assert result[0].abspath == '/dev/VolGroup/lv'
+ assert result[0].path == '/dev/VolGroup/lv'
class TestEnsureAssociatedLVs(object):
lsblk = {"TYPE": "disk", "NAME": "sda"}
device_info(lsblk=lsblk)
result = self.validator('/dev/sda')
- assert result.abspath == '/dev/sda'
+ assert result.path == '/dev/sda'
@patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
def test_path_is_invalid(self, m_has_bs_label,
# low-level behavior of has_bluestore_label
with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
disk = device.Device("/dev/sda")
- disk.abspath = "/dev/sda"
+ disk.path = "/dev/sda"
with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
assert disk.has_bluestore_label
with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
def __init__(self, path, with_lsm=False, lvs=None, lsblk_all=None, all_devices_vgs=None):
self.path = path
- # LVs can have a vg/lv path, while disks will have /dev/sda
- self.abspath = path
if not sys_info.devices:
sys_info.devices = disk.get_devices()
- self.sys_api = sys_info.devices.get(self.abspath, {})
+ self.sys_api = sys_info.devices.get(self.path, {})
self.partitions = self._get_partitions()
self.lv_api = None
self.lvs = [] if not lvs else lvs
lv = None
if not self.sys_api:
# if no device was found check if we are a partition
- partname = self.abspath.split('/')[-1]
+ partname = self.path.split('/')[-1]
for device, info in sys_info.devices.items():
part = info['partitions'].get(partname, {})
if part:
if lv:
self.lv_api = lv
self.lvs = [lv]
- self.abspath = lv.lv_path
+ self.path = lv.lv_path
self.vg_name = lv.vg_name
self.lv_name = lv.name
self.ceph_device = lvm.is_ceph_device(lv)
prefix = 'Partition'
elif self.is_device:
prefix = 'Raw Device'
- return '<%s: %s>' % (prefix, self.abspath)
+ return '<%s: %s>' % (prefix, self.path)
def pretty_report(self):
def format_value(v):
def report(self):
return report_template.format(
- dev=self.abspath,
+ dev=self.path,
size=self.size_human,
rot=self.rotational,
available=self.available,
"""
props = ['ID_VENDOR', 'ID_MODEL', 'ID_MODEL_ENC', 'ID_SERIAL_SHORT', 'ID_SERIAL',
'ID_SCSI_SERIAL']
- p = disk.udevadm_property(self.abspath, props)
+ p = disk.udevadm_property(self.path, props)
if p.get('ID_MODEL','').startswith('LVM PV '):
p['ID_MODEL'] = p.get('ID_MODEL_ENC', '').replace('\\x20', ' ').strip()
if 'ID_VENDOR' in p and 'ID_MODEL' in p and 'ID_SCSI_SERIAL' in p:
# VGs, should we consider it as part of LVM? We choose not to
# here, because most likely, we need to use VGs from this PV.
self._is_lvm_member = False
- device_to_check = [self.abspath]
+ device_to_check = [self.path]
device_to_check.extend(self.partitions)
# a pv can only be in one vg, so this should be safe
partition. Return a list of paths to be checked for a pv.
"""
partitions = []
- path_dir = os.path.dirname(self.abspath)
+ path_dir = os.path.dirname(self.path)
for partition in self.sys_api.get('partitions', {}).keys():
partitions.append(os.path.join(path_dir, partition))
return partitions
@property
def exists(self):
- return os.path.exists(self.abspath)
+ return os.path.exists(self.path)
@property
def has_fs(self):
@property
def has_bluestore_label(self):
- return disk.has_bluestore_label(self.abspath)
+ return disk.has_bluestore_label(self.path)
@property
def is_mapper(self):
elif self.is_partition:
return 'crypto_LUKS' in crypt_reports
elif self.is_mapper:
- active_mapper = encryption_status(self.abspath)
+ active_mapper = encryption_status(self.path)
if active_mapper:
# normalize a bit to ensure same values regardless of source
encryption_type = active_mapper['type'].lower().strip('12') # turn LUKS1 or LUKS2 into luks
except OSError as e:
# likely failed to open the device. assuming it is BlueStore is the safest option
# so that a possibly-already-existing OSD doesn't get overwritten
- logger.error('failed to determine if device {} is BlueStore. device should not be used to avoid false negatives. err: {}'.format(self.abspath, e))
+ logger.error('failed to determine if device {} is BlueStore. device should not be used to avoid false negatives. err: {}'.format(self.path, e))
rejected.append('Failed to determine if device is BlueStore')
if self.is_partition:
except OSError as e:
# likely failed to open the device. assuming the parent is BlueStore is the safest
# option so that a possibly-already-existing OSD doesn't get overwritten
- logger.error('failed to determine if partition {} (parent: {}) has a BlueStore parent. partition should not be used to avoid false negatives. err: {}'.format(self.abspath, self.parent_device, e))
+ logger.error('failed to determine if partition {} (parent: {}) has a BlueStore parent. partition should not be used to avoid false negatives. err: {}'.format(self.path, self.parent_device, e))
rejected.append('Failed to determine if parent device is BlueStore')
if self.has_gpt_headers:
:param device: A ``Device()`` object
"""
- udev_info = udevadm_property(device.abspath)
+ udev_info = udevadm_property(device.path)
partition_number = udev_info.get('ID_PART_ENTRY_NUMBER')
if not partition_number:
- raise RuntimeError('Unable to detect the partition number for device: %s' % device.abspath)
+ raise RuntimeError('Unable to detect the partition number for device: %s' % device.path)
process.run(
['parted', device.parent_device, '--script', '--', 'rm', partition_number]
devices = [Device(i['NAME']) for i in device_family(parent_device)]
for d in devices:
if d.ceph_disk.type == 'lockbox':
- metadata['lockbox'] = d.abspath
+ metadata['lockbox'] = d.path
break
return metadata