Add support for passing extra options to dmcrypt commands.
This makes it possible to provide custom parameters to
`cryptsetup luksFormat` and `cryptsetup luksOpen`, using the same
syntax as the native cryptsetup CLI.
Fixes: https://tracker.ceph.com/issues/72801
Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
help='Reuse existing OSD ids',
type=arg_validators.valid_osd_id
)
+ parser.add_argument(
+ '--dmcrypt-format-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksFormat options (use the same syntax as the cryptsetup CLI)",
+ )
+ parser.add_argument(
+ '--dmcrypt-open-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksOpen options (use the same syntax as the cryptsetup CLI)",
+ )
self.args = parser.parse_args(argv)
if self.args.bluestore:
self.args.objectstore = 'bluestore'
'with_tpm',
'crush_device_class',
'no_systemd',
+ 'dmcrypt_format_opts',
+ 'dmcrypt_open_opts',
]
defaults.update({arg: getattr(self.args, arg) for arg in global_args})
for osd in plan:
'action': arg_validators.DmcryptAction,
'help': 'Enable device encryption via dm-crypt',
},
+ '--dmcrypt-format-opts': {
+ 'default': None,
+ 'type': Optional[str],
+ },
+ '--dmcrypt-open-opts': {
+ 'default': None,
+ 'type': Optional[str],
+ },
'--with-tpm': {
'dest': 'with_tpm',
'help': 'Whether encrypted OSDs should be enrolled with TPM.',
secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
mlogger.info(' preparing dmcrypt for {}, uuid {}'.format(target_lv.lv_path, target_lv.lv_uuid))
target_path = encryption_utils.prepare_dmcrypt(
- key=secret, device=target_path, mapping=target_lv.lv_uuid)
+ key=secret,
+ device=target_path,
+ mapping=target_lv.lv_uuid,
+ format_options=self.args.dmcrypt_format_opts,
+ open_options=self.args.dmcrypt_open_opts)
try:
# we need to update lvm tags for all the remaining volumes
# and clear for ones which to be removed
action='store_true',
help='Skip checking OSD systemd unit',
)
+ parser.add_argument(
+ '--dmcrypt-format-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksFormat options (use the same syntax as the cryptsetup CLI)",
+ )
+ parser.add_argument(
+ '--dmcrypt-open-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksOpen options (use the same syntax as the cryptsetup CLI)",
+ )
return parser
def main(self):
action='store_true',
help='Skip checking OSD systemd unit',
)
+ parser.add_argument(
+ '--dmcrypt-format-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksFormat options (use the same syntax as the cryptsetup CLI)",
+ )
+ parser.add_argument(
+ '--dmcrypt-open-opts',
+ type=str,
+ default=None,
+ help="Additional cryptsetup luksOpen options (use the same syntax as the cryptsetup CLI)",
+ )
return parser
@decorators.needs_root
secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
mlogger.info(' preparing dmcrypt for {}, uuid {}'.format(target_lv.lv_path, target_lv.lv_uuid))
target_path = encryption_utils.prepare_dmcrypt(
- key=secret, device=target_path, mapping=target_lv.lv_uuid)
+ key=secret,
+ device=target_path,
+ mapping=target_lv.lv_uuid,
+ format_options=self.args.dmcrypt_format_opts,
+ open_options=self.args.dmcrypt_open_opts
+ )
try:
tag_tracker.update_tags_when_lv_create(self.create_type)
# format data device
encryption_utils.luks_format(
self.dmcrypt_key,
- device
+ device,
+ self.args.dmcrypt_format_opts,
)
if self.with_tpm:
self.dmcrypt_key,
device,
uuid,
- self.with_tpm)
+ self.with_tpm,
+ self.args.dmcrypt_open_opts)
return '/dev/mapper/%s' % uuid
class TestLvm:
@patch('ceph_volume.objectstore.lvm.prepare_utils.create_key', Mock(return_value=['AQCee6ZkzhOrJRAAZWSvNC3KdXOpC2w8ly4AZQ==']))
def setup_method(self, m_create_key):
- self.lvm = Lvm([])
+ args = Namespace(dmcrypt_format_opts=None, dmcrypt_open_opts=None)
+ self.lvm = Lvm(args)
@patch('ceph_volume.conf.cluster', 'ceph')
@patch('ceph_volume.api.lvm.get_single_lv')
lv_tags='',
lv_uuid='fake-uuid')
self.lvm.encrypted = True
+ self.lvm.with_tpm = 0
self.lvm.dmcrypt_key = 'fake-dmcrypt-key'
self.lvm.args = args
self.lvm.objectstore = 'seastore'
lv_uuid='fake-uuid')
self.lvm.encrypted = True
self.lvm.dmcrypt_key = 'fake-dmcrypt-key'
+ self.lvm.with_tpm = 0
self.lvm.args = args
self.lvm.objectstore = 'seastore'
self.lvm.pre_prepare()
encryption.luks_format('abcd', '/dev/foo')
assert m_call.call_args[0][0] == expected
+ @patch('ceph_volume.util.encryption.process.call')
+ def test_luks_format_with_extra_option(self, m_call, conf_ceph_stub):
+ conf_ceph_stub('[global]\nfsid=abcd')
+ expected = [
+ 'cryptsetup',
+ '--batch-mode',
+ '--key-size',
+ '512',
+ '--key-file',
+ '-',
+ 'luksFormat',
+ '--fake-custom-opt1',
+ '/dev/foo'
+ ]
+ encryption.luks_format('abcd', '/dev/foo', '--fake-custom-opt1')
+ assert m_call.call_args[0][0] == expected
+
@patch('ceph_volume.util.encryption.process.call')
def test_luks_format_command_with_custom_size(self, m_call, conf_ceph_stub):
conf_ceph_stub('[global]\nfsid=abcd\n[osd]\nosd_dmcrypt_key_size=256')
encryption.luks_open('abcd', '/dev/foo', '/dev/bar')
assert m_call.call_args[0][0] == expected
+ @patch('ceph_volume.util.encryption.bypass_workqueue', return_value=False)
+ @patch('ceph_volume.util.encryption.process.call')
+ def test_luks_format_with_extra_option(self, m_call, m_bypass_workqueue, conf_ceph_stub):
+ conf_ceph_stub('[global]\nfsid=abcd')
+ expected = [
+ 'cryptsetup',
+ '--key-size',
+ '512',
+ '--key-file',
+ '-',
+ '--allow-discards',
+ 'luksOpen',
+ '--fake-custom-opt1',
+ '/dev/foo',
+ '/dev/bar'
+ ]
+ encryption.luks_open('abcd', '/dev/foo', '/dev/bar', options='--fake-custom-opt1')
+ assert m_call.call_args[0][0] == expected
+
@patch('ceph_volume.util.encryption.bypass_workqueue', return_value=False)
@patch('ceph_volume.util.encryption.process.call')
def test_luks_open_command_with_tpm(self, m_call, m_bypass_workqueue, conf_ceph_stub):
import logging
import re
import json
+import shlex
from ceph_volume import process, conf, terminal
from ceph_volume.util import constants, system
from ceph_volume.util.device import Device
from .prepare import write_keyring
from .disk import lsblk, device_family, get_part_entry_type, _dd_read
from packaging import version
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
mlogger = terminal.MultiLogger(__name__)
return key
-def luks_format(key: str, device: str) -> None:
+def luks_format(key: str, device: str, options: Optional[str] = None) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
:param key: dmcrypt secret key, will be used for decrypting
:param device: Absolute path to device
+ :param options: A list of additional cryptsetup args
"""
- command = [
+ extra_args: List[str] = []
+ base_cmd: List[str] = [
'cryptsetup',
'--batch-mode', # do not prompt
- '--key-size',
- get_key_size_from_conf(),
- '--key-file', # misnomer, should be key
- '-', # because we indicate stdin for the key here
- 'luksFormat',
- device,
+ '--key-size', get_key_size_from_conf(),
+ '--key-file', '-',
]
+
+ if options is not None:
+ extra_args: List[str] = shlex.split(options)
+
+ command: List[str] = base_cmd + ["luksFormat"] + extra_args + [device]
+
process.call(command, stdin=key, terminal_verbose=True, show_command=True)
def luks_open(key: str,
device: str,
mapping: str,
- with_tpm: int = 0) -> None:
+ with_tpm: int = 0,
+ options: Optional[str] = None) -> None:
"""
Decrypt (open) an encrypted device, previously prepared with cryptsetup
:param device: absolute path to device
:param mapping: mapping name used to correlate device. Usually a UUID
:param with_tpm: whether to use tpm2 token enrollment.
+ :param options: A list of additional cryptsetup args
"""
command: List[str] = []
+ extra_args: List[str] = []
if with_tpm:
command = ['/usr/lib/systemd/systemd-cryptsetup',
'attach',
if bypass_workqueue(device):
command[-1] += ',no-read-workqueue,no-write-workqueue'
else:
- command = [
- 'cryptsetup',
- '--key-size',
- get_key_size_from_conf(),
- '--key-file',
- '-',
- '--allow-discards', # allow discards (aka TRIM) requests for device
- 'luksOpen',
- device,
- mapping,
+ base_cmd: List[str] = [
+ "cryptsetup",
+ "--key-size", str(get_key_size_from_conf()),
+ "--key-file", "-",
+ "--allow-discards",
]
+ if options is not None:
+ extra_args = shlex.split(options)
+ command: List[str] = (
+ base_cmd
+ + ["luksOpen"]
+ + extra_args
+ + [device, mapping]
+ )
if bypass_workqueue(device):
command.extend(['--perf-no_read_workqueue',
break
return metadata
-def prepare_dmcrypt(key, device, mapping):
+def prepare_dmcrypt(key: str,
+ device: str,
+ mapping: str,
+ format_options: Optional[str] = None,
+ open_options: Optional[str] = None):
"""
Helper for devices that are encrypted. The operations needed for
block, db, wal, or data/journal devices are all the same
# format data device
luks_format(
key,
- device
+ device,
+ format_options
)
luks_open(
key,
device,
- mapping
+ mapping,
+ options=open_options
)
return '/dev/mapper/%s' % mapping