]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
python-common/cryptotools: create CrytpoCaller interface class
authorJohn Mulligan <jmulligan@redhat.com>
Wed, 16 Apr 2025 18:56:28 +0000 (14:56 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Mon, 7 Jul 2025 13:32:24 +0000 (09:32 -0400)
Create a class to act as a common shim between the cryptotools external
functions and the mgr. It provides common conversion mechanisms and
could possibly act as an abstraction in case we decide to make
the external function calls in different ways in the future.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/python-common/ceph/cryptotools/__init__.py [new file with mode: 0644]
src/python-common/ceph/cryptotools/remote.py [new file with mode: 0644]

diff --git a/src/python-common/ceph/cryptotools/__init__.py b/src/python-common/ceph/cryptotools/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/python-common/ceph/cryptotools/remote.py b/src/python-common/ceph/cryptotools/remote.py
new file mode 100644 (file)
index 0000000..2edc9fa
--- /dev/null
@@ -0,0 +1,169 @@
+"""Remote execution of cryptographic functions for the ceph mgr
+"""
+# NB. This module exists to enapsulate the logic around running
+# the cryptotools module that are forked off of the parent process
+# to avoid the pyo3 subintepreters problem.
+#
+# The current implementation is simple using the command line and either raw
+# blobs or JSON as stdin inputs and raw blobs or JSON as outputs. It is important
+# that we avoid putting the sensitive data on the command line as that
+# is visible in /proc.
+#
+# This simple implementation incurs the cost of starting a python process
+# for every function call. CryptoCaller is written as a class so that if
+# we choose to we can have multiple implementations of the CryptoCaller
+# sharing the same protocol.
+# For instance we could have a persistent process listening on a unix
+# socket accepting the crypto functions as RPCs. For now, we keep it
+# simple though :-)
+
+from typing import List, Union, Dict, Any, Optional, Tuple
+
+import json
+import logging
+import subprocess
+
+
+_ctmodule = 'ceph.pybind.mgr.cryptotools'
+
+logger = logging.getLogger('ceph.cryptotools.remote')
+
+
+class CryptoCallError(ValueError):
+    pass
+
+
+class CryptoCaller:
+    """CryptoCaller encapsulates cryptographic functions used by the
+    ceph mgr into a suite of functions that can be executed in a
+    different process.
+    Running the crypto functions in a separate process avoids conflicts
+    between the mgr's use of subintepreters and the cryptography module's
+    use of PyO3 rust bindings.
+
+    If you want to raise different error types set the json_error_cls
+    attribute and/or subclass and override the map_error method.
+    """
+
+    def __init__(
+        self, errors_from_json: bool = True, module: str = _ctmodule
+    ):
+        self._module = module
+        self.errors_from_json = errors_from_json
+        self.json_error_cls = ValueError
+
+    def _run(
+        self,
+        args: List[str],
+        input_data: Union[str, bytes, None] = None,
+        capture_output: bool = False,
+        check: bool = False,
+    ) -> subprocess.CompletedProcess:
+        if input_data is None:
+            _input = None
+        elif isinstance(input_data, str):
+            _input = input_data.encode('utf-8')
+        else:
+            _input = input_data
+        cmd = ['python3', '-m', _ctmodule] + list(args)
+        logger.warning('CryptoCaller will run: %r', cmd)
+        try:
+            return subprocess.run(
+                cmd, capture_output=capture_output, input=_input, check=check
+            )
+        except Exception as err:
+            mapped_err = self.map_error(err)
+            if mapped_err:
+                raise mapped_err from err
+            raise
+
+    def _result_json(self, result: subprocess.CompletedProcess) -> Any:
+        result_obj = json.loads(result.stdout)
+        if self.errors_from_json and 'error' in result_obj:
+            raise self.json_error_cls(str(result_obj['error']))
+        return result_obj
+
+    def _result_str(self, result: subprocess.CompletedProcess) -> str:
+        return result.stdout.decode('utf-8')
+
+    def map_error(self, err: Exception) -> Optional[Exception]:
+        """Convert between error types raised by the subprocesses
+        running the crypto functions and what the mgr caller expects.
+        """
+        if isinstance(err, subprocess.CalledProcessError):
+            return CryptoCallError(
+                f'failed crypto call: {err.cmd}: {err.stderr}'
+            )
+        return None
+
+    def create_private_key(self) -> str:
+        """Create a new TLS private key, returning it as a string."""
+        result = self._run(
+            ['create_self_signed_cert', '--private_key'],
+            capture_output=True,
+            check=True,
+        )
+        return self._result_str(result).strip()
+
+    def create_self_signed_cert(
+        self, dname: Dict[str, str], pkey: str
+    ) -> str:
+        """Given TLS certificate subject parameters and a private key,
+        create a new self signed certificate - returned as a string.
+        """
+        result = self._run(
+            ['create_self_signed_cert'],
+            input_data=json.dumps({'dname': dname, 'pkey': pkey}),
+            capture_output=True,
+            check=True,
+        )
+        return self._result_str(result).strip()
+
+    def verify_tls(self, crt: str, key: str) -> None:
+        """Given a TLS certificate and a private key raise an error
+        if the combination is not valid.
+        """
+        self._run(
+            ['verify_tls'],
+            input_data=json.dumps({'crt': crt, 'key': key}),
+            check=True,
+        )
+
+    def verify_cacrt_content(self, crt: str) -> int:
+        """Verify a CA Certificate return the number of days until expiration."""
+        result = self._run(
+            ["verify_cacrt_content"],
+            input_data=crt,
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        return int(result_obj['days_until_expiration'])
+
+    def get_cert_issuer_info(self, crt: str) -> Tuple[str, str]:
+        """Basic validation of a ca cert"""
+        result = self._run(
+            ["get_cert_issuer_info"],
+            input_data=crt,
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        org_name = str(result_obj.get('org_name', ''))
+        cn = str(result_obj.get('cn', ''))
+        return org_name, cn
+
+    def password_hash(self, password: str, salt_password: str) -> str:
+        """Hash a password. Returns the hashed password as a string."""
+        pwdata = {"password": password, "salt_password": salt_password}
+        result = self._run(
+            ["password_hash"],
+            input_data=json.dumps(pwdata),
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        pw_hash = result_obj.get("hash")
+        if not pw_hash:
+            raise CryptoCallError('no password hash')
+        return pw_hash