from ..security import Permission, Scope
from ..settings import Settings
-import ceph.cryptotools.remote
+from ceph.cryptotools.select import get_crypto_caller
logger = logging.getLogger('access_control')
DEFAULT_FILE_DESC = 'password/secret'
try:
# make sure the hashed_password is actually a bcrypt hash
# catch a ValueError if hashed_password is not valid.
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
cc.verify_password('', hashed_password)
user = mgr.ACCESS_CTRL_DB.get_user(username)
from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
from ceph.deployment.utils import wrap_ipv6
-import ceph.cryptotools.remote
+from ceph.cryptotools.select import get_crypto_caller
T = TypeVar('T')
else:
dname = {"O": organisation, "CN": common_name}
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
pkey = cc.create_private_key()
cert = cc.create_self_signed_cert(dname, pkey)
return cert, pkey
def certificate_days_to_expire(crt: str) -> int:
try:
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
return cc.certificate_days_to_expire(crt)
except ValueError as err:
raise ServerConfigException(f'Invalid certificate: {err}')
def get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
"""Basic validation of a ca cert"""
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
try:
return cc.get_cert_issuer_info(crt)
except ValueError as err:
def verify_tls(crt, key):
# type: (str, str) -> int
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
try:
days_to_expiration = cc.certificate_days_to_expire(crt)
cc.verify_tls(crt, key)
if not salt_password:
salt_password = ''
- cc = ceph.cryptotools.remote.CryptoCaller()
+ cc = get_crypto_caller()
return cc.password_hash(password, salt_password)
"""Remote execution of cryptographic functions for the ceph mgr
"""
+
# NB. This module exists to enapsulate the logic around running
# the cryptotools module that are forked off of the parent process
# to avoid the pyo3 subintepreters problem.
import logging
import subprocess
+from .caller import CryptoCaller, CryptoCallError
+
_ctmodule = 'ceph.cryptotools.cryptotools'
logger = logging.getLogger('ceph.cryptotools.remote')
-class CryptoCallError(ValueError):
- pass
-
-
-class CryptoCaller:
- """CryptoCaller encapsulates cryptographic functions used by the
+class ProcessCryptoCaller(CryptoCaller):
+ """ProcessCryptoCaller encapsulates cryptographic functions used by the
ceph mgr into a suite of functions that can be executed in a
different process.
Running the crypto functions in a separate process avoids conflicts
--- /dev/null
+from typing import Dict
+
+import os
+
+from .caller import CryptoCaller
+
+
+_CC_ENV = 'CEPH_CRYPTOCALLER'
+_CC_KEY = 'crypto_caller'
+_CC_REMOTE = 'remote'
+_CC_INTERNAL = 'internal'
+
+_CACHE: Dict[str, CryptoCaller] = {}
+
+
+def _check_name(name: str) -> None:
+ if name and name not in (_CC_REMOTE, _CC_INTERNAL):
+ raise ValueError(f'unexpected crypto caller name: {name}')
+
+
+def choose_crypto_caller(name: str = '') -> None:
+ _check_name(name)
+ if not name:
+ name = os.environ.get(_CC_ENV, '')
+ _check_name(name)
+ if not name:
+ name = _CC_REMOTE
+
+ if name == _CC_REMOTE:
+ import ceph.cryptotools.remote
+
+ _CACHE[_CC_KEY] = ceph.cryptotools.remote.ProcessCryptoCaller()
+ return
+ if name == _CC_INTERNAL:
+ import ceph.cryptotools.internal
+
+ _CACHE[_CC_KEY] = ceph.cryptotools.internal.InternalCryptoCaller()
+ return
+ # should be unreachable
+ raise RuntimeError('failed to setup a valid crypto caller')
+
+
+def get_crypto_caller() -> CryptoCaller:
+ """Return the currently selected crypto caller object."""
+ caller = _CACHE.get(_CC_KEY)
+ if not caller:
+ choose_crypto_caller()
+ caller = _CACHE.get(_CC_KEY)
+ if caller is None:
+ raise RuntimeError('failed to select crypto caller')
+ return caller