]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
python-common/cryptotools: move internal crypto caller to new file
authorJohn Mulligan <jmulligan@redhat.com>
Thu, 24 Apr 2025 18:56:58 +0000 (14:56 -0400)
committerNizamudeen A <nia@redhat.com>
Mon, 5 Jan 2026 10:44:33 +0000 (16:14 +0530)
Signed-off-by: John Mulligan <jmulligan@redhat.com>
(cherry picked from commit 0c774d5c767ef9875250de5a95e421a6b837b85e)

src/python-common/ceph/cryptotools/cryptotools.py
src/python-common/ceph/cryptotools/internal.py [new file with mode: 0644]

index 1466d4b606db0f54c239266781d9d590f79b69e3..4aae0d8c9336bd32d1c9006563ee8cab5f0e8404 100644 (file)
@@ -4,138 +4,15 @@ in a subprocess therefore sidestepping the
 `PyO3 modules may only be initialized once per interpreter process` problem.
 """
 
+from typing import Any, Dict
+
 import argparse
-import bcrypt
-import datetime
 import json
 import sys
-import warnings
 
 from argparse import Namespace
-from OpenSSL import crypto, SSL
-from uuid import uuid4
-from typing import Tuple, Any, Dict, Union
-
-
-class InternalError(ValueError):
-    pass
-
-
-class InternalCryptoCaller:
-    def fail(self, msg: str) -> None:
-        raise ValueError(msg)
-
-    def password_hash(self, password: str, salt_password: str) -> str:
-        salt = salt_password.encode() if salt_password else bcrypt.gensalt()
-        return bcrypt.hashpw(password.encode(), salt).decode()
-
-    def verify_password(self, password: str, hashed_password: str) -> bool:
-        _password = password.encode()
-        _hashed_password = hashed_password.encode()
-        try:
-            ok = bcrypt.checkpw(_password, _hashed_password)
-        except ValueError as err:
-            self.fail(str(err))
-        return ok
-
-    def create_private_key(self) -> str:
-        pkey = crypto.PKey()
-        pkey.generate_key(crypto.TYPE_RSA, 2048)
-        return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode()
-
-    def create_self_signed_cert(
-        self, dname: Dict[str, str], pkey: str
-    ) -> str:
-        _pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey)
-
-        # Create a "subject" object
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore")
-            req = crypto.X509Req()
-        subj = req.get_subject()
-
-        # populate the subject with the dname settings
-        for k, v in dname.items():
-            setattr(subj, k, v)
-
-        # create a self-signed cert
-        cert = crypto.X509()
-        cert.set_subject(req.get_subject())
-        cert.set_serial_number(int(uuid4()))
-        cert.gmtime_adj_notBefore(0)
-        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
-        cert.set_issuer(cert.get_subject())
-        cert.set_pubkey(_pkey)
-        cert.sign(_pkey, 'sha512')
-        return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode()
-
-    def _load_cert(self, crt: Union[str, bytes]) -> Any:
-        crt_buffer = crt.encode() if isinstance(crt, str) else crt
-        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
-        return cert
-
-    def _issuer_info(self, cert: Any) -> Tuple[str, str]:
-        components = cert.get_issuer().get_components()
-        org_name = cn = ''
-        for c in components:
-            if c[0].decode() == 'O':  # org comp
-                org_name = c[1].decode()
-            elif c[0].decode() == 'CN':  # common name comp
-                cn = c[1].decode()
-        return (org_name, cn)
-
-    def certificate_days_to_expire(self, crt: str) -> int:
-        x509 = self._load_cert(crt)
-        no_after = x509.get_notAfter()
-        if not no_after:
-            self.fail("Certificate does not have an expiration date.")
-
-        end_date = datetime.datetime.strptime(
-            no_after.decode(), '%Y%m%d%H%M%SZ'
-        )
-
-        if x509.has_expired():
-            org, cn = self._issuer_info(x509)
-            msg = 'Certificate issued by "%s/%s" expired on %s' % (
-                org,
-                cn,
-                end_date,
-            )
-            self.fail(msg)
-
-        # Certificate still valid, calculate and return days until expiration
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore")
-            days_until_exp = (end_date - datetime.datetime.utcnow()).days
-        return int(days_until_exp)
-
-    def get_cert_issuer_info(self, crt: str) -> Tuple[str, str]:
-        return self._issuer_info(self._load_cert(crt))
-
-    def verify_tls(self, crt: str, key: str) -> None:
-        try:
-            _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
-            _key.check()
-        except (ValueError, crypto.Error) as e:
-            self.fail('Invalid private key: %s' % str(e))
-        try:
-            _crt = self._load_cert(crt)
-        except ValueError as e:
-            self.fail('Invalid certificate key: %s' % str(e))
-
-        try:
-            context = SSL.Context(SSL.TLSv1_METHOD)
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore")
-                context.use_certificate(_crt)
-                context.use_privatekey(_key)
-            context.check_privatekey()
-        except crypto.Error as e:
-            self.fail(
-                'Private key and certificate do not match up: %s' % str(e)
-            )
-        except SSL.Error as e:
-            self.fail(f'Invalid cert/key pair: {e}')
+
+from .internal import InternalCryptoCaller, InternalError
 
 
 def _read() -> str:
diff --git a/src/python-common/ceph/cryptotools/internal.py b/src/python-common/ceph/cryptotools/internal.py
new file mode 100644 (file)
index 0000000..2de8d74
--- /dev/null
@@ -0,0 +1,135 @@
+"""Internal execution of cryptographic functions for the ceph mgr
+"""
+
+from typing import Dict, Any, Tuple, Union
+
+from uuid import uuid4
+import datetime
+import warnings
+
+from OpenSSL import crypto, SSL
+import bcrypt
+
+
+from .caller import CryptoCaller, CryptoCallError
+
+
+class InternalError(CryptoCallError):
+    pass
+
+
+class InternalCryptoCaller(CryptoCaller):
+    def fail(self, msg: str) -> None:
+        raise InternalError(msg)
+
+    def password_hash(self, password: str, salt_password: str) -> str:
+        salt = salt_password.encode() if salt_password else bcrypt.gensalt()
+        return bcrypt.hashpw(password.encode(), salt).decode()
+
+    def verify_password(self, password: str, hashed_password: str) -> bool:
+        _password = password.encode()
+        _hashed_password = hashed_password.encode()
+        try:
+            ok = bcrypt.checkpw(_password, _hashed_password)
+        except ValueError as err:
+            self.fail(str(err))
+        return ok
+
+    def create_private_key(self) -> str:
+        pkey = crypto.PKey()
+        pkey.generate_key(crypto.TYPE_RSA, 2048)
+        return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode()
+
+    def create_self_signed_cert(
+        self, dname: Dict[str, str], pkey: str
+    ) -> str:
+        _pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey)
+
+        # Create a "subject" object
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            req = crypto.X509Req()
+        subj = req.get_subject()
+
+        # populate the subject with the dname settings
+        for k, v in dname.items():
+            setattr(subj, k, v)
+
+        # create a self-signed cert
+        cert = crypto.X509()
+        cert.set_subject(req.get_subject())
+        cert.set_serial_number(int(uuid4()))
+        cert.gmtime_adj_notBefore(0)
+        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
+        cert.set_issuer(cert.get_subject())
+        cert.set_pubkey(_pkey)
+        cert.sign(_pkey, 'sha512')
+        return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode()
+
+    def _load_cert(self, crt: Union[str, bytes]) -> Any:
+        crt_buffer = crt.encode() if isinstance(crt, str) else crt
+        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+        return cert
+
+    def _issuer_info(self, cert: Any) -> Tuple[str, str]:
+        components = cert.get_issuer().get_components()
+        org_name = cn = ''
+        for c in components:
+            if c[0].decode() == 'O':  # org comp
+                org_name = c[1].decode()
+            elif c[0].decode() == 'CN':  # common name comp
+                cn = c[1].decode()
+        return (org_name, cn)
+
+    def certificate_days_to_expire(self, crt: str) -> int:
+        x509 = self._load_cert(crt)
+        no_after = x509.get_notAfter()
+        if not no_after:
+            self.fail("Certificate does not have an expiration date.")
+
+        end_date = datetime.datetime.strptime(
+            no_after.decode(), '%Y%m%d%H%M%SZ'
+        )
+
+        if x509.has_expired():
+            org, cn = self._issuer_info(x509)
+            msg = 'Certificate issued by "%s/%s" expired on %s' % (
+                org,
+                cn,
+                end_date,
+            )
+            self.fail(msg)
+
+        # Certificate still valid, calculate and return days until expiration
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            days_until_exp = (end_date - datetime.datetime.utcnow()).days
+        return int(days_until_exp)
+
+    def get_cert_issuer_info(self, crt: str) -> Tuple[str, str]:
+        return self._issuer_info(self._load_cert(crt))
+
+    def verify_tls(self, crt: str, key: str) -> None:
+        try:
+            _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+            _key.check()
+        except (ValueError, crypto.Error) as e:
+            self.fail('Invalid private key: %s' % str(e))
+        try:
+            _crt = self._load_cert(crt)
+        except ValueError as e:
+            self.fail('Invalid certificate key: %s' % str(e))
+
+        try:
+            context = SSL.Context(SSL.TLSv1_METHOD)
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore")
+                context.use_certificate(_crt)
+                context.use_privatekey(_key)
+            context.check_privatekey()
+        except crypto.Error as e:
+            self.fail(
+                'Private key and certificate do not match up: %s' % str(e)
+            )
+        except SSL.Error as e:
+            self.fail(f'Invalid cert/key pair: {e}')