if self.method == 'raw':
path = self.args.__dict__.get(dev_type, None)
else:
- path = self.block_lv.tags.get(dev_type, None)
+ if self.block_lv is not None:
+ path = self.block_lv.tags.get(dev_type, None)
+ else:
+ raise RuntimeError('Unexpected error while running bluestore mkfs.')
if path is not None:
CephLuks2(path).config_luks2({'subsystem': f'ceph_fsid={self.osd_fsid}'})
except ValueError:
lv = None
- if api.is_ceph_device(lv):
- logger.info("device {} is already used".format(self.args.data))
- raise RuntimeError("skipping {}, it is already prepared".format(
- self.args.data))
+ if lv is not None:
+ if api.is_ceph_device(lv):
+ logger.info("device {} is already used".format(self.args.data))
+ raise RuntimeError("skipping {}, it is already prepared".format(
+ self.args.data))
try:
self.prepare()
except Exception:
lv_type = "osd-{}".format(device_type)
name_uuid = system.generate_uuid()
kwargs = {
+ 'name_prefix': lv_type,
+ 'uuid': name_uuid,
+ 'vg': None,
'device': device_name,
+ 'slots': slots,
+ 'extents': None,
+ 'size': None,
'tags': tags,
- 'slots': slots
}
# TODO use get_block_db_size and co here to get configured size in
# conf file
if size != 0:
kwargs['size'] = size
- lv = api.create_lv(
- lv_type,
- name_uuid,
- **kwargs)
- path = lv.lv_path
- tags['ceph.{}_device'.format(device_type)] = path
- tags['ceph.{}_uuid'.format(device_type)] = lv.lv_uuid
- lv_uuid = lv.lv_uuid
- lv.set_tags(tags)
+ lv = api.create_lv(**kwargs)
+ if lv is not None:
+ path = lv.lv_path
+ lv_uuid = lv.lv_uuid
+ tags['ceph.{}_device'.format(device_type)] = path
+ tags['ceph.{}_uuid'.format(device_type)] = lv_uuid
+ lv.set_tags(tags)
else:
# otherwise assume this is a regular disk partition
name_uuid = self.get_ptuuid(device_name)
def get_osd_device_path(self,
osd_lvs: List["Volume"],
device_type: str,
- dmcrypt_secret: Optional[str] =
- None) -> Optional[str]:
+ dmcrypt_secret: str = '') -> Optional[str]:
"""
``device_type`` can be one of ``db``, ``wal`` or ``block`` so that we
can query LVs on system and fallback to querying the uuid if that is
logger.debug('Found block device (%s) with encryption: %s',
osd_block_lv.name, is_encrypted)
uuid_tag = 'ceph.%s_uuid' % device_type
- device_uuid = osd_block_lv.tags.get(uuid_tag)
+ device_uuid = osd_block_lv.tags.get(uuid_tag, '')
if not device_uuid:
return None
super().__init__(args)
self.method = 'raw'
self.devices: List[str] = getattr(args, 'devices', [])
- self.osd_id = getattr(self.args, 'osd_id', None)
+ self.osd_id = getattr(self.args, 'osd_id', '')
self.osd_fsid = getattr(self.args, 'osd_fsid', '')
self.block_device_path = getattr(self.args, 'data', '')
self.db_device_path = getattr(self.args, 'block_db', '')
activated_any: bool = False
for d in disk.lsblk_all(abspath=True):
- device: str = d.get('NAME')
+ device: str = d.get('NAME', '')
luks2 = encryption_utils.CephLuks2(device)
if luks2.is_ceph_encrypted:
if luks2.is_tpm2_enrolled and self.osd_fsid == luks2.osd_fsid: