ceph-volume lvm prepare --bluestore --dmcrypt --data vg/lv
+Starting with Ceph Squid, you can opt for TPM2 token enrollment for the created LUKS2 devices with the ``--with-tpm`` flag:
+
+.. prompt:: bash #
+
+ ceph-volume lvm prepare --bluestore --dmcrypt --with-tpm --data vg/lv
+
If a ``block.db`` device or a ``block.wal`` device is needed, it can be
specified with ``--block.db`` or ``--block.wal``. These can be physical
devices, partitions, or logical volumes. ``block.db`` and ``block.wal`` are
all: true
encrypted: true
+Ceph Squid onwards support tpm2 token enrollment to LUKS2 devices.
+You can add the `tpm2` to your OSD spec:
+
+.. code-block:: yaml
+
+ service_type: osd
+ service_id: example_osd_spec_with_tpm2
+ placement:
+ host_pattern: '*'
+ spec:
+ data_devices:
+ all: true
+ encrypted: true
+ tpm2: true
+
See a full list in the DriveGroupSpecs
.. py:currentmodule:: ceph.deployment.drive_group
+import os
+import logging
from collections import namedtuple
sys_info = namedtuple('sys_info', ['devices'])
sys_info.devices = dict()
+logger = logging.getLogger(__name__)
+
+
+class AllowLoopDevices:
+ allow = False
+ warned = False
+
+ @classmethod
+ def __call__(cls) -> bool:
+ val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower()
+ if val not in ("false", 'no', '0'):
+ cls.allow = True
+ if not cls.warned:
+ logger.warning(
+ "CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your "
+ "environment, so we will allow the use of unattached loop"
+ " devices as disks. This feature is intended for "
+ "development purposes only and will never be supported in"
+ " production. Issues filed based on this behavior will "
+ "likely be ignored."
+ )
+ cls.warned = True
+ return cls.allow
class UnloadedConfig(object):
def __getattr__(self, *a):
raise RuntimeError("No valid ceph configuration file was loaded.")
+
+allow_loop_devices = AllowLoopDevices()
conf = namedtuple('config', ['ceph', 'cluster', 'verbosity', 'path', 'log_path', 'dmcrypt_no_workqueue'])
conf.ceph = UnloadedConfig()
conf.dmcrypt_no_workqueue = None
action=arg_validators.DmcryptAction,
help='Enable device encryption via dm-crypt',
)
+ parser.add_argument(
+ '--with-tpm',
+ dest='with_tpm',
+ help='Whether encrypted OSDs should be enrolled with TPM.',
+ action='store_true'
+ )
parser.add_argument(
'--crush-device-class',
dest='crush_device_class',
global_args = [
'bluestore',
'dmcrypt',
+ 'with_tpm',
'crush_device_class',
'no_systemd',
]
'action': arg_validators.DmcryptAction,
'help': 'Enable device encryption via dm-crypt',
},
+ '--with-tpm': {
+ 'dest': 'with_tpm',
+ 'help': 'Whether encrypted OSDs should be enrolled with TPM.',
+ 'action': 'store_true'
+ },
'--no-systemd': {
'dest': 'no_systemd',
'action': 'store_true',
'--device',
help='The device for the OSD to start'
)
+ parser.add_argument(
+ '--devices',
+ help='The device for the OSD to start',
+ nargs='*',
+ default=[]
+ )
parser.add_argument(
'--osd-id',
help='OSD ID to activate'
)
parser.add_argument(
'--osd-uuid',
+ dest='osd_fsid',
help='OSD UUID to active'
)
parser.add_argument(
return
self.args = parser.parse_args(self.argv)
- devs = []
if self.args.device:
- devs = [self.args.device]
- if self.args.block_wal:
- devs.append(self.args.block_wal)
- if self.args.block_db:
- devs.append(self.args.block_db)
+ if self.args.devices is None:
+ self.args.devices = [self.args.device]
+ else:
+ self.args.devices.append(self.args.device)
+
self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
- self.objectstore.activate(devs=devs,
- start_osd_id=self.args.osd_id,
- start_osd_uuid=self.args.osd_uuid,
- tmpfs=not self.args.no_tmpfs)
+ self.objectstore.activate()
import argparse
from ceph_volume.util import arg_validators
-def create_parser(prog, description):
+def create_parser(prog: str, description: str) -> argparse.ArgumentParser:
"""
Both prepare and create share the same parser, those are defined here to
avoid duplication
action=arg_validators.DmcryptAction,
help='Enable device encryption via dm-crypt',
)
+ parser.add_argument(
+ '--with-tpm',
+ dest='with_tpm',
+ help='Whether encrypted OSDs should be enrolled with TPM.',
+ action='store_true'
+ ),
parser.add_argument(
'--osd-id',
help='Reuse an existing OSD id',
from textwrap import dedent
from ceph_volume import decorators, process
from ceph_volume.util import disk
-from typing import Any, Dict, List
+from typing import Any, Dict, List as _List
logger = logging.getLogger(__name__)
_list = List([])
return _list.generate(devices)
-def _get_bluestore_info(dev):
+def _get_bluestore_info(dev: str) -> Dict[str, Any]:
+ result: Dict[str, Any] = {}
out, err, rc = process.call([
'ceph-bluestore-tool', 'show-label',
'--dev', dev], verbose_on_failure=False)
# ceph-bluestore-tool returns an error (below) if device is not bluestore OSD
# > unable to read label for <device>: (2) No such file or directory
# but it's possible the error could be for a different reason (like if the disk fails)
- logger.debug('assuming device {} is not BlueStore; ceph-bluestore-tool failed to get info from device: {}\n{}'.format(dev, out, err))
- return None
- oj = json.loads(''.join(out))
- if dev not in oj:
- # should be impossible, so warn
- logger.warning('skipping device {} because it is not reported in ceph-bluestore-tool output: {}'.format(dev, out))
- return None
- try:
- r = {
- 'osd_uuid': oj[dev]['osd_uuid'],
- }
- if oj[dev]['description'] == 'main':
- whoami = oj[dev]['whoami']
- r.update({
- 'type': 'bluestore',
- 'osd_id': int(whoami),
- 'ceph_fsid': oj[dev]['ceph_fsid'],
- 'device': dev,
- })
- elif oj[dev]['description'] == 'bluefs db':
- r['device_db'] = dev
- elif oj[dev]['description'] == 'bluefs wal':
- r['device_wal'] = dev
- return r
- except KeyError as e:
- # this will appear for devices that have a bluestore header but aren't valid OSDs
- # for example, due to incomplete rollback of OSDs: https://tracker.ceph.com/issues/51869
- logger.error('device {} does not have all BlueStore data needed to be a valid OSD: {}\n{}'.format(dev, out, e))
- return None
+ logger.debug(f'assuming device {dev} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}')
+ else:
+ oj = json.loads(''.join(out))
+ if dev not in oj:
+ # should be impossible, so warn
+ logger.warning(f'skipping device {dev} because it is not reported in ceph-bluestore-tool output: {out}')
+ try:
+ result = disk.bluestore_info(dev, oj)
+ except KeyError as e:
+ # this will appear for devices that have a bluestore header but aren't valid OSDs
+ # for example, due to incomplete rollback of OSDs: https://tracker.ceph.com/issues/51869
+ logger.error(f'device {dev} does not have all BlueStore data needed to be a valid OSD: {out}\n{e}')
+ return result
class List(object):
help = 'list BlueStore OSDs on raw devices'
- def __init__(self, argv):
+ def __init__(self, argv: _List[str]) -> None:
self.argv = argv
def is_atari_partitions(self, _lsblk: Dict[str, Any]) -> bool:
return True
return False
- def exclude_atari_partitions(self, _lsblk_all: Dict[str, Any]) -> List[Dict[str, Any]]:
+ def exclude_atari_partitions(self, _lsblk_all: Dict[str, Any]) -> _List[Dict[str, Any]]:
return [_lsblk for _lsblk in _lsblk_all if not self.is_atari_partitions(_lsblk)]
def generate(self, devs=None):
logger.debug('inspecting devices: {}'.format(devs))
for info_device in info_devices:
bs_info = _get_bluestore_info(info_device['NAME'])
- if bs_info is None:
+ if not bs_info:
# None is also returned in the rare event that there is an issue reading info from
# a BlueStore disk, so be sure to log our assumption that it isn't bluestore
logger.info('device {} does not have BlueStore information'.format(info_device['NAME']))
self.args = parser.parse_args(self.argv)
if self.args.bluestore:
self.args.objectstore = 'bluestore'
- if self.args.dmcrypt and not os.getenv('CEPH_VOLUME_DMCRYPT_SECRET'):
- terminal.error('encryption was requested (--dmcrypt) but environment variable ' \
- 'CEPH_VOLUME_DMCRYPT_SECRET is not set, you must set ' \
- 'this variable to provide a dmcrypt secret.')
- raise SystemExit(1)
+ if self.args.dmcrypt:
+ if not self.args.with_tpm and not os.getenv('CEPH_VOLUME_DMCRYPT_SECRET'):
+ terminal.error('encryption was requested (--dmcrypt) but environment variable ' \
+ 'CEPH_VOLUME_DMCRYPT_SECRET is not set, you must set ' \
+ 'this variable to provide a dmcrypt secret or use --with-tpm ' \
+ 'in order to enroll a tpm2 token.')
+ raise SystemExit(1)
self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
self.objectstore.safe_prepare(self.args)
from . import lvmbluestore
from . import rawbluestore
+from typing import Any, Dict
-mapping = {
+
+mapping: Dict[str, Any] = {
'LVM': {
'bluestore': lvmbluestore.LvmBlueStore
},
import os
import errno
import time
+import tempfile
from ceph_volume import conf, terminal, process
from ceph_volume.util import prepare as prepare_utils
from ceph_volume.util import system, disk
+from ceph_volume.util import encryption as encryption_utils
from typing import Dict, Any, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
# for the OSD, this needs to be fixed. This could either be a file (!)
# or a string (!!) or some flags that we would need to compound
# into a dict so that we can convert to JSON (!!!)
- self.secrets = {'cephx_secret': prepare_utils.create_key()}
- self.cephx_secret = self.secrets.get('cephx_secret',
- prepare_utils.create_key())
- self.encrypted = 0
+ self.secrets: Dict[str, str] = {'cephx_secret': prepare_utils.create_key()}
+ self.cephx_secret: str = self.secrets.get('cephx_secret',
+ prepare_utils.create_key())
+ self.encrypted: int = 0
self.tags: Dict[str, Any] = {}
self.osd_id: str = ''
- self.osd_fsid = ''
- self.block_lv: Optional["Volume"] = None
- self.cephx_lockbox_secret = ''
+ self.osd_fsid: str = ''
+ self.cephx_lockbox_secret: str = ''
self.objectstore: str = ''
self.osd_mkfs_cmd: List[str] = []
- self.block_device_path = ''
- if hasattr(self.args, 'dmcrypt'):
- if self.args.dmcrypt:
- self.encrypted = 1
+ self.block_device_path: str = ''
+ self.dmcrypt_key: str = encryption_utils.create_dmcrypt_key()
+ self.with_tpm: int = int(getattr(self.args, 'with_tpm', False))
+ self.method: str = ''
+ if getattr(self.args, 'dmcrypt', False):
+ self.encrypted = 1
+ if not self.with_tpm:
self.cephx_lockbox_secret = prepare_utils.create_key()
self.secrets['cephx_lockbox_secret'] = \
self.cephx_lockbox_secret
def activate(self) -> None:
raise NotImplementedError()
+
+ def enroll_tpm2(self, device: str) -> None:
+ """
+ Enrolls a device with TPM2 (Trusted Platform Module 2.0) using systemd-cryptenroll.
+ This method creates a temporary file to store the dmcrypt key and uses it to enroll the device.
+
+ Args:
+ device (str): The device path to be enrolled with TPM2.
+ """
+
+ if self.with_tpm:
+ tmp_dir: str = '/rootfs/tmp' if os.environ.get('I_AM_IN_A_CONTAINER', False) else '/tmp'
+ with tempfile.NamedTemporaryFile(mode='w', delete=True, dir=tmp_dir) as temp_file:
+ temp_file.write(self.dmcrypt_key)
+ temp_file.flush()
+ temp_file_name: str = temp_file.name.replace('/rootfs', '', 1)
+ cmd: List[str] = ['systemd-cryptenroll', '--tpm2-device=auto',
+ device, '--unlock-key-file', temp_file_name,
+ '--tpm2-pcrs', '9+12', '--wipe-slot', 'tpm2']
+ process.call(cmd, run_on_host=True, show_command=True)
import os
from .baseobjectstore import BaseObjectStore
from ceph_volume.util import system
-from typing import Optional, TYPE_CHECKING
+from ceph_volume.util.encryption import CephLuks2
+from ceph_volume import process
+from typing import Any, Dict, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
import argparse
+ from ceph_volume.api.lvm import Volume
logger = logging.getLogger(__name__)
self.block_device_path: str = ''
self.wal_device_path: str = ''
self.db_device_path: str = ''
+ self.block_lv: Volume
def add_objectstore_opts(self) -> None:
"""
link_path = os.path.join(self.osd_path, link_name)
if os.path.exists(link_path):
os.unlink(os.path.join(self.osd_path, link_name))
+
+
+ def add_label(self, key: str,
+ value: str,
+ device: str) -> None:
+ """Add a label to a BlueStore device.
+ Args:
+ key (str): The name of the label being added.
+ value (str): Value of the label being added.
+ device (str): The path of the BlueStore device.
+ Raises:
+ RuntimeError: If `ceph-bluestore-tool` command doesn't success.
+ """
+
+ command: List[str] = ['ceph-bluestore-tool',
+ 'set-label-key',
+ '-k',
+ key,
+ '-v',
+ value,
+ '--dev',
+ device]
+
+ _, err, rc = process.call(command,
+ terminal_verbose=True,
+ show_command=True)
+ if rc:
+ raise RuntimeError(f"Can't add BlueStore label '{key}' to device {device}: {err}")
+
+ def osd_mkfs(self) -> None:
+ super().osd_mkfs()
+ mapping: Dict[str, Any] = {'raw': ['data', 'block_db', 'block_wal'],
+ 'lvm': ['ceph.block_device', 'ceph.db_device', 'ceph.wal_device']}
+ if self.args.dmcrypt:
+ for dev_type in mapping[self.method]:
+ if self.method == 'raw':
+ path = self.args.__dict__.get(dev_type, None)
+ else:
+ path = self.block_lv.tags.get(dev_type, None)
+ if path is not None:
+ CephLuks2(path).config_luks2({'subsystem': f'ceph_fsid={self.osd_fsid}'})
class LvmBlueStore(BlueStore):
def __init__(self, args: "argparse.Namespace") -> None:
super().__init__(args)
+ self.method = 'lvm'
self.tags: Dict[str, Any] = {}
- self.block_lv: Optional["Volume"] = None
def pre_prepare(self) -> None:
- if self.encrypted:
- self.secrets['dmcrypt_key'] = encryption_utils.create_dmcrypt_key()
+ if self.encrypted and not self.with_tpm:
+ self.secrets['dmcrypt_key'] = self.dmcrypt_key
cluster_fsid = self.get_cluster_fsid()
self.tags['ceph.block_uuid'] = self.block_lv.__dict__['lv_uuid']
self.tags['ceph.cephx_lockbox_secret'] = self.cephx_lockbox_secret
self.tags['ceph.encrypted'] = self.encrypted
+ self.tags['ceph.with_tpm'] = 1 if self.with_tpm else 0
self.tags['ceph.vdo'] = api.is_vdo(self.block_lv.__dict__['lv_path'])
def prepare_data_device(self,
self.block_lv.set_tags(self.tags) # type: ignore
# 3/ encryption-only operations
- if self.secrets.get('dmcrypt_key'):
+ if self.encrypted:
self.prepare_dmcrypt()
# 4/ osd_prepare req
# done on activation. Format and open ('decrypt' devices) and
# re-assign the device and journal variables so that the rest of the
# process can use the mapper paths
- key = self.secrets['dmcrypt_key']
-
- self.block_device_path = \
- self.luks_format_and_open(key,
- self.block_device_path,
- 'block',
- self.tags)
- self.wal_device_path = self.luks_format_and_open(key,
- self.wal_device_path,
- 'wal',
- self.tags)
- self.db_device_path = self.luks_format_and_open(key,
- self.db_device_path,
- 'db',
- self.tags)
+
+ device_types = ('block', 'db', 'wal')
+
+ for device_type in device_types:
+ attr_name: str = f'{device_type}_device_path'
+ path: str = self.__dict__[attr_name]
+ if path:
+ self.__dict__[attr_name] = self.luks_format_and_open(path,
+ device_type,
+ self.tags)
def luks_format_and_open(self,
- key: Optional[str],
device: str,
device_type: str,
tags: Dict[str, Any]) -> str:
uuid = tags[tag_name]
# format data device
encryption_utils.luks_format(
- key,
+ self.dmcrypt_key,
device
)
+
+ if self.with_tpm:
+ self.enroll_tpm2(device)
+
encryption_utils.luks_open(
- key,
+ self.dmcrypt_key,
device,
- uuid
- )
+ uuid,
+ self.with_tpm)
return '/dev/mapper/%s' % uuid
raise RuntimeError('could not find a bluestore OSD to activate')
is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
- dmcrypt_secret = None
+ dmcrypt_secret = ''
osd_id = osd_block_lv.tags['ceph.osd_id']
conf.cluster = osd_block_lv.tags['ceph.cluster_name']
osd_fsid = osd_block_lv.tags['ceph.osd_fsid']
if is_encrypted:
osd_lv_path = '/dev/mapper/%s' % osd_block_lv.__dict__['lv_uuid']
lockbox_secret = osd_block_lv.tags['ceph.cephx_lockbox_secret']
- encryption_utils.write_lockbox_keyring(osd_id,
- osd_fsid,
- lockbox_secret)
- dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
+ self.with_tpm = bool(osd_block_lv.tags.get('ceph.with_tpm', 0))
+ if not self.with_tpm:
+ encryption_utils.write_lockbox_keyring(osd_id,
+ osd_fsid,
+ lockbox_secret)
+ dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
encryption_utils.luks_open(dmcrypt_secret,
osd_block_lv.__dict__['lv_path'],
- osd_block_lv.__dict__['lv_uuid'])
+ osd_block_lv.__dict__['lv_uuid'],
+ with_tpm=self.with_tpm)
else:
osd_lv_path = osd_block_lv.__dict__['lv_path']
class RawBlueStore(BlueStore):
def __init__(self, args: "argparse.Namespace") -> None:
super().__init__(args)
- if hasattr(self.args, 'data'):
- self.block_device_path = self.args.data
- if hasattr(self.args, 'block_db'):
- self.db_device_path = self.args.block_db
- if hasattr(self.args, 'block_wal'):
- self.wal_device_path = self.args.block_wal
+ self.method = 'raw'
+ self.devices: List[str] = getattr(args, 'devices', [])
+ self.osd_id = getattr(self.args, 'osd_id', '')
+ self.osd_fsid = getattr(self.args, 'osd_fsid', '')
+ self.block_device_path = getattr(self.args, 'data', '')
+ self.db_device_path = getattr(self.args, 'block_db', '')
+ self.wal_device_path = getattr(self.args, 'block_wal', '')
def prepare_dmcrypt(self) -> None:
"""
Helper for devices that are encrypted. The operations needed for
block, db, wal, devices are all the same
"""
- key = self.secrets['dmcrypt_key']
for device, device_type in [(self.block_device_path, 'block'),
(self.db_device_path, 'db'),
device_type)
# format data device
encryption_utils.luks_format(
- key,
+ self.dmcrypt_key,
device
)
+ if self.with_tpm:
+ self.enroll_tpm2(device)
encryption_utils.luks_open(
- key,
+ self.dmcrypt_key,
device,
- mapping
+ mapping,
+ self.with_tpm
)
self.__dict__[f'{device_type}_device_path'] = \
- '/dev/mapper/{}'.format(mapping)
+ '/dev/mapper/{}'.format(mapping) # TODO(guits): need to preserve path or find a way to get the parent device from the mapper ?
def safe_prepare(self,
args: Optional["argparse.Namespace"] = None) -> None:
@decorators.needs_root
def prepare(self) -> None:
- if self.encrypted:
- self.secrets['dmcrypt_key'] = \
- os.getenv('CEPH_VOLUME_DMCRYPT_SECRET')
self.osd_fsid = system.generate_uuid()
crush_device_class = self.args.crush_device_class
+ if self.encrypted and not self.with_tpm:
+ self.dmcrypt_key = os.getenv('CEPH_VOLUME_DMCRYPT_SECRET', '')
+ self.secrets['dmcrypt_key'] = self.dmcrypt_key
if crush_device_class:
self.secrets['crush_device_class'] = crush_device_class
tmpfs = not self.args.no_tmpfs
- if self.args.block_wal:
- self.wal = self.args.block_wal
- if self.args.block_db:
- self.db = self.args.block_db
# reuse a given ID if it exists, otherwise create a new ID
self.osd_id = prepare_utils.create_id(
self.osd_fsid, json.dumps(self.secrets))
- if self.secrets.get('dmcrypt_key'):
+ if self.encrypted:
self.prepare_dmcrypt()
self.prepare_osd_req(tmpfs=tmpfs)
# prepare the osd filesystem
self.osd_mkfs()
- def _activate(self,
- meta: Dict[str, Any],
- tmpfs: bool) -> None:
- # find the osd
- osd_id = meta['osd_id']
- osd_uuid = meta['osd_uuid']
-
+ def _activate(self, osd_id: str, osd_fsid: str) -> None:
# mount on tmpfs the osd directory
self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
if not system.path_is_mounted(self.osd_path):
# mkdir -p and mount as tmpfs
- prepare_utils.create_osd_path(osd_id, tmpfs=tmpfs)
+ prepare_utils.create_osd_path(osd_id, tmpfs=not self.args.no_tmpfs)
# XXX This needs to be removed once ceph-bluestore-tool can deal with
# symlinks that exist in the osd dir
'prime-osd-dir',
'--path', self.osd_path,
'--no-mon-config',
- '--dev', meta['device'],
+ '--dev', self.block_device_path,
]
process.run(prime_command)
# always re-do the symlink regardless if it exists, so that the block,
# block.wal, and block.db devices that may have changed can be mapped
# correctly every time
- prepare_utils.link_block(meta['device'], osd_id)
+ prepare_utils.link_block(self.block_device_path, osd_id)
- if 'device_db' in meta:
- prepare_utils.link_db(meta['device_db'], osd_id, osd_uuid)
+ if self.db_device_path:
+ prepare_utils.link_db(self.db_device_path, osd_id, osd_fsid)
- if 'device_wal' in meta:
- prepare_utils.link_wal(meta['device_wal'], osd_id, osd_uuid)
+ if self.wal_device_path:
+ prepare_utils.link_wal(self.wal_device_path, osd_id, osd_fsid)
system.chown(self.osd_path)
terminal.success("ceph-volume raw activate "
"successful for osd ID: %s" % osd_id)
@decorators.needs_root
- def activate(self,
- devs: List[str],
- start_osd_id: str,
- start_osd_uuid: str,
- tmpfs: bool) -> None:
- """
- :param args: The parsed arguments coming from the CLI
+ def activate(self) -> None:
+ """Activate Ceph OSDs on the system.
+
+ This function activates Ceph Object Storage Daemons (OSDs) on the system.
+ It iterates over all block devices, checking if they have a LUKS2 signature and
+ are encrypted for Ceph. If a device's OSD fsid matches and it is enrolled with TPM2,
+ the function pre-activates it. After collecting the relevant devices, it attempts to
+ activate any OSDs found.
+
+ Raises:
+ RuntimeError: If no matching OSDs are found to activate.
"""
- assert devs or start_osd_id or start_osd_uuid
- found = direct_report(devs)
+ assert self.devices or self.osd_id or self.osd_fsid
+
+ activated_any: bool = False
+
+ for d in disk.lsblk_all(abspath=True):
+ device: str = d.get('NAME')
+ luks2 = encryption_utils.CephLuks2(device)
+ if luks2.is_ceph_encrypted:
+ if luks2.is_tpm2_enrolled and self.osd_fsid == luks2.osd_fsid:
+ self.pre_activate_tpm2(device)
+ found = direct_report(self.devices)
- activated_any = False
for osd_uuid, meta in found.items():
osd_id = meta['osd_id']
- if start_osd_id is not None and str(osd_id) != str(start_osd_id):
+ if self.osd_id is not None and str(osd_id) != str(self.osd_id):
continue
- if start_osd_uuid is not None and osd_uuid != start_osd_uuid:
+ if self.osd_fsid is not None and osd_uuid != self.osd_fsid:
continue
- logger.info('Activating osd.%s uuid %s cluster %s' % (
- osd_id, osd_uuid, meta['ceph_fsid']))
- self._activate(meta,
- tmpfs=tmpfs)
+ self.block_device_path = meta.get('device')
+ self.db_device_path = meta.get('device_db', '')
+ self.wal_device_path = meta.get('device_wal', '')
+ logger.info(f'Activating osd.{osd_id} uuid {osd_uuid} cluster {meta["ceph_fsid"]}')
+ self._activate(osd_id, osd_uuid)
activated_any = True
if not activated_any:
raise RuntimeError('did not find any matching OSD to activate')
+
+ def pre_activate_tpm2(self, device: str) -> None:
+ """Pre-activate a TPM2-encrypted device for Ceph.
+
+ This function pre-activates a TPM2-encrypted device for Ceph by opening the
+ LUKS encryption, checking the BlueStore header, and renaming the device
+ mapper according to the BlueStore mapping type.
+
+ Args:
+ device (str): The path to the device to be pre-activated.
+
+ Raises:
+ RuntimeError: If the device does not have a BlueStore signature.
+ """
+ bs_mapping_type: Dict[str, str] = {'bluefs db': 'db',
+ 'bluefs wal': 'wal',
+ 'main': 'block'}
+ self.with_tpm = 1
+ self.temp_mapper: str = f'activating-{os.path.basename(device)}'
+ self.temp_mapper_path: str = f'/dev/mapper/{self.temp_mapper}'
+ encryption_utils.luks_open(
+ '',
+ device,
+ self.temp_mapper,
+ self.with_tpm
+ )
+ bluestore_header: Dict[str, Any] = disk.get_bluestore_header(self.temp_mapper_path)
+ if not bluestore_header:
+ raise RuntimeError(f"{device} doesn't have BlueStore signature.")
+
+ kname: str = disk.get_parent_device_from_mapper(self.temp_mapper_path, abspath=False)
+ device_type = bs_mapping_type[bluestore_header[self.temp_mapper_path]['description']]
+ new_mapper: str = f'ceph-{self.osd_fsid}-{kname}-{device_type}-dmcrypt'
+ self.block_device_path = f'/dev/mapper/{new_mapper}'
+ self.devices.append(self.block_device_path)
+ encryption_utils.rename_mapper(self.temp_mapper, new_mapper)
import os
import pytest
-from mock.mock import patch, PropertyMock, create_autospec
+from mock.mock import patch, PropertyMock, create_autospec, Mock
from ceph_volume.api import lvm
from ceph_volume.util import disk
from ceph_volume.util import device
from ceph_volume.util.constants import ceph_disk_guids
from ceph_volume import conf, configuration, objectstore
+from ceph_volume.objectstore.rawbluestore import RawBlueStore
+from typing import Any, Dict, List, Optional, Callable
class Capture(object):
"osd_id": 9,
"osd_uuid": "a0e07c5b-bee1-4ea2-ae07-cb89deda9b27",
"type": "bluestore"
+ },
+ "db32a338-b640-4cbc-af17-f63808b1c36e": {
+ "ceph_fsid": "c301d0aa-288d-11ef-b535-c84bd6975560",
+ "device": "/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdb-block-dmcrypt",
+ "device_db": "/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdc-db-dmcrypt",
+ "osd_id": 0,
+ "osd_uuid": "db32a338-b640-4cbc-af17-f63808b1c36e",
+ "type": "bluestore"
}
}
@pytest.fixture
def mock_raw_direct_report(monkeypatch):
- monkeypatch.setattr('ceph_volume.objectstore.rawbluestore.direct_report', lambda x: raw_direct_report_data)
\ No newline at end of file
+ monkeypatch.setattr('ceph_volume.objectstore.rawbluestore.direct_report', lambda x: raw_direct_report_data)
+
+@pytest.fixture
+def fake_lsblk_all(monkeypatch: Any) -> Callable:
+ def apply(data: Optional[List[Dict[str, Any]]] = None) -> None:
+ if data is None:
+ devices = []
+ else:
+ devices = data
+ monkeypatch.setattr("ceph_volume.util.device.disk.lsblk_all", lambda *a, **kw: devices)
+ return apply
+
+@pytest.fixture
+def rawbluestore(factory: type[Factory]) -> RawBlueStore:
+ args = factory(devices=['/dev/foo'])
+ with patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_key', Mock(return_value=['AQCee6ZkzhOrJRAAZWSvNC3KdXOpC2w8ly4AZQ=='])):
+ r = RawBlueStore(args) # type: ignore
+ return r
with pytest.raises(RuntimeError) as error:
self.p.args = Mock()
self.p.args.data = '/dev/sdfoo'
+ self.p.args.with_tpm = '0'
self.p.get_lv = Mock()
self.p.objectstore = objectstore.lvmbluestore.LvmBlueStore(args=self.p.args)
self.p.objectstore.safe_prepare()
osd_fsid='123',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
- m_luks_open.assert_called_with('foo', '/dev/foo', 'ceph-123-foo-block-dmcrypt')
- m_luks_format.assert_called_with('foo', '/dev/foo')
+ m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/foo', 'ceph-123-foo-block-dmcrypt', 0)
+ m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/foo')
assert self.p.objectstore.__dict__['block_device_path'] == '/dev/mapper/ceph-123-foo-block-dmcrypt'
@patch('ceph_volume.util.encryption.luks_open')
osd_fsid='456',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
- m_luks_open.assert_called_with('foo', '/dev/db-foo', 'ceph-456-foo-db-dmcrypt')
- m_luks_format.assert_called_with('foo', '/dev/db-foo')
+ m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/db-foo', 'ceph-456-foo-db-dmcrypt', 0)
+ m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/db-foo')
assert self.p.objectstore.__dict__['db_device_path'] == '/dev/mapper/ceph-456-foo-db-dmcrypt'
@patch('ceph_volume.util.encryption.luks_open')
osd_fsid='789',
secrets=dict(dmcrypt_key='foo'))
self.p.objectstore.prepare_dmcrypt()
- m_luks_open.assert_called_with('foo', '/dev/wal-foo', 'ceph-789-foo-wal-dmcrypt')
- m_luks_format.assert_called_with('foo', '/dev/wal-foo')
+ m_luks_open.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/wal-foo', 'ceph-789-foo-wal-dmcrypt', 0)
+ m_luks_format.assert_called_with(self.p.objectstore.dmcrypt_key, '/dev/wal-foo')
assert self.p.objectstore.__dict__['wal_device_path'] == '/dev/mapper/ceph-789-foo-wal-dmcrypt'
@patch('ceph_volume.objectstore.rawbluestore.rollback_osd')
@patch('ceph_volume.conf.cluster', 'ceph')
@patch('ceph_volume.api.lvm.get_single_lv')
@patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
- @patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.create_dmcrypt_key', Mock(return_value='fake-dmcrypt-key'))
def test_pre_prepare_lv(self, m_get_single_lv, factory):
args = factory(cluster_fsid='abcd',
osd_fsid='abc123',
lv_tags='',
lv_uuid='fake-uuid')
self.lvm_bs.encrypted = True
+ self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key'
self.lvm_bs.args = args
self.lvm_bs.pre_prepare()
assert self.lvm_bs.secrets['dmcrypt_key'] == 'fake-dmcrypt-key'
'ceph.block_uuid': 'fake-uuid',
'ceph.cephx_lockbox_secret': '',
'ceph.encrypted': True,
- 'ceph.vdo': '0'}
+ 'ceph.vdo': '0',
+ 'ceph.with_tpm': 0}
+
+ @patch('ceph_volume.conf.cluster', 'ceph')
+ @patch('ceph_volume.api.lvm.get_single_lv')
+ @patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
+ def test_pre_prepare_lv_with_dmcrypt_and_tpm(self, m_get_single_lv, factory):
+ args = factory(cluster_fsid='abcd',
+ osd_fsid='abc123',
+ crush_device_class='ssd',
+ osd_id='111',
+ data='vg_foo/lv_foo',
+ dmcrypt=True,
+ with_tpm=True)
+ m_get_single_lv.return_value = Volume(lv_name='lv_foo',
+ lv_path='/fake-path',
+ vg_name='vg_foo',
+ lv_tags='',
+ lv_uuid='fake-uuid')
+ self.lvm_bs.encrypted = True
+ self.lvm_bs.with_tpm = True
+ self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key-tpm2'
+ self.lvm_bs.args = args
+ self.lvm_bs.pre_prepare()
+ assert 'dmcrypt_key' not in self.lvm_bs.secrets.keys()
+ assert self.lvm_bs.secrets['crush_device_class'] == 'ssd'
+ assert self.lvm_bs.osd_id == '111'
+ assert self.lvm_bs.block_device_path == '/fake-path'
+ assert self.lvm_bs.tags == {'ceph.osd_fsid': 'abc123',
+ 'ceph.osd_id': '111',
+ 'ceph.cluster_fsid': 'abcd',
+ 'ceph.cluster_name': 'ceph',
+ 'ceph.crush_device_class': 'ssd',
+ 'ceph.osdspec_affinity': '',
+ 'ceph.block_device': '/fake-path',
+ 'ceph.block_uuid': 'fake-uuid',
+ 'ceph.cephx_lockbox_secret': '',
+ 'ceph.encrypted': True,
+ 'ceph.vdo': '0',
+ 'ceph.with_tpm': 1}
@patch('ceph_volume.objectstore.lvmbluestore.prepare_utils.create_id', Mock(return_value='111'))
- @patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.create_dmcrypt_key', Mock(return_value='fake-dmcrypt-key'))
def test_pre_prepare_no_lv(self, factory):
args = factory(cluster_fsid='abcd',
osd_fsid='abc123',
crush_device_class='ssd',
osd_id='111',
- data='/dev/foo')
+ data='/dev/foo',
+ dmcrypt_key='fake-dmcrypt-key')
self.lvm_bs.prepare_data_device = lambda x, y: Volume(lv_name='lv_foo',
lv_path='/fake-path',
vg_name='vg_foo',
lv_tags='',
lv_uuid='fake-uuid')
self.lvm_bs.encrypted = True
+ self.lvm_bs.dmcrypt_key = 'fake-dmcrypt-key'
self.lvm_bs.args = args
self.lvm_bs.pre_prepare()
assert self.lvm_bs.secrets['dmcrypt_key'] == 'fake-dmcrypt-key'
'ceph.block_uuid': 'fake-uuid',
'ceph.cephx_lockbox_secret': '',
'ceph.encrypted': True,
- 'ceph.vdo': '0'}
+ 'ceph.vdo': '0',
+ 'ceph.with_tpm': 0}
@patch('ceph_volume.util.disk.is_partition', Mock(return_value=True))
@patch('ceph_volume.api.lvm.create_lv')
block_db_size=123,
block_wal_slots=1,
block_db_slots=1,
+ with_tpm=False
)
self.lvm_bs.args = args
self.lvm_bs.pre_prepare = lambda: None
assert self.lvm_bs.wal_device_path == '/dev/foo1'
assert self.lvm_bs.db_device_path == '/dev/foo2'
assert self.lvm_bs.block_lv.set_tags.mock_calls == [call({'ceph.type': 'block', 'ceph.vdo': '0', 'ceph.wal_uuid': 'c6798f59-01', 'ceph.wal_device': '/dev/foo1', 'ceph.db_uuid': 'c6798f59-01', 'ceph.db_device': '/dev/foo2'})]
- assert self.lvm_bs.prepare_dmcrypt.called
+ assert not self.lvm_bs.prepare_dmcrypt.called
assert self.lvm_bs.osd_mkfs.called
assert self.lvm_bs.prepare_osd_req.called
self.lvm_bs.secrets = {'dmcrypt_key': 'fake-secret'}
self.lvm_bs.tags = {'ceph.block_uuid': 'block-uuid1',
'ceph.db_uuid': 'db-uuid2',
- 'ceph.wal_uuid': 'wal-uuid3'}
- self.lvm_bs.luks_format_and_open = lambda *a: f'/dev/mapper/{a[3]["ceph."+a[2]+"_uuid"]}'
+ 'ceph.wal_uuid': 'wal-uuid3',
+ 'ceph.with_tpm': 0}
+ self.lvm_bs.block_device_path = '/dev/sdb'
+ self.lvm_bs.db_device_path = '/dev/sdc'
+ self.lvm_bs.wal_device_path = '/dev/sdb'
+ self.lvm_bs.luks_format_and_open = lambda *a: f'/dev/mapper/{a[2]["ceph."+a[1]+"_uuid"]}'
self.lvm_bs.prepare_dmcrypt()
assert self.lvm_bs.block_device_path == '/dev/mapper/block-uuid1'
assert self.lvm_bs.db_device_path == '/dev/mapper/db-uuid2'
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_open')
@patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_format')
def test_luks_format_and_open(self, m_luks_format, m_luks_open):
- result = self.lvm_bs.luks_format_and_open('key',
- '/dev/foo',
+ result = self.lvm_bs.luks_format_and_open('/dev/foo',
+ 'block',
+ {'ceph.block_uuid': 'block-uuid1'})
+ assert result == '/dev/mapper/block-uuid1'
+
+ @patch('ceph_volume.objectstore.lvmbluestore.LvmBlueStore.enroll_tpm2', Mock(return_value=MagicMock()))
+ @patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_open')
+ @patch('ceph_volume.objectstore.lvmbluestore.encryption_utils.luks_format')
+ def test_luks_format_and_open_with_tpm(self, m_luks_format, m_luks_open):
+ self.lvm_bs.with_tpm = True
+ result = self.lvm_bs.luks_format_and_open('/dev/foo',
'block',
{'ceph.block_uuid': 'block-uuid1'})
assert result == '/dev/mapper/block-uuid1'
+ self.lvm_bs.enroll_tpm2.assert_called_once()
def test_luks_format_and_open_not_device(self):
- result = self.lvm_bs.luks_format_and_open('key',
- '',
+ result = self.lvm_bs.luks_format_and_open('',
'block',
{})
assert result == ''
assert self.raw_bs.db_device_path == "/dev/mapper/ceph--foo0-db-dmcrypt"
assert self.raw_bs.wal_device_path == "/dev/mapper/ceph--foo0-wal-dmcrypt"
+ @patch('ceph_volume.objectstore.rawbluestore.RawBlueStore.enroll_tpm2', Mock(return_value=MagicMock()))
+ def test_prepare_dmcrypt_with_tpm(self,
+ device_info,
+ fake_call,
+ key_size):
+ self.raw_bs.block_device_path = '/dev/foo0'
+ self.raw_bs.db_device_path = '/dev/foo1'
+ self.raw_bs.wal_device_path = '/dev/foo2'
+ self.raw_bs.with_tpm = 1
+ lsblk = {"TYPE": "disk",
+ "NAME": "foo0",
+ 'KNAME': 'foo0'}
+ device_info(lsblk=lsblk)
+ self.raw_bs.prepare_dmcrypt()
+ assert 'dmcrypt_key' not in self.raw_bs.secrets.keys()
+ assert self.raw_bs.block_device_path == "/dev/mapper/ceph--foo0-block-dmcrypt"
+ assert self.raw_bs.db_device_path == "/dev/mapper/ceph--foo0-db-dmcrypt"
+ assert self.raw_bs.wal_device_path == "/dev/mapper/ceph--foo0-wal-dmcrypt"
+ assert self.raw_bs.enroll_tpm2.mock_calls == [call('/dev/foo0'), call('/dev/foo1'), call('/dev/foo2')]
+
@patch('ceph_volume.objectstore.rawbluestore.rollback_osd')
@patch('ceph_volume.objectstore.rawbluestore.RawBlueStore.prepare')
def test_safe_prepare_raises_exception(self,
capsys):
args = factory(dmcrypt=True,
data='/dev/foo')
- # self.raw_bs.args = args
self.raw_bs.safe_prepare(args)
- stdout, stderr = capsys.readouterr()
+ _, stderr = capsys.readouterr()
assert "prepare successful for: /dev/foo" in stderr
- # @patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_id')
- # @patch('ceph_volume.objectstore.rawbluestore.system.generate_uuid', return_value='fake-uuid')
@patch.dict('os.environ', {'CEPH_VOLUME_DMCRYPT_SECRET': 'dmcrypt-key'})
@patch('ceph_volume.objectstore.rawbluestore.prepare_utils.create_id')
@patch('ceph_volume.objectstore.rawbluestore.system.generate_uuid')
m_link_block,
m_link_db,
m_link_wal,
- monkeypatch):
- meta = dict(osd_id='1',
- osd_uuid='fake-uuid',
- device='/dev/foo',
- device_db='/dev/foo1',
- device_wal='/dev/foo2')
+ monkeypatch,
+ factory):
+ args = factory(no_tmpfs=False)
+ self.raw_bs.args = args
+ self.raw_bs.block_device_path = '/dev/sda'
+ self.raw_bs.db_device_path = '/dev/sdb'
+ self.raw_bs.wal_device_path = '/dev/sdc'
m_run.return_value = MagicMock()
m_exists.side_effect = lambda path: True
m_create_osd_path.return_value = MagicMock()
m_unlink.return_value = MagicMock()
monkeypatch.setattr(system, 'chown', lambda path: 0)
monkeypatch.setattr(system, 'path_is_mounted', lambda path: 0)
- self.raw_bs._activate(meta, True)
+ self.raw_bs._activate('1', True)
calls = [call('/var/lib/ceph/osd/ceph-1/block'),
call('/var/lib/ceph/osd/ceph-1/block.db'),
call('/var/lib/ceph/osd/ceph-1/block.wal')]
assert m_run.mock_calls == [call(['ceph-bluestore-tool',
'prime-osd-dir',
'--path', '/var/lib/ceph/osd/ceph-1',
- '--no-mon-config', '--dev', '/dev/foo'])]
+ '--no-mon-config', '--dev', '/dev/sda'])]
assert m_unlink.mock_calls == calls
assert m_exists.mock_calls == calls
assert m_create_osd_path.mock_calls == [call('1', tmpfs=True)]
is_root,
mock_raw_direct_report):
with pytest.raises(RuntimeError) as error:
- self.raw_bs.activate([],
- '123',
- 'fake-uuid',
- True)
+ self.raw_bs.osd_id = '1'
+ self.raw_bs.activate()
assert str(error.value) == 'did not find any matching OSD to activate'
- def test_activate_osd_id(self,
- is_root,
- mock_raw_direct_report):
+ def test_activate_osd_id_and_fsid(self,
+ is_root,
+ mock_raw_direct_report):
self.raw_bs._activate = MagicMock()
- self.raw_bs.activate([],
- '8',
- '824f7edf-371f-4b75-9231-4ab62a32d5c0',
- True)
+ self.raw_bs.osd_id = '8'
+ self.raw_bs.osd_fsid = '824f7edf-371f-4b75-9231-4ab62a32d5c0'
+ self.raw_bs.activate()
self.raw_bs._activate.mock_calls == [call({'ceph_fsid': '7dccab18-14cf-11ee-837b-5254008f8ca5',
'device': '/dev/mapper/ceph--40bc7bd7--4aee--483e--ba95--89a64bc8a4fd-osd--block--824f7edf--371f--4b75--9231--4ab62a32d5c0',
'device_db': '/dev/mapper/ceph--73d6d4db--6528--48f2--a4e2--1c82bc87a9ac-osd--db--b82d920d--be3c--4e4d--ba64--18f7e8445892',
'osd_id': 8,
'osd_uuid': '824f7edf-371f-4b75-9231-4ab62a32d5c0',
'type': 'bluestore'},
- tmpfs=True)]
+ tmpfs=True)]
- def test_activate_osd_fsid(self,
- is_root,
- mock_raw_direct_report):
- self.raw_bs._activate = MagicMock()
- with pytest.raises(RuntimeError):
- self.raw_bs.activate([],
- '8',
- 'a0e07c5b-bee1-4ea2-ae07-cb89deda9b27',
- True)
- self.raw_bs._activate.mock_calls == [call({'ceph_fsid': '7dccab18-14cf-11ee-837b-5254008f8ca5',
- 'device': '/dev/mapper/ceph--e34cc3f5--a70d--49df--82b3--46bcbd63d4b0-osd--block--a0e07c5b--bee1--4ea2--ae07--cb89deda9b27',
- 'osd_id': 9,
- 'osd_uuid': 'a0e07c5b-bee1-4ea2-ae07-cb89deda9b27',
- 'type': 'bluestore'},
- tmpfs=True)]
\ No newline at end of file
+ @patch('ceph_volume.objectstore.rawbluestore.encryption_utils.rename_mapper', Mock(return_value=MagicMock()))
+ @patch('ceph_volume.util.disk.get_bluestore_header')
+ @patch('ceph_volume.objectstore.rawbluestore.encryption_utils.luks_open', Mock(return_value=MagicMock()))
+ def test_activate_dmcrypt_tpm(self, m_bs_header, rawbluestore, fake_lsblk_all, mock_raw_direct_report, is_root) -> None:
+ m_bs_header.return_value = {
+ "/dev/mapper/activating-sdb": {
+ "osd_uuid": "db32a338-b640-4cbc-af17-f63808b1c36e",
+ "size": 20000572178432,
+ "btime": "2024-06-13T12:16:57.607442+0000",
+ "description": "main",
+ "bfm_blocks": "4882952192",
+ "bfm_blocks_per_key": "128",
+ "bfm_bytes_per_block": "4096",
+ "bfm_size": "20000572178432",
+ "bluefs": "1",
+ "ceph_fsid": "c301d0aa-288d-11ef-b535-c84bd6975560",
+ "ceph_version_when_created": "ceph version 19.0.0-4242-gf2f7cc60 (f2f7cc609cdbae767486cf2fe6872a4789adffb2) squid (dev)",
+ "created_at": "2024-06-13T12:17:20.122565Z",
+ "elastic_shared_blobs": "1",
+ "kv_backend": "rocksdb",
+ "magic": "ceph osd volume v026",
+ "mkfs_done": "yes",
+ "osd_key": "AQAk42pmt7tqFxAAHlaETFm33yFtEuoQAh/cpQ==",
+ "ready": "ready",
+ "whoami": "0"}
+ }
+ mock_luks2_1 = Mock()
+ mock_luks2_1.is_ceph_encrypted = True
+ mock_luks2_1.is_tpm2_enrolled = True
+ mock_luks2_1.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
+
+ mock_luks2_2 = Mock()
+ mock_luks2_2.is_ceph_encrypted = True
+ mock_luks2_2.is_tpm2_enrolled = False
+ mock_luks2_2.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
+
+ mock_luks2_3 = Mock()
+ mock_luks2_3.is_ceph_encrypted = False
+ mock_luks2_3.is_tpm2_enrolled = False
+ mock_luks2_3.osd_fsid = ''
+
+ mock_luks2_4 = Mock()
+ mock_luks2_4.is_ceph_encrypted = True
+ mock_luks2_4.is_tpm2_enrolled = True
+ mock_luks2_4.osd_fsid = 'abcd'
+ with patch('ceph_volume.objectstore.rawbluestore.encryption_utils.CephLuks2', side_effect=[mock_luks2_1,
+ mock_luks2_2,
+ mock_luks2_3,
+ mock_luks2_4]):
+ fake_lsblk_all([{'NAME': '/dev/sdb', 'FSTYPE': 'crypto_LUKS'},
+ {'NAME': '/dev/sdc', 'FSTYPE': 'crypto_LUKS'},
+ {'NAME': '/dev/sdd', 'FSTYPE': ''}])
+ rawbluestore.osd_fsid = 'db32a338-b640-4cbc-af17-f63808b1c36e'
+ rawbluestore.osd_id = '0'
+ rawbluestore._activate = MagicMock()
+ rawbluestore.activate()
+ assert rawbluestore._activate.mock_calls == [call(0, 'db32a338-b640-4cbc-af17-f63808b1c36e')]
+ assert rawbluestore.block_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdb-block-dmcrypt'
+ assert rawbluestore.db_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdc-db-dmcrypt'
--- /dev/null
+import os
+from ceph_volume import AllowLoopDevices, allow_loop_devices
+from typing import Any
+
+
+class TestAllowLoopDevsWarning:
+ def setup_method(self) -> None:
+ AllowLoopDevices.allow = False
+ AllowLoopDevices.warned = False
+ self.teardown_method()
+
+ def teardown_method(self) -> None:
+ AllowLoopDevices.allow = False
+ AllowLoopDevices.warned = False
+ if os.environ.get('CEPH_VOLUME_ALLOW_LOOP_DEVICES'):
+ os.environ.pop('CEPH_VOLUME_ALLOW_LOOP_DEVICES')
+
+ def test_loop_dev_warning(self, fake_call: Any, caplog: Any) -> None:
+ AllowLoopDevices.warned = False
+ assert allow_loop_devices() is False
+ assert not caplog.records
+ os.environ['CEPH_VOLUME_ALLOW_LOOP_DEVICES'] = "y"
+ assert allow_loop_devices() is True
+ log = caplog.records[0]
+ assert log.levelname == "WARNING"
+ assert "will never be supported in production" in log.message
-import os
import pytest
from ceph_volume.util import disk
from mock.mock import patch, MagicMock
assert result == "1027.00 TB"
-class TestAllowLoopDevsWarning(object):
- def setup_method(self):
- disk.AllowLoopDevices.allow = False
- disk.AllowLoopDevices.warned = False
- if os.environ.get('CEPH_VOLUME_ALLOW_LOOP_DEVICES'):
- os.environ.pop('CEPH_VOLUME_ALLOW_LOOP_DEVICES')
-
- def test_loop_dev_warning(self, fake_call, caplog):
- disk.AllowLoopDevices.warned = False
- assert disk.allow_loop_devices() is False
- assert not caplog.records
- os.environ['CEPH_VOLUME_ALLOW_LOOP_DEVICES'] = "y"
- assert disk.allow_loop_devices() is True
- log = caplog.records[0]
- assert log.levelname == "WARNING"
- assert "will never be supported in production" in log.message
-
-
class TestHasBlueStoreLabel(object):
def test_device_path_is_a_path(self, fake_filesystem):
device_path = '/var/lib/ceph/osd/ceph-0'
from ceph_volume.util import encryption
-from mock.mock import patch, Mock
+from mock.mock import call, patch, Mock, MagicMock
+from typing import Any
import base64
import pytest
+import json
class TestNoWorkqueue:
]
encryption.luks_open('abcd', '/dev/foo', '/dev/bar')
assert m_call.call_args[0][0] == expected
+
+
+class TestCephLuks2:
+ @patch.object(encryption.CephLuks2, 'get_osd_fsid', Mock(return_value='abcd-1234'))
+ @patch.object(encryption.CephLuks2, 'is_ceph_encrypted', Mock(return_value=True))
+ def test_init_ceph_encrypted(self) -> None:
+ assert encryption.CephLuks2('/dev/foo').osd_fsid == 'abcd-1234'
+
+ @patch.object(encryption.CephLuks2, 'get_osd_fsid', Mock(return_value=''))
+ @patch.object(encryption.CephLuks2, 'is_ceph_encrypted', Mock(return_value=False))
+ def test_init_not_ceph_encrypted(self) -> None:
+ assert encryption.CephLuks2('/dev/foo').osd_fsid == ''
+
+ def test_has_luks2_signature(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', return_value='LUKS'):
+ assert encryption.CephLuks2('/dev/foo').has_luks2_signature
+
+ @patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('foo'))
+ def test_has_luks2_signature_raises_exception(self, m_dd_read: Any) -> None:
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').has_luks2_signature
+
+ @patch.object(encryption.CephLuks2, 'get_subsystem', Mock(return_value='ceph_fsid=abcd'))
+ @patch.object(encryption.CephLuks2, 'has_luks2_signature', Mock(return_value=True))
+ def test_is_ceph_encrypted(self) -> None:
+ assert encryption.CephLuks2('/dev/foo').is_ceph_encrypted
+
+ @patch.object(encryption.CephLuks2, 'get_label', Mock(return_value=''))
+ @patch.object(encryption.CephLuks2, 'has_luks2_signature', Mock(return_value=True))
+ def test_is_not_ceph_encrypted(self) -> None:
+ assert not encryption.CephLuks2('/dev/foo').is_ceph_encrypted
+
+ @patch('ceph_volume.util.encryption.process.call', Mock(return_value=MagicMock()))
+ def test_config_luks2_invalid_config(self) -> None:
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').config_luks2({'subsystem': 'ceph_fsid=1234-abcd', 'label': 'foo', 'foo': 'bar'})
+
+ @patch('ceph_volume.util.encryption.process.call', Mock(return_value=MagicMock()))
+ def test_config_luks2_invalid_config_keys(self) -> None:
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').config_luks2({'fake': 'fake-value', 'subsystem': 'ceph_fsid=1234-abcd'})
+
+ @patch('ceph_volume.util.encryption.process.call')
+ def test_config_luks2_ok(self, m_call: Any) -> None:
+ m_call.return_value = ('', '', 0)
+ encryption.CephLuks2('/dev/foo').config_luks2({'label': 'foo', 'subsystem': 'ceph_fsid=1234-abcd'})
+ assert m_call.mock_calls == [call(['cryptsetup', 'config', '/dev/foo', '--label', 'foo', '--subsystem', 'ceph_fsid=1234-abcd'], verbose_on_failure=False)]
+
+ @patch('ceph_volume.util.encryption.process.call')
+ def test_config_luks2_raises_exception(self, m_call: Any) -> None:
+ m_call.return_value = ('', '', 1)
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').config_luks2({'label': 'foo', 'subsystem': 'ceph_fsid=1234-abcd'})
+
+ def test_get_label(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', return_value='fake-luks2-label'):
+ label: str = encryption.CephLuks2('/dev/foo').get_label()
+ assert label == 'fake-luks2-label'
+
+ def test_get_label_raises_exception(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('fake-error')):
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').get_label()
+
+ @patch.object(encryption.CephLuks2, 'get_subsystem', Mock(return_value='ceph_fsid=abcd'))
+ def test_get_osd_fsid(self) -> None:
+ assert encryption.CephLuks2('/dev/foo').get_osd_fsid() == 'abcd'
+
+ @patch.object(encryption.CephLuks2, 'get_label', Mock(return_value='ceph'))
+ def test_get_osd_fsid_error(self) -> None:
+ result: str = encryption.CephLuks2('/dev/foo').get_osd_fsid()
+ assert result == ''
+
+ def test_get_subsystem(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', return_value='fake-luks2-subsystem'):
+ assert encryption.CephLuks2('/dev/foo').get_subsystem() == 'fake-luks2-subsystem'
+
+ def test_get_subsystem_raises_exception(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', side_effect=Exception('fake-error')):
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').get_subsystem()
+
+ def test_get_json_area(self) -> None:
+ mock_json_data = '{"tokens": {"1": {"type": "systemd-tpm2"}}}'
+ with patch('ceph_volume.util.encryption._dd_read', return_value=mock_json_data):
+ assert encryption.CephLuks2('/dev/foo').get_json_area() == json.loads(mock_json_data)
+
+ def test_get_json_area_invalid(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', return_value='invalid-json-data'):
+ with pytest.raises(RuntimeError):
+ encryption.CephLuks2('/dev/foo').get_json_area()
+
+ def test_get_json_area_exception_caught(self) -> None:
+ with patch('ceph_volume.util.encryption._dd_read', side_effect=OSError):
+ with pytest.raises(OSError):
+ encryption.CephLuks2('/dev/foo').get_json_area()
+
+ @patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'crypto_LUKS'}))
+ @patch.object(encryption.CephLuks2, 'get_json_area', Mock(return_value={"tokens": {"1": {"type": "systemd-tpm2"}}}))
+ def test_is_tpm2_enrolled_true(self) -> None:
+ assert encryption.CephLuks2('/dev/foo').is_tpm2_enrolled
+
+ @patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'whatever'}))
+ def test_is_tpm2_enrolled_false_not_a_luks_device(self) -> None:
+ assert not encryption.CephLuks2('/dev/foo').is_tpm2_enrolled
+
+ @patch('ceph_volume.util.encryption.lsblk', Mock(return_value={'FSTYPE': 'crypto_LUKS'}))
+ @patch.object(encryption.CephLuks2, 'get_json_area', Mock(return_value={"whatever": "fake-value"}))
+ def test_is_tpm2_enrolled_false_not_enrolled_with_tpm2(self) -> None:
+ assert not encryption.CephLuks2('/dev/foo').is_tpm2_enrolled
o.osd_mkfs()
assert '--keyfile' in fake_call.calls[2]['args'][0]
- def test_keyring_is_not_added(self, fake_call, monkeypatch):
+ def test_keyring_is_not_added(self, fake_call, monkeypatch, factory):
+ args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
o = objectstore.bluestore.BlueStore([])
+ o.args = args
o.osd_id = '1'
o.osd_fsid = 'asdf'
o.osd_mkfs()
assert '--keyfile' not in fake_call.calls[0]['args'][0]
- def test_wal_is_added(self, fake_call, monkeypatch, objectstore_bluestore):
+ def test_wal_is_added(self, fake_call, monkeypatch, objectstore_bluestore, factory):
+ args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
bs = objectstore_bluestore(objecstore='bluestore',
osd_id='1',
osd_fid='asdf',
wal_device_path='/dev/smm1',
- cephx_secret='foo',)
+ cephx_secret='foo',
+ dmcrypt=False)
+ bs.args = args
bs.osd_mkfs()
assert '--bluestore-block-wal-path' in fake_call.calls[2]['args'][0]
assert '/dev/smm1' in fake_call.calls[2]['args'][0]
- def test_db_is_added(self, fake_call, monkeypatch):
+ def test_db_is_added(self, fake_call, monkeypatch, factory):
+ args = factory(dmcrypt=False)
monkeypatch.setattr(system, 'chown', lambda path: True)
bs = objectstore.bluestore.BlueStore([])
+ bs.args = args
bs.db_device_path = '/dev/smm2'
bs.osd_mkfs()
assert '--bluestore-block-db-path' in fake_call.calls[2]['args'][0]
import logging
import os
from functools import total_ordering
-from ceph_volume import sys_info
+from ceph_volume import sys_info, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util import disk, system
from ceph_volume.util.lsmdisk import LSMDisk
from ceph_volume.util.constants import ceph_disk_guids
-from ceph_volume.util.disk import allow_loop_devices
logger = logging.getLogger(__name__)
import re
import stat
import time
-from ceph_volume import process
+import json
+from ceph_volume import process, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util.system import get_file_contents
from typing import Dict, List, Any
return device_name.startswith(('/dev/mapper', '/dev/dm-'))
-class AllowLoopDevices(object):
- allow = False
- warned = False
-
- @classmethod
- def __call__(cls):
- val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower()
- if val not in ("false", 'no', '0'):
- cls.allow = True
- if not cls.warned:
- logger.warning(
- "CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your "
- "environment, so we will allow the use of unattached loop"
- " devices as disks. This feature is intended for "
- "development purposes only and will never be supported in"
- " production. Issues filed based on this behavior will "
- "likely be ignored."
- )
- cls.warned = True
- return cls.allow
-
-
-allow_loop_devices = AllowLoopDevices()
-
-
def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_path: str = '/sys/dev/block', device: str = '') -> List[List[str]]:
def holder_inner_loop() -> bool:
for holder in holders:
result.append(f'/dev/mapper/{name.strip()}')
result.append(f'/dev/{device}')
return result
+
+def _dd_read(device: str, count: int, skip: int = 0) -> str:
+ """Read bytes from a device
+
+ Args:
+ device (str): The device to read bytes from.
+ count (int): The number of bytes to read.
+ skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
+
+ Returns:
+ str: A string containing the read bytes.
+ """
+ result: str = ''
+ try:
+ with open(device, 'rb') as b:
+ b.seek(skip)
+ data: bytes = b.read(count)
+ result = data.decode('utf-8').replace('\x00', '')
+ except OSError:
+ logger.warning(f"Can't read from {device}")
+ pass
+ except UnicodeDecodeError:
+ pass
+ except Exception as e:
+ logger.error(f"An error occurred while reading from {device}: {e}")
+ raise
+
+ return result
+
+def _dd_write(device: str, data: str, skip: int = 0) -> None:
+ """Write bytes to a device
+
+ Args:
+ device (str): The device to write bytes to.
+ data (str): The data to write to the device.
+ skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
+
+ Raises:
+ OSError: If there is an error opening or writing to the device.
+ Exception: If any other error occurs during the write operation.
+ """
+ try:
+ with open(device, 'r+b') as b:
+ b.seek(skip)
+ b.write(data.encode('utf-8'))
+ except OSError:
+ logger.warning(f"Can't write to {device}")
+ raise
+ except Exception as e:
+ logger.error(f"An error occurred while writing to {device}: {e}")
+ raise
+
+def get_bluestore_header(device: str) -> Dict[str, Any]:
+ """Retrieve BlueStore header information from a given device.
+
+ This function retrieves BlueStore header information from the specified 'device'.
+ It first checks if the device exists. If the device does not exist, a RuntimeError
+ is raised. Then, it calls the 'ceph-bluestore-tool' command to show the label
+ information of the device. If the command execution is successful, it parses the
+ JSON output containing the BlueStore header information and returns it as a dictionary.
+
+ Args:
+ device (str): The path to the device.
+
+ Returns:
+ Dict[str, Any]: A dictionary containing BlueStore header information.
+ """
+ data: Dict[str, Any] = {}
+
+ if os.path.exists(device):
+ out, err, rc = process.call([
+ 'ceph-bluestore-tool', 'show-label',
+ '--dev', device], verbose_on_failure=False)
+ if rc:
+ logger.debug(f'device {device} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}')
+ else:
+ data = json.loads(''.join(out))
+ else:
+ logger.warning(f'device {device} not found.')
+ return data
+
+def bluestore_info(device: str, bluestore_labels: Dict[str, Any]) -> Dict[str, Any]:
+ """Build a dict representation of a BlueStore header
+
+ Args:
+ device (str): The path of the BlueStore device.
+ bluestore_labels (Dict[str, Any]): Plain text output from `ceph-bluestore-tool show-label`
+
+ Returns:
+ Dict[str, Any]: Generated dict representation of the BlueStore header
+ """
+ result: Dict[str, Any] = {}
+ result['osd_uuid'] = bluestore_labels[device]['osd_uuid']
+ if bluestore_labels[device]['description'] == 'main':
+ whoami = bluestore_labels[device]['whoami']
+ result.update({
+ 'type': bluestore_labels[device].get('type', 'bluestore'),
+ 'osd_id': int(whoami),
+ 'ceph_fsid': bluestore_labels[device]['ceph_fsid'],
+ 'device': device,
+ })
+ if bluestore_labels[device].get('db_device_uuid', ''):
+ result['db_device_uuid'] = bluestore_labels[device].get('db_device_uuid')
+ if bluestore_labels[device].get('wal_device_uuid', ''):
+ result['wal_device_uuid'] = bluestore_labels[device].get('wal_device_uuid')
+ elif bluestore_labels[device]['description'] == 'bluefs db':
+ result['device_db'] = device
+ elif bluestore_labels[device]['description'] == 'bluefs wal':
+ result['device_wal'] = device
+ return result
+
+def get_block_device_holders(sys_block: str = '/sys/block') -> Dict[str, Any]:
+ """Get a dictionary of device mappers with their corresponding parent devices.
+
+ This function retrieves information about device mappers and their parent devices
+ from the '/sys/block' directory. It iterates through each directory within 'sys_block',
+ and for each directory, it checks if a 'holders' directory exists. If so, it lists
+ the contents of the 'holders' directory and constructs a dictionary where the keys
+ are the device mappers and the values are their corresponding parent devices.
+
+ Args:
+ sys_block (str, optional): The path to the '/sys/block' directory. Defaults to '/sys/block'.
+
+ Returns:
+ Dict[str, Any]: A dictionary where keys are device mappers (e.g., '/dev/mapper/...') and
+ values are their corresponding parent devices (e.g., '/dev/sdX').
+ """
+ result: Dict[str, Any] = {}
+ for b in os.listdir(sys_block):
+ path: str = os.path.join(sys_block, b, 'holders')
+ if os.path.exists(path):
+ for h in os.listdir(path):
+ result[f'/dev/{h}'] = f'/dev/{b}'
+
+ return result
+
+def get_parent_device_from_mapper(mapper: str, abspath: bool = True) -> str:
+ """Get the parent device corresponding to a given device mapper.
+
+ This function retrieves the parent device corresponding to a given device mapper
+ from the dictionary returned by the 'get_block_device_holders' function. It first
+ checks if the specified 'mapper' exists. If it does, it resolves the real path of
+ the mapper using 'os.path.realpath'. Then, it attempts to retrieve the parent device
+ from the dictionary. If the mapper is not found in the dictionary, an empty string
+ is returned.
+
+ Args:
+ mapper (str): The path to the device mapper.
+ abspath (bool, optional): If True (default), returns the absolute path of the parent device.
+ If False, returns only the basename of the parent device.
+
+ Returns:
+ str: The parent device corresponding to the given device mapper, or an empty string
+ if the mapper is not found in the dictionary of device mappers.
+ """
+ result: str = ''
+ if os.path.exists(mapper):
+ _mapper: str = os.path.realpath(mapper)
+ try:
+ result = get_block_device_holders()[_mapper]
+ if not abspath:
+ result = os.path.basename(result)
+ except KeyError:
+ pass
+ return result
import os
import logging
import re
+import json
from ceph_volume import process, conf, terminal
from ceph_volume.util import constants, system
from ceph_volume.util.device import Device
from .prepare import write_keyring
-from .disk import lsblk, device_family, get_part_entry_type
+from .disk import lsblk, device_family, get_part_entry_type, _dd_read
from packaging import version
+from typing import Any, Dict, List
logger = logging.getLogger(__name__)
mlogger = terminal.MultiLogger(__name__)
return key_size
-def create_dmcrypt_key():
+def create_dmcrypt_key() -> str:
"""
Create the secret dm-crypt key (KEK) used to encrypt/decrypt the Volume Key.
"""
return key
-def luks_format(key, device):
+def luks_format(key: str, device: str) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
process.call(command, stdin=key, terminal_verbose=True, show_command=True)
-def luks_open(key, device, mapping):
+def luks_close(mapping: str) -> None:
+ """Close a LUKS2 mapper device.
+
+ Args:
+ mapping (str): the name of the mapper to be closed.
+ """
+ command: List[str] = ['cryptsetup',
+ 'luksClose',
+ mapping]
+
+ process.call(command,
+ terminal_verbose=True,
+ show_command=True)
+
+
+def rename_mapper(current: str, new: str) -> None:
+ """Rename a mapper
+
+ Args:
+ old (str): current name
+ new (str): new name
+ """
+
+ command: List[str] = [
+ 'dmsetup',
+ 'rename',
+ current,
+ new
+ ]
+
+ _, err, rc = process.call(command,
+ terminal_verbose=True,
+ show_command=True)
+ if rc:
+ raise RuntimeError(f"Can't rename mapper '{current}' to '{new}': {err}")
+
+
+def luks_open(key: str,
+ device: str,
+ mapping: str,
+ with_tpm: int = 0) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
:param device: absolute path to device
:param mapping: mapping name used to correlate device. Usually a UUID
"""
- command = [
- 'cryptsetup',
- '--key-size',
- get_key_size_from_conf(),
- '--key-file',
- '-',
- '--allow-discards', # allow discards (aka TRIM) requests for device
- 'luksOpen',
- device,
- mapping,
- ]
-
- if bypass_workqueue(device):
- command.extend(['--perf-no_read_workqueue',
- '--perf-no_write_workqueue'])
-
- process.call(command, stdin=key, terminal_verbose=True, show_command=True)
+ command: List[str] = []
+ if with_tpm:
+ command = ['/usr/lib/systemd/systemd-cryptsetup',
+ 'attach',
+ mapping,
+ device,
+ '-',
+ 'tpm2-device=auto,discard']
+ if bypass_workqueue(device):
+ command[-1] += ',no-read-workqueue,no-write-workqueue'
+ else:
+ command = [
+ 'cryptsetup',
+ '--key-size',
+ get_key_size_from_conf(),
+ '--key-file',
+ '-',
+ '--allow-discards', # allow discards (aka TRIM) requests for device
+ 'luksOpen',
+ device,
+ mapping,
+ ]
+
+ if bypass_workqueue(device):
+ command.extend(['--perf-no_read_workqueue',
+ '--perf-no_write_workqueue'])
+
+ process.call(command,
+ run_on_host=with_tpm,
+ stdin=key,
+ terminal_verbose=True,
+ show_command=True)
def dmcrypt_close(mapping, skip_path_check=False):
mapping
)
return '/dev/mapper/%s' % mapping
+
+
+class CephLuks2:
+ def __init__(self, device: str) -> None:
+ self.device: str = device
+ self.osd_fsid: str = ''
+ if self.is_ceph_encrypted:
+ self.osd_fsid = self.get_osd_fsid()
+
+ @property
+ def has_luks2_signature(self) -> bool:
+ try:
+ return _dd_read(self.device, 4) == 'LUKS'
+ except Exception as e:
+ raise RuntimeError(e)
+
+ @property
+ def is_ceph_encrypted(self) -> bool:
+ """Check whether a device is used for a Ceph encrypted OSD
+
+ Args:
+ device (str): The path of the device being checked.
+
+ Returns:
+ bool: `True` if the device is used by an encrypted Ceph OSD, else `False`.
+ """
+ result: bool = False
+ try:
+ result = self.has_luks2_signature and 'ceph_fsid=' in self.get_subsystem()
+ except RuntimeError:
+ pass
+ return result
+
+ def config_luks2(self, config: Dict[str, str]) -> None:
+ """Set the subsystem of a LUKS2 device
+
+ Args:
+ config (str): The config to apply to the LUKS2 device.
+
+ Raises:
+ RuntimeError: If it can't set LUKS2 configuration.
+ """
+ if not (0 < len(config) <= 2):
+ raise RuntimeError(f'Invalid config for LUKS2 device {self.device}')
+
+ valid_keys = ['label', 'subsystem']
+ if not all(key in valid_keys for key in config.keys()):
+ raise RuntimeError(f'LUKS2 config for device {self.device} can only be "label" and/or "subsystem".')
+
+ command: List[str] = ['cryptsetup', 'config',
+ self.device]
+ for k, v in config.items():
+ command.extend([f'--{k}', v])
+ _, err, rc = process.call(command, verbose_on_failure=False)
+ if rc:
+ raise RuntimeError(f"Can't set luks2 config to {self.device}:\n{err}")
+
+ def get_label(self) -> str:
+ """Get the label of a LUKS2 device
+
+ Args:
+ device (str): The device to get the LUKS label from.
+
+ Returns:
+ str: The LUKS2 label of the device.
+ """
+ result: str = ''
+ try:
+ result = _dd_read(self.device, 48, 24)
+ except Exception:
+ raise RuntimeError(f"Can't get luks2 label from {self.device}")
+ return result
+
+ def get_osd_fsid(self) -> str:
+ """Get the osd fsid.
+
+ Returns:
+ str: The OSD fsid
+ """
+
+ result: str = ''
+ try:
+ subsystem = self.get_subsystem()
+ result = subsystem.split('=')[1]
+ except IndexError:
+ logger.debug(f"LUKS2 device {self.device} doesn't have ceph osd fsid detail. Please check LUKS2 label for this device.")
+ return result
+
+ def get_subsystem(self) -> str:
+ """Get the subsystem of a LUKS2 device
+
+ Args:
+ device (str): The device to get the LUKS subsystem from.
+
+ Returns:
+ str: The LUKS2 subsystem of the device.
+ """
+ result: str = ''
+ try:
+ result = _dd_read(self.device, 48, 208)
+ except Exception as e:
+ raise RuntimeError(f"Can't get luks2 label from {self.device}:\n{e}")
+ return result
+
+ def get_json_area(self) -> Dict[str, Any]:
+ """Retrieve the LUKS2 JSON configuration area from a given device.
+
+ This function reads the LUKS2 JSON configuration area from the specified 'device'.
+ It first checks if the device contains a LUKS2 signature. If not, an empty dictionary
+ is returned. If a LUKS2 signature is found, it reads the JSON configuration area
+ starting from byte offset 4096 (4 KB) and extracts the configuration data.
+
+ Args:
+ device (str): The path to the device.
+
+ Raises:
+ RuntimeError: If the LUKS2 JSON area on the device is invalid or cannot be decoded.
+
+ Returns:
+ Dict[str, Any]: A dictionary containing the extracted LUKS2 JSON configuration data.
+ """
+ result: Dict[str, Any] = {}
+ try:
+ data: str = _dd_read(self.device, 12288, 4096)
+ result = json.loads(data)
+ except json.JSONDecodeError:
+ msg: str = f"LUKS2 json area for device {self.device} seems invalid."
+ raise RuntimeError(msg)
+ except Exception:
+ raise
+
+ return result
+
+ @property
+ def is_tpm2_enrolled(self) -> bool:
+ """Check if a given device is enrolled with TPM2.
+
+ This function checks if the specified 'device' is enrolled with TPM2.
+ It first determines if the device is a LUKS encrypted volume by checking
+ its filesystem type using lsblk. If the filesystem type is 'crypto_LUKS',
+ it extracts the LUKS2 JSON configuration area from the device using the
+ 'get_luks2_json_area' function. If the JSON area contains a 'systemd-tpm2'
+ token, it indicates that the device is enrolled with TPM2.
+
+ Args:
+ device (str): The path to the device.
+
+ Returns:
+ bool: True if the device is enrolled with TPM2, False otherwise.
+ """
+ if lsblk(self.device).get('FSTYPE', '') == 'crypto_LUKS':
+ json_area: Dict[str, Any] = self.get_json_area()
+ if 'tokens' in json_area.keys():
+ for token in json_area['tokens'].keys():
+ if json_area['tokens'][token].get('type', '') == 'systemd-tpm2':
+ return True
+ return False
"""
_supported_features = [
- "encrypted", "block_wal_size", "osds_per_device",
+ "encrypted", "tpm2", "block_wal_size", "osds_per_device",
"db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type",
"data_devices", "db_devices", "wal_devices", "journal_devices",
"data_directories", "osds_per_device", "objectstore", "osd_id_claims",
osds_per_device=None, # type: Optional[int]
objectstore='bluestore', # type: str
encrypted=False, # type: bool
+ tpm2=False, # type: bool
db_slots=None, # type: Optional[int]
wal_slots=None, # type: Optional[int]
osd_id_claims=None, # type: Optional[Dict[str, List[str]]]
#: ``true`` or ``false``
self.encrypted = encrypted
+ #: ``true`` or ``false``
+ self.tpm2 = tpm2
+
#: How many OSDs per DB device
self.db_slots = db_slots
if self.spec.encrypted:
cmds[i] += " --dmcrypt"
+ if self.spec.tpm2:
+ cmds[i] += " --with-tpm"
+
if self.spec.osds_per_device:
cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device)