From: John Mulligan Date: Thu, 24 Apr 2025 18:56:58 +0000 (-0400) Subject: python-common/cryptotools: move internal crypto caller to new file X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=20ddf9b5f9e11ac2acfa920dbc179182078817cc;p=ceph.git python-common/cryptotools: move internal crypto caller to new file Signed-off-by: John Mulligan (cherry picked from commit 0c774d5c767ef9875250de5a95e421a6b837b85e) --- diff --git a/src/python-common/ceph/cryptotools/cryptotools.py b/src/python-common/ceph/cryptotools/cryptotools.py index 1466d4b606d..4aae0d8c933 100644 --- a/src/python-common/ceph/cryptotools/cryptotools.py +++ b/src/python-common/ceph/cryptotools/cryptotools.py @@ -4,138 +4,15 @@ in a subprocess therefore sidestepping the `PyO3 modules may only be initialized once per interpreter process` problem. """ +from typing import Any, Dict + import argparse -import bcrypt -import datetime import json import sys -import warnings from argparse import Namespace -from OpenSSL import crypto, SSL -from uuid import uuid4 -from typing import Tuple, Any, Dict, Union - - -class InternalError(ValueError): - pass - - -class InternalCryptoCaller: - def fail(self, msg: str) -> None: - raise ValueError(msg) - - def password_hash(self, password: str, salt_password: str) -> str: - salt = salt_password.encode() if salt_password else bcrypt.gensalt() - return bcrypt.hashpw(password.encode(), salt).decode() - - def verify_password(self, password: str, hashed_password: str) -> bool: - _password = password.encode() - _hashed_password = hashed_password.encode() - try: - ok = bcrypt.checkpw(_password, _hashed_password) - except ValueError as err: - self.fail(str(err)) - return ok - - def create_private_key(self) -> str: - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 2048) - return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode() - - def create_self_signed_cert( - self, dname: Dict[str, str], pkey: str - ) -> str: - _pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey) - - # Create a "subject" object - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - req = crypto.X509Req() - subj = req.get_subject() - - # populate the subject with the dname settings - for k, v in dname.items(): - setattr(subj, k, v) - - # create a self-signed cert - cert = crypto.X509() - cert.set_subject(req.get_subject()) - cert.set_serial_number(int(uuid4())) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(_pkey) - cert.sign(_pkey, 'sha512') - return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode() - - def _load_cert(self, crt: Union[str, bytes]) -> Any: - crt_buffer = crt.encode() if isinstance(crt, str) else crt - cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) - return cert - - def _issuer_info(self, cert: Any) -> Tuple[str, str]: - components = cert.get_issuer().get_components() - org_name = cn = '' - for c in components: - if c[0].decode() == 'O': # org comp - org_name = c[1].decode() - elif c[0].decode() == 'CN': # common name comp - cn = c[1].decode() - return (org_name, cn) - - def certificate_days_to_expire(self, crt: str) -> int: - x509 = self._load_cert(crt) - no_after = x509.get_notAfter() - if not no_after: - self.fail("Certificate does not have an expiration date.") - - end_date = datetime.datetime.strptime( - no_after.decode(), '%Y%m%d%H%M%SZ' - ) - - if x509.has_expired(): - org, cn = self._issuer_info(x509) - msg = 'Certificate issued by "%s/%s" expired on %s' % ( - org, - cn, - end_date, - ) - self.fail(msg) - - # Certificate still valid, calculate and return days until expiration - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - days_until_exp = (end_date - datetime.datetime.utcnow()).days - return int(days_until_exp) - - def get_cert_issuer_info(self, crt: str) -> Tuple[str, str]: - return self._issuer_info(self._load_cert(crt)) - - def verify_tls(self, crt: str, key: str) -> None: - try: - _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key) - _key.check() - except (ValueError, crypto.Error) as e: - self.fail('Invalid private key: %s' % str(e)) - try: - _crt = self._load_cert(crt) - except ValueError as e: - self.fail('Invalid certificate key: %s' % str(e)) - - try: - context = SSL.Context(SSL.TLSv1_METHOD) - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - context.use_certificate(_crt) - context.use_privatekey(_key) - context.check_privatekey() - except crypto.Error as e: - self.fail( - 'Private key and certificate do not match up: %s' % str(e) - ) - except SSL.Error as e: - self.fail(f'Invalid cert/key pair: {e}') + +from .internal import InternalCryptoCaller, InternalError def _read() -> str: diff --git a/src/python-common/ceph/cryptotools/internal.py b/src/python-common/ceph/cryptotools/internal.py new file mode 100644 index 00000000000..2de8d742ced --- /dev/null +++ b/src/python-common/ceph/cryptotools/internal.py @@ -0,0 +1,135 @@ +"""Internal execution of cryptographic functions for the ceph mgr +""" + +from typing import Dict, Any, Tuple, Union + +from uuid import uuid4 +import datetime +import warnings + +from OpenSSL import crypto, SSL +import bcrypt + + +from .caller import CryptoCaller, CryptoCallError + + +class InternalError(CryptoCallError): + pass + + +class InternalCryptoCaller(CryptoCaller): + def fail(self, msg: str) -> None: + raise InternalError(msg) + + def password_hash(self, password: str, salt_password: str) -> str: + salt = salt_password.encode() if salt_password else bcrypt.gensalt() + return bcrypt.hashpw(password.encode(), salt).decode() + + def verify_password(self, password: str, hashed_password: str) -> bool: + _password = password.encode() + _hashed_password = hashed_password.encode() + try: + ok = bcrypt.checkpw(_password, _hashed_password) + except ValueError as err: + self.fail(str(err)) + return ok + + def create_private_key(self) -> str: + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, 2048) + return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode() + + def create_self_signed_cert( + self, dname: Dict[str, str], pkey: str + ) -> str: + _pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey) + + # Create a "subject" object + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + req = crypto.X509Req() + subj = req.get_subject() + + # populate the subject with the dname settings + for k, v in dname.items(): + setattr(subj, k, v) + + # create a self-signed cert + cert = crypto.X509() + cert.set_subject(req.get_subject()) + cert.set_serial_number(int(uuid4())) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(_pkey) + cert.sign(_pkey, 'sha512') + return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode() + + def _load_cert(self, crt: Union[str, bytes]) -> Any: + crt_buffer = crt.encode() if isinstance(crt, str) else crt + cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) + return cert + + def _issuer_info(self, cert: Any) -> Tuple[str, str]: + components = cert.get_issuer().get_components() + org_name = cn = '' + for c in components: + if c[0].decode() == 'O': # org comp + org_name = c[1].decode() + elif c[0].decode() == 'CN': # common name comp + cn = c[1].decode() + return (org_name, cn) + + def certificate_days_to_expire(self, crt: str) -> int: + x509 = self._load_cert(crt) + no_after = x509.get_notAfter() + if not no_after: + self.fail("Certificate does not have an expiration date.") + + end_date = datetime.datetime.strptime( + no_after.decode(), '%Y%m%d%H%M%SZ' + ) + + if x509.has_expired(): + org, cn = self._issuer_info(x509) + msg = 'Certificate issued by "%s/%s" expired on %s' % ( + org, + cn, + end_date, + ) + self.fail(msg) + + # Certificate still valid, calculate and return days until expiration + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + days_until_exp = (end_date - datetime.datetime.utcnow()).days + return int(days_until_exp) + + def get_cert_issuer_info(self, crt: str) -> Tuple[str, str]: + return self._issuer_info(self._load_cert(crt)) + + def verify_tls(self, crt: str, key: str) -> None: + try: + _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key) + _key.check() + except (ValueError, crypto.Error) as e: + self.fail('Invalid private key: %s' % str(e)) + try: + _crt = self._load_cert(crt) + except ValueError as e: + self.fail('Invalid certificate key: %s' % str(e)) + + try: + context = SSL.Context(SSL.TLSv1_METHOD) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + context.use_certificate(_crt) + context.use_privatekey(_key) + context.check_privatekey() + except crypto.Error as e: + self.fail( + 'Private key and certificate do not match up: %s' % str(e) + ) + except SSL.Error as e: + self.fail(f'Invalid cert/key pair: {e}')