from ceph_volume.objectstore.lvm import Lvm as LVMActivate
from ceph_volume.objectstore.raw import Raw as RAWActivate
from ceph_volume.devices.simple.activate import Activate as SimpleActivate
-
+from ceph_volume.util.lvm_osd_mappers import OsdLvmMappers
class Activate(object):
)
self.args = parser.parse_args(self.argv)
+ # Close the LVM mappers to force a 'refresh'
+ # Avoid that raw activates a LVM osd that is already activated
+ # Only do this when both filters are available, because the mapper
+ # lookup builds LVM tag strings and will fail if osd_fsid is None.
+ if self.args.osd_id is not None and self.args.osd_fsid is not None:
+ OsdLvmMappers(self.args.osd_id, self.args.osd_fsid).close()
+
# first try raw
try:
raw_activate = RAWActivate(self.args)
from ceph_volume.util import encryption as encryption_utils
from ceph_volume.util import system, disk
from ceph_volume.util import nvme as nvme_utils
+from ceph_volume.util.lvm_osd_mappers import OsdLvmMappers
from ceph_volume.systemd import systemctl
from ceph_volume.devices.lvm.common import rollback_osd
from ceph_volume.devices.lvm.listing import direct_report
osd_block_lv.tags['ceph.cluster_name'])
configuration.load()
+ OsdLvmMappers(osd_id, osd_fsid).refresh()
+
# mount on tmpfs the osd directory
self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
if not system.path_is_mounted(self.osd_path):
from ceph_volume.util import prepare as prepare_utils
from ceph_volume.util import encryption as encryption_utils
from ceph_volume.util import nvme as nvme_utils
+from ceph_volume.util.raw_osd_crypt_mappers import RawOsdCryptMappers
from ceph_volume.api import lvm as lvm_api
from ceph_volume.devices.lvm.common import rollback_osd
from ceph_volume.devices.raw.list import direct_report
import argparse
logger = logging.getLogger(__name__)
+mlogger = terminal.MultiLogger(__name__)
class Raw(BaseObjectStore):
return False
return nvme_utils.preformat(self.block_device_path)
- def _activate(self, osd_id: str, osd_fsid: str) -> None:
+ def _activate(self) -> None:
+ mappers: Optional[RawOsdCryptMappers] = None
+ if RawOsdCryptMappers.backing_device_path(self.block_device_path):
+ mappers = RawOsdCryptMappers(
+ self.osd_id,
+ self.osd_fsid,
+ self.block_device_path,
+ self.db_device_path,
+ self.wal_device_path,
+ cluster_name=conf.cluster,
+ dmcrypt_secret=os.getenv('CEPH_VOLUME_DMCRYPT_SECRET') or None,
+ with_tpm=bool(self.with_tpm),
+ )
+ if mappers is not None and mappers.applies():
+ try:
+ mappers.refresh()
+ except RuntimeError as e:
+ mlogger.info(
+ 'Failed to refresh dmcrypt mappers for osd.%s uuid %s: %s (is the OSD already running?)',
+ self.osd_id,
+ self.osd_fsid,
+ e,
+ )
+ (
+ self.block_device_path,
+ self.db_device_path,
+ self.wal_device_path,
+ ) = mappers.mapper_paths()
+
# mount on tmpfs the osd directory
- self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
+ self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, self.osd_id)
if not system.path_is_mounted(self.osd_path):
# mkdir -p and mount as tmpfs
- prepare_utils.create_osd_path(osd_id, tmpfs=not self.args.no_tmpfs)
+ prepare_utils.create_osd_path(self.osd_id, tmpfs=not self.args.no_tmpfs)
# XXX This needs to be removed once ceph-bluestore-tool can deal with
# symlinks that exist in the osd dir
# always re-do the symlink regardless if it exists, so that the block,
# block.wal, and block.db devices that may have changed can be mapped
# correctly every time
- prepare_utils.link_block(self.block_device_path, osd_id)
+ prepare_utils.link_block(self.block_device_path, self.osd_id)
if self.db_device_path:
- prepare_utils.link_db(self.db_device_path, osd_id, osd_fsid)
+ prepare_utils.link_db(self.db_device_path, self.osd_id, self.osd_fsid)
if self.wal_device_path:
- prepare_utils.link_wal(self.wal_device_path, osd_id, osd_fsid)
+ prepare_utils.link_wal(self.wal_device_path, self.osd_id, self.osd_fsid)
system.chown(self.osd_path)
terminal.success("ceph-volume raw activate "
- "successful for osd ID: %s" % osd_id)
+ "successful for osd ID: %s" % self.osd_id)
@decorators.needs_root
def activate(self) -> None:
self.pre_activate_tpm2(device)
found = direct_report(self.devices)
+ filter_osd_id = self.osd_id
+ filter_osd_fsid = self.osd_fsid
+
for osd_uuid, meta in found.items():
realpath_device = os.path.realpath(meta['device'])
if lvm_api.is_ceph_volume_lvm_prepared(realpath_device,
lvm_prepare_lv_paths):
continue
osd_id = meta['osd_id']
- if self.osd_id is not None and str(osd_id) != str(self.osd_id):
+ if filter_osd_id is not None and str(osd_id) != str(filter_osd_id):
continue
- if self.osd_fsid is not None and osd_uuid != self.osd_fsid:
+ if filter_osd_fsid is not None and osd_uuid != filter_osd_fsid:
continue
+ self.osd_id = str(osd_id)
+ self.osd_fsid = str(osd_uuid)
self.block_device_path = meta.get('device')
self.db_device_path = meta.get('device_db', '')
self.wal_device_path = meta.get('device_wal', '')
logger.info(f'Activating osd.{osd_id} uuid {osd_uuid} cluster {meta["ceph_fsid"]}')
- self._activate(osd_id, osd_uuid)
+ self._activate()
activated_any = True
if not activated_any:
m_nvme.assert_called_once_with(args.data)
@patch('ceph_volume.conf.cluster', 'ceph')
+ @patch(
+ 'ceph_volume.objectstore.raw.RawOsdCryptMappers.backing_device_path',
+ return_value='',
+ )
@patch('ceph_volume.objectstore.raw.prepare_utils.link_wal')
@patch('ceph_volume.objectstore.raw.prepare_utils.link_db')
@patch('ceph_volume.objectstore.raw.prepare_utils.link_block')
m_link_block,
m_link_db,
m_link_wal,
+ m_backing_device_path,
monkeypatch,
factory):
args = factory(no_tmpfs=False)
m_unlink.return_value = MagicMock()
monkeypatch.setattr(system, 'chown', lambda path: 0)
monkeypatch.setattr(system, 'path_is_mounted', lambda path: 0)
- self.raw_bs._activate('1', True)
+ self.raw_bs.osd_id = '1'
+ self.raw_bs.osd_fsid = 'test-fsid'
+ self.raw_bs._activate()
calls = [call('/var/lib/ceph/osd/ceph-1/block'),
call('/var/lib/ceph/osd/ceph-1/block.db'),
call('/var/lib/ceph/osd/ceph-1/block.wal')]
rawbluestore.osd_id = '0'
rawbluestore._activate = MagicMock()
rawbluestore.activate()
- assert rawbluestore._activate.mock_calls == [call(0, 'db32a338-b640-4cbc-af17-f63808b1c36e')]
+ rawbluestore._activate.assert_called_once_with()
assert rawbluestore.block_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdb-block-dmcrypt'
assert rawbluestore.db_device_path == '/dev/mapper/ceph-db32a338-b640-4cbc-af17-f63808b1c36e-sdc-db-dmcrypt'
assert encryption.status('/dev/sdc1') == {}
+class TestDmsetupRemove(object):
+
+ def test_mapper_exists(self, fake_run, fake_filesystem):
+ mapper_name = 'ceph-fsid-nvme2n2-block-dmcrypt'
+ fake_filesystem.create_file('/dev/mapper/%s' % mapper_name)
+ encryption.dmsetup_remove(mapper_name)
+ arguments = fake_run.calls[0]['args'][0]
+ assert arguments == ['dmsetup', 'remove', mapper_name]
+
+ def test_mapper_does_not_exist(self, fake_run):
+ encryption.dmsetup_remove('ceph-fsid-missing-block-dmcrypt')
+ assert fake_run.calls == []
+
+
class TestDmcryptClose(object):
def test_mapper_exists(self, fake_run, fake_filesystem):
--- /dev/null
+from unittest.mock import patch
+
+from ceph_volume import conf
+from ceph_volume.util.osd_luks_credentials import OsdLuksCredentials
+
+
+class TestOsdLuksCredentialsApplyClusterContext:
+ @patch('ceph_volume.util.osd_luks_credentials.configuration.load')
+ @patch('ceph_volume.util.osd_luks_credentials.configuration.load_ceph_conf_path')
+ def test_sets_cluster_and_loads_conf(self, m_load_path, m_load):
+ OsdLuksCredentials(42, 'osd-fsid').apply_cluster_context('mycluster')
+ assert conf.cluster == 'mycluster'
+ m_load_path.assert_called_once_with('mycluster')
+ m_load.assert_called_once_with()
+
+
+class TestOsdLuksCredentialsResolveSecret:
+ def test_tpm_returns_empty_string(self):
+ c = OsdLuksCredentials(1, 'fsid', luks_secret='should-ignore', with_tpm=True)
+ assert c.resolve_secret('lockbox') == ''
+
+ def test_preconfigured_secret_short_circuits(self):
+ c = OsdLuksCredentials(1, 'fsid', luks_secret='cached-key', with_tpm=False)
+ with patch(
+ 'ceph_volume.util.osd_luks_credentials.encryption_utils.get_dmcrypt_key'
+ ) as m_get:
+ assert c.resolve_secret('lockbox') == 'cached-key'
+ m_get.assert_not_called()
+
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.get_dmcrypt_key')
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.write_lockbox_keyring')
+ @patch('ceph_volume.util.osd_luks_credentials.system.path_is_mounted', return_value=True)
+ def test_lockbox_then_get_dmcrypt_key(
+ self, m_mounted, m_write_lb, m_get_key,
+ ):
+ m_get_key.return_value = 'from-mon'
+ c = OsdLuksCredentials('0', 'osd-fsid-1', luks_secret=None, with_tpm=False)
+ out = c.resolve_secret('lockbox-secret-b64')
+ assert out == 'from-mon'
+ assert c.luks_secret == 'from-mon'
+ m_write_lb.assert_called_once_with('0', 'osd-fsid-1', 'lockbox-secret-b64')
+ m_get_key.assert_called_once_with('0', 'osd-fsid-1')
+
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.get_dmcrypt_key')
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.write_lockbox_keyring')
+ @patch('ceph_volume.util.osd_luks_credentials.prepare_utils.create_osd_path')
+ @patch('ceph_volume.util.osd_luks_credentials.system.path_is_mounted', return_value=False)
+ def test_lockbox_creates_tmpfs_osd_path_when_unmounted(
+ self, m_mounted, m_create, m_write_lb, m_get_key,
+ ):
+ conf.cluster = 'ceph'
+ m_get_key.return_value = 'k'
+ c = OsdLuksCredentials(3, 'fsid-2', with_tpm=False)
+ assert c.resolve_secret('lb') == 'k'
+ m_create.assert_called_once_with('3', tmpfs=True)
+ m_write_lb.assert_called_once()
+
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.get_dmcrypt_key')
+ @patch('ceph_volume.util.osd_luks_credentials.encryption_utils.write_lockbox_keyring')
+ def test_missing_lockbox_skips_write_still_fetches_key(
+ self, m_write_lb, m_get_key,
+ ):
+ m_get_key.return_value = 'nomon'
+ c = OsdLuksCredentials(9, 'fsid-3', with_tpm=False)
+ assert c.resolve_secret(None) == 'nomon'
+ m_write_lb.assert_not_called()
+ m_get_key.assert_called_once_with('9', 'fsid-3')
+
+ def test_osd_id_normalized_to_str(self):
+ c = OsdLuksCredentials(7, 'x')
+ assert c.osd_id == '7'
--- /dev/null
+from unittest.mock import MagicMock, patch
+
+from ceph_volume.util.raw_osd_crypt_mappers import RawOsdCryptMappers
+
+
+class TestBackingDevicePath:
+ def test_empty(self) -> None:
+ assert RawOsdCryptMappers.backing_device_path('') == ''
+
+ def test_plain_block_device(self) -> None:
+ assert RawOsdCryptMappers.backing_device_path('/dev/sda1') == '/dev/sda1'
+
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.disk.get_parent_device_from_mapper')
+ def test_mapper_resolves_parent(self, m_parent: MagicMock) -> None:
+ m_parent.return_value = '/dev/sda1'
+ assert RawOsdCryptMappers.backing_device_path(
+ '/dev/mapper/ceph-fsid-sda1-block-dmcrypt',
+ ) == '/dev/sda1'
+
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.disk.get_parent_device_from_mapper')
+ def test_mapper_without_parent_returns_empty(self, m_parent: MagicMock) -> None:
+ m_parent.return_value = ''
+ assert RawOsdCryptMappers.backing_device_path(
+ '/dev/mapper/ceph-fsid-sda1-block-dmcrypt',
+ ) == ''
+
+
+@patch(
+ 'ceph_volume.util.raw_osd_crypt_mappers.OsdLuksCredentials.apply_cluster_context'
+)
+class TestMapperName:
+ def test_mapper_name_from_backing_path(
+ self, m_cluster: MagicMock,
+ ) -> None:
+ mappers = RawOsdCryptMappers(
+ '0', 'fsid-uuid', '/dev/nvme2n2', cluster_name='ceph',
+ )
+ assert mappers._mapper_name_for_role('block') == (
+ 'ceph-fsid-uuid-nvme2n2-block-dmcrypt'
+ )
+ assert mappers._mapper_path_for_backing('/dev/nvme2n2', 'block') == (
+ '/dev/mapper/ceph-fsid-uuid-nvme2n2-block-dmcrypt'
+ )
+
+ @patch(
+ 'ceph_volume.util.raw_osd_crypt_mappers.disk.get_parent_device_from_mapper'
+ )
+ def test_mapper_name_from_activate_mapper_path(
+ self,
+ m_parent: MagicMock,
+ m_cluster: MagicMock,
+ ) -> None:
+ m_parent.return_value = '/dev/nvme2n2'
+ mappers = RawOsdCryptMappers(
+ '0',
+ 'fsid-uuid',
+ '/dev/mapper/ceph-fsid-uuid-nvme2n2-block-dmcrypt',
+ cluster_name='ceph',
+ )
+ assert mappers._mapper_name_for_role('block') == (
+ 'ceph-fsid-uuid-nvme2n2-block-dmcrypt'
+ )
+
+
+@patch(
+ 'ceph_volume.util.raw_osd_crypt_mappers.OsdLuksCredentials.apply_cluster_context'
+)
+class TestApplies:
+ def test_tpm(self, m_cluster: MagicMock) -> None:
+ mappers = RawOsdCryptMappers(
+ '0', 'fsid', '/dev/sda', cluster_name='ceph', with_tpm=True,
+ )
+ assert mappers.applies()
+
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.disk.get_parent_device_from_mapper')
+ def test_existing_dmcrypt_mapper_path(
+ self, m_parent: MagicMock, m_cluster: MagicMock,
+ ) -> None:
+ m_parent.return_value = '/dev/sda1'
+ mappers = RawOsdCryptMappers(
+ '0',
+ 'fsid',
+ '/dev/mapper/ceph-fsid-sda1-block-dmcrypt',
+ cluster_name='ceph',
+ )
+ assert mappers.applies()
+
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.encryption_utils.CephLuks2')
+ def test_luks2_ceph_encrypted_backing(
+ self, m_luks_cls: MagicMock, m_cluster: MagicMock,
+ ) -> None:
+ m_luks_cls.return_value.is_ceph_encrypted = True
+ mappers = RawOsdCryptMappers(
+ '0', 'fsid', '/dev/sda1', cluster_name='ceph',
+ )
+ assert mappers.applies()
+
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.encryption_utils.CephLuks2')
+ def test_clear_device(self, m_luks_cls: MagicMock, m_cluster: MagicMock) -> None:
+ m_luks_cls.return_value.is_ceph_encrypted = False
+ mappers = RawOsdCryptMappers(
+ '0', 'fsid', '/dev/sda1', cluster_name='ceph',
+ )
+ assert not mappers.applies()
+
+
+class TestRefresh:
+ @patch(
+ 'ceph_volume.util.raw_osd_crypt_mappers.OsdLuksCredentials.apply_cluster_context'
+ )
+ @patch(
+ 'ceph_volume.util.raw_osd_crypt_mappers.OsdLuksCredentials.resolve_secret',
+ return_value='test-key',
+ )
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.encryption_utils.dmsetup_remove')
+ @patch('ceph_volume.util.raw_osd_crypt_mappers.encryption_utils.luks_open')
+ def test_refresh_closes_then_opens(
+ self,
+ m_luks_open: MagicMock,
+ m_dmsetup_remove: MagicMock,
+ m_resolve: MagicMock,
+ m_cluster: MagicMock,
+ ) -> None:
+ mappers = RawOsdCryptMappers(
+ '0', 'fsid', '/dev/sda1', cluster_name='ceph',
+ )
+ mappers.refresh()
+ m_dmsetup_remove.assert_called_once_with(
+ 'ceph-fsid-sda1-block-dmcrypt',
+ terminal_logging=False,
+ )
+ assert m_luks_open.called
raise RuntimeError(f"Can't rename mapper '{current}' to '{new}': {err}")
+def dmsetup_remove(mapper_name: str = '', skip_path_check: bool = False, **kwargs: Any) -> None:
+ """Remove a device mapper device by name.
+
+ Unlike `cryptsetup remove`, `dmsetup remove` does not retry on busy devices.
+
+ Args:
+ mapper_name: Device mapper name (not `/dev/mapper/...`).
+ skip_path_check: When False, skip removal if `/dev/mapper/<name>` is absent.
+ """
+ mapper_path = '/dev/mapper/%s' % mapper_name
+ if not skip_path_check and not os.path.exists(mapper_path):
+ logger.debug('device mapper path does not exist %s', mapper_path)
+ logger.debug('will skip dmsetup removal')
+ return
+ process.run(['dmsetup', 'remove', mapper_name], **kwargs)
+
+
def luks_open(key: str,
device: str,
mapping: str,
self.device: str = device
self.osd_fsid: str = ''
if self.is_ceph_encrypted:
- self.osd_fsid = self.get_osd_fsid()
+ try:
+ self.osd_fsid = self.get_osd_fsid()
+ except RuntimeError:
+ logger.debug(
+ 'LUKS2 device %s looks like Ceph encrypted but osd_fsid '
+ 'could not be read',
+ device,
+ )
@property
def has_luks2_signature(self) -> bool:
--- /dev/null
+from typing import List, Optional, Union
+
+from ceph_volume import process
+from ceph_volume.api.lvm import Volume, get_lvs
+from ceph_volume.util import disk
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.util.osd_luks_credentials import OsdLuksCredentials
+
+
+class OsdLvmMappers:
+ def __init__(
+ self,
+ osd_id: Union[int, str],
+ osd_fsid: str,
+ lvs: Optional[List[Volume]] = None,
+ dmcrypt_secret: Optional[str] = None,
+ dmcrypt_open_opts: Optional[str] = None,
+ ) -> None:
+ self.osd_fsid = osd_fsid
+ self.credentials = OsdLuksCredentials(
+ osd_id,
+ osd_fsid,
+ luks_secret=dmcrypt_secret,
+ with_tpm=False,
+ )
+ self.dmcrypt_open_opts = dmcrypt_open_opts
+ self.encrypted = False
+ if lvs is None:
+ lvs = get_lvs(
+ tags={'ceph.osd_id': self.credentials.osd_id, 'ceph.osd_fsid': osd_fsid}
+ )
+ self._assign_role_volumes_from_lv_list(lvs)
+
+ def close(self) -> None:
+ self._close_wal_mapper()
+ self._close_db_mapper()
+ self._close_block_mapper()
+
+ def open(self) -> None:
+ self._rescan_physical_volumes()
+ self._refresh_osd_volumes_from_lvm()
+ paths = [
+ vol.lv_path
+ for vol in (self.block_volume, self.db_volume, self.wal_volume)
+ if vol is not None
+ ]
+ if paths:
+ process.call(['lvchange', '-ay'] + paths, run_on_host=True)
+ if self.encrypted:
+ for role in ('block', 'db', 'wal'):
+ self._luks_open_role(role)
+
+ def refresh(self) -> None:
+ self.close()
+ self.open()
+
+ def _assign_role_volumes_from_lv_list(self, lvs: List[Volume]) -> None:
+ self.lvs = lvs
+ self.block_volume: Optional[Volume] = None
+ self.db_volume: Optional[Volume] = None
+ self.wal_volume: Optional[Volume] = None
+ for lv in lvs:
+ kind = lv.tags.get('ceph.type')
+ if kind == 'block' and self.block_volume is None:
+ self.block_volume = lv
+ elif kind == 'db' and self.db_volume is None:
+ self.db_volume = lv
+ elif kind == 'wal' and self.wal_volume is None:
+ self.wal_volume = lv
+ self._sync_encryption_flags()
+ self.credentials.with_tpm = self.with_tpm
+ cluster_name = self._cluster_name_for_context()
+ if cluster_name is not None:
+ self.credentials.apply_cluster_context(cluster_name)
+
+ def _cluster_name_for_context(self) -> Optional[str]:
+ if self.block_volume is None:
+ return None
+ return self.block_volume.tags.get('ceph.cluster_name') or 'ceph'
+
+ def _sync_encryption_flags(self) -> None:
+ self.encrypted = False
+ self.with_tpm = False
+ if self.block_volume is None:
+ return
+ self.encrypted = self.block_volume.tags.get('ceph.encrypted', '0') == '1'
+ self.with_tpm = self.block_volume.tags.get('ceph.with_tpm') == '1'
+
+ def _device_uuid_for_role(self, role: str) -> Optional[str]:
+ if self.block_volume is None:
+ return None
+ if role == 'block':
+ from_tag = self.block_volume.tags.get('ceph.block_uuid', '')
+ if from_tag:
+ return from_tag
+ return self.block_volume.lv_uuid or None
+ tag = 'ceph.%s_uuid' % role
+ value = self.block_volume.tags.get(tag, '')
+ return value if value else None
+
+ def _crypt_mapper_device_path(self, uuid_value: Optional[str]) -> Optional[str]:
+ if not uuid_value:
+ return None
+ return '/dev/mapper/%s' % uuid_value
+
+ def _block_crypt_path(self) -> Optional[str]:
+ if not self.encrypted:
+ return None
+ return self._crypt_mapper_device_path(self._device_uuid_for_role('block'))
+
+ def _db_crypt_path(self) -> Optional[str]:
+ if not self.encrypted:
+ return None
+ return self._crypt_mapper_device_path(self._device_uuid_for_role('db'))
+
+ def _wal_crypt_path(self) -> Optional[str]:
+ if not self.encrypted:
+ return None
+ return self._crypt_mapper_device_path(self._device_uuid_for_role('wal'))
+
+ def _underlying_device_for_encrypted_role(self, role: str) -> Optional[str]:
+ if role == 'block':
+ if self.block_volume is None:
+ return None
+ return self.block_volume.lv_path
+ uuid_value = self._device_uuid_for_role(role)
+ if not uuid_value:
+ return None
+ if role == 'db' and self.db_volume is not None:
+ return self.db_volume.lv_path
+ if role == 'wal' and self.wal_volume is not None:
+ return self.wal_volume.lv_path
+ return disk.get_device_from_partuuid(uuid_value)
+
+ def _lockbox_secret_from_block_lv(self) -> Optional[str]:
+ if self.block_volume is None:
+ return None
+ return self.block_volume.tags.get('ceph.cephx_lockbox_secret')
+
+ def _luks_open_role(self, role: str) -> None:
+ if not self.encrypted or self.block_volume is None:
+ return
+ uuid_value = self._device_uuid_for_role(role)
+ if not uuid_value:
+ return
+ device = self._underlying_device_for_encrypted_role(role)
+ if not device:
+ return
+ encryption_utils.luks_open(
+ self.credentials.resolve_secret(self._lockbox_secret_from_block_lv()),
+ device,
+ uuid_value,
+ with_tpm=self.with_tpm,
+ options=self.dmcrypt_open_opts,
+ )
+
+ def _refresh_osd_volumes_from_lvm(self) -> None:
+ refreshed = get_lvs(
+ tags={'ceph.osd_id': self.credentials.osd_id, 'ceph.osd_fsid': self.osd_fsid}
+ )
+ self._assign_role_volumes_from_lv_list(refreshed)
+
+ @staticmethod
+ def _deactivate_logical_volume(volume: Volume) -> None:
+ process.call(
+ [volume.binary_change, '-an', volume.lv_path],
+ run_on_host=True,
+ show_command=True,
+ )
+
+ def _close_block_crypt_mapper(self) -> None:
+ path = self._block_crypt_path()
+ if path:
+ encryption_utils.dmcrypt_close(path)
+
+ def _close_db_crypt_mapper(self) -> None:
+ path = self._db_crypt_path()
+ if path:
+ encryption_utils.dmcrypt_close(path)
+
+ def _close_wal_crypt_mapper(self) -> None:
+ path = self._wal_crypt_path()
+ if path:
+ encryption_utils.dmcrypt_close(path)
+
+ def _close_block_mapper(self) -> None:
+ if self.block_volume is None:
+ return
+ self._close_block_crypt_mapper()
+ self._deactivate_logical_volume(self.block_volume)
+
+ def _close_db_mapper(self) -> None:
+ self._close_db_crypt_mapper()
+ if self.db_volume is not None:
+ self._deactivate_logical_volume(self.db_volume)
+
+ def _close_wal_mapper(self) -> None:
+ self._close_wal_crypt_mapper()
+ if self.wal_volume is not None:
+ self._deactivate_logical_volume(self.wal_volume)
+
+ def _rescan_physical_volumes(self) -> None:
+ process.call(['pvscan', '--cache'], run_on_host=True)
--- /dev/null
+from typing import Optional, Union
+
+from ceph_volume import conf, configuration
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import system
+
+
+class OsdLuksCredentials:
+ def __init__(
+ self,
+ osd_id: Union[int, str],
+ osd_fsid: str,
+ luks_secret: Optional[str] = None,
+ with_tpm: bool = False,
+ ) -> None:
+ self.osd_id = str(osd_id)
+ self.osd_fsid = osd_fsid
+ self.luks_secret = luks_secret
+ self.with_tpm = bool(with_tpm)
+
+ def apply_cluster_context(self, cluster_name: str) -> None:
+ conf.cluster = cluster_name
+ configuration.load_ceph_conf_path(cluster_name)
+ configuration.load()
+
+ def _write_lockbox_keyring_if_needed(self, lockbox_secret: Optional[str]) -> None:
+ if self.with_tpm or lockbox_secret is None:
+ return
+ osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, self.osd_id)
+ if not system.path_is_mounted(osd_path):
+ prepare_utils.create_osd_path(self.osd_id, tmpfs=True)
+ encryption_utils.write_lockbox_keyring(
+ self.osd_id,
+ self.osd_fsid,
+ lockbox_secret,
+ )
+
+ def resolve_secret(self, lockbox_secret: Optional[str]) -> str:
+ if self.with_tpm:
+ return ''
+ if self.luks_secret is not None:
+ return self.luks_secret
+ self._write_lockbox_keyring_if_needed(lockbox_secret)
+ self.luks_secret = encryption_utils.get_dmcrypt_key(
+ self.osd_id,
+ self.osd_fsid,
+ )
+ return self.luks_secret
--- /dev/null
+import os
+from typing import Dict, List, Optional, Tuple, Union
+
+from ceph_volume import conf
+from ceph_volume.util import disk
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.util.osd_luks_credentials import OsdLuksCredentials
+
+
+class RawOsdCryptMappers:
+ def __init__(
+ self,
+ osd_id: Union[int, str],
+ osd_fsid: str,
+ block_path: str,
+ db_path: str = '',
+ wal_path: str = '',
+ cluster_name: Optional[str] = None,
+ dmcrypt_secret: Optional[str] = None,
+ dmcrypt_open_opts: Optional[str] = None,
+ lockbox_secret: Optional[str] = None,
+ with_tpm: bool = False,
+ ) -> None:
+ backing_block = self.backing_device_path(block_path)
+ if not backing_block:
+ raise ValueError('block backing device is required')
+ self.activate_block_path = block_path
+ self.activate_db_path = db_path or ''
+ self.activate_wal_path = wal_path or ''
+ self.block_device = backing_block
+ self.db_device = self.backing_device_path(db_path) if db_path else ''
+ self.wal_device = self.backing_device_path(wal_path) if wal_path else ''
+ self.credentials = OsdLuksCredentials(
+ osd_id,
+ osd_fsid,
+ luks_secret=dmcrypt_secret,
+ with_tpm=with_tpm,
+ )
+ self.dmcrypt_open_opts = dmcrypt_open_opts
+ self.lockbox_secret = lockbox_secret
+ name = cluster_name or conf.cluster or 'ceph'
+ self.credentials.apply_cluster_context(name)
+ self._mapper_names = self._build_mapper_names()
+
+ @staticmethod
+ def backing_device_path(device_path: str) -> str:
+ if not device_path:
+ return ''
+ if device_path.startswith('/dev/mapper/'):
+ return disk.get_parent_device_from_mapper(device_path) or ''
+ return device_path
+
+ @staticmethod
+ def _kname_from_activate_path(
+ activate_path: str, osd_fsid: str, role: str,
+ ) -> Optional[str]:
+ if not activate_path.startswith('/dev/mapper/'):
+ return None
+ base = os.path.basename(activate_path)
+ prefix = 'ceph-{}-'.format(osd_fsid)
+ suffix = '-{}-dmcrypt'.format(role)
+ if base.startswith(prefix) and base.endswith(suffix):
+ return base[len(prefix):-len(suffix)]
+ return None
+
+ @staticmethod
+ def _kname_from_backing_device(backing_device_path: str) -> str:
+ return os.path.basename(os.path.realpath(backing_device_path))
+
+ def _kname_for_role(self, backing: str, activate_path: str, role: str) -> str:
+ kname = self._kname_from_activate_path(
+ activate_path, self.credentials.osd_fsid, role,
+ )
+ if kname is not None:
+ return kname
+ return self._kname_from_backing_device(backing)
+
+ def _build_mapper_names(self) -> Dict[str, str]:
+ names: Dict[str, str] = {}
+ for role, backing, activate_path in (
+ ('block', self.block_device, self.activate_block_path),
+ ('db', self.db_device, self.activate_db_path),
+ ('wal', self.wal_device, self.activate_wal_path),
+ ):
+ if not backing:
+ continue
+ kname = self._kname_for_role(backing, activate_path, role)
+ names[role] = 'ceph-{}-{}-{}-dmcrypt'.format(
+ self.credentials.osd_fsid, kname, role,
+ )
+ return names
+
+ def applies(self) -> bool:
+ if self.credentials.with_tpm:
+ return True
+ for path in (
+ self.activate_block_path,
+ self.activate_db_path,
+ self.activate_wal_path,
+ ):
+ if not path:
+ continue
+ if path.startswith('/dev/mapper/') and path.endswith('-dmcrypt'):
+ return True
+ backing = self.backing_device_path(path)
+ if not backing:
+ continue
+ if encryption_utils.CephLuks2(backing).is_ceph_encrypted:
+ return True
+ return False
+
+ def _mapper_name_for_role(self, role: str) -> str:
+ return self._mapper_names[role]
+
+ def _mapper_path_for_backing(self, backing_device_path: str, role: str) -> str:
+ return '/dev/mapper/%s' % self._mapper_name_for_role(role)
+
+ def mapper_paths(self) -> Tuple[str, str, str]:
+ return (
+ self._mapper_path_for_backing(self.block_device, 'block'),
+ self._mapper_path_for_backing(self.db_device, 'db') if self.db_device else '',
+ self._mapper_path_for_backing(self.wal_device, 'wal') if self.wal_device else '',
+ )
+
+ def close(self) -> None:
+ for role, device in self._role_devices():
+ self._close_crypt_for_role(role)
+
+ def open(self) -> None:
+ for role, device in self._role_devices():
+ self._luks_open_for_role(role, device)
+
+ def refresh(self) -> None:
+ self.close()
+ self.open()
+
+ def _role_devices(self) -> Tuple[Tuple[str, str], ...]:
+ out: List[Tuple[str, str]] = []
+ if self.block_device:
+ out.append(('block', self.block_device))
+ if self.db_device:
+ out.append(('db', self.db_device))
+ if self.wal_device:
+ out.append(('wal', self.wal_device))
+ return tuple(out)
+
+ def _close_crypt_for_role(self, role: str) -> None:
+ encryption_utils.dmsetup_remove(
+ self._mapper_name_for_role(role),
+ terminal_logging=False,
+ )
+
+ def _luks_open_for_role(self, role: str, device_path: str) -> None:
+ encryption_utils.luks_open(
+ self.credentials.resolve_secret(self.lockbox_secret),
+ device_path,
+ self._mapper_name_for_role(role),
+ with_tpm=1 if self.credentials.with_tpm else 0,
+ options=self.dmcrypt_open_opts,
+ )