import os
import logging
from ceph_volume import process, conf
+from ceph_volume.util import constants
from .prepare import write_keyring
+from .disk import lsblk, device_family
logger = logging.getLogger(__name__)
keyring_name='lockbox.keyring',
name=name
)
+
+
+def status(device):
+ """
+ Capture the metadata information of a possibly encrypted device, returning
+ a dictionary with all the values found (if any).
+
+ An encrypted device will contain information about a device. Example
+ successful output looks like::
+
+ $ cryptsetup status /dev/mapper/ed6b5a26-eafe-4cd4-87e3-422ff61e26c4
+ /dev/mapper/ed6b5a26-eafe-4cd4-87e3-422ff61e26c4 is active and is in use.
+ type: LUKS1
+ cipher: aes-xts-plain64
+ keysize: 256 bits
+ device: /dev/sdc2
+ offset: 4096 sectors
+ size: 20740063 sectors
+ mode: read/write
+
+ As long as the mapper device is in 'open' state, the ``status`` call will work.
+
+ :param device: Absolute path or UUID of the device mapper
+ """
+ command = [
+ 'cryptsetup',
+ 'status',
+ device,
+ ]
+ out, err, code = process.call(command, show_command=True)
+ metadata = {}
+ if code != 0:
+ logger.warning('failed to detect device mapper information')
+ return metadata
+ for line in out:
+ # get rid of lines that might not be useful to construct the report:
+ if not line.startswith(' '):
+ continue
+ try:
+ column, value = line.split(': ')
+ except ValueError:
+ continue
+ metadata[column.strip()] = value.strip().strip('"')
+ return metadata
+
+
+def legacy_encrypted(device):
+ """
+ Detect if a device was encrypted with ceph-disk or not. In the case of
+ encrypted devices, include the type of encryption (LUKS, or PLAIN), and
+ infer what the lockbox partition is.
+
+ This function assumes that ``device`` will be a partition.
+ """
+ metadata = {'encrypted': False, 'type': None, 'lockbox': ''}
+ # check if the device is online/decrypted first
+ active_mapper = status(device)
+ if active_mapper:
+ # normalize a bit to ensure same values regardless of source
+ metadata['type'] = active_mapper['type'].lower().strip('12') # turn LUKS1 or LUKS2 into luks
+ metadata['encrypted'] = True if metadata['type'] in ['plain', 'luks'] else False
+ # The true device is now available to this function, so it gets
+ # re-assigned here for the lockbox checks to succeed (it is not
+ # possible to guess partitions from a device mapper device otherwise
+ device = active_mapper.get('device', device)
+ else:
+ out, err, rc = process.call(['blkid', '-p', '-s', 'PART_ENTRY_TYPE', '-o', 'value', device])
+ uuid = ' '.join(out).strip()
+ guid_match = constants.ceph_disk_guids.get(uuid, {})
+ encrypted_guid = guid_match.get('encrypted', False)
+ if encrypted_guid:
+ metadata['encrypted'] = True
+ metadata['type'] = guid_match['encryption_type']
+
+ # Lets find the lockbox location now, to do this, we need to find out the
+ # parent device name for the device so that we can query all of its
+ # associated devices and *then* look for one that has the 'lockbox' label
+ # on it. Thanks for being awesome ceph-disk
+ disk_meta = lsblk(device, abspath=True)
+ if not disk_meta:
+ return metadata
+ parent_device = disk_meta['PKNAME']
+ # With the parent device set, we can now look for the lockbox listing associated devices
+ devices = device_family(parent_device)
+ for i in devices:
+ if 'lockbox' in i.get('PARTLABEL', ''):
+ metadata['lockbox'] = i['NAME']
+ break
+ return metadata