]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
pybind/mgr: update mgr_util to use cryptotools CryptoCaller class
authorJohn Mulligan <jmulligan@redhat.com>
Thu, 17 Apr 2025 21:12:50 +0000 (17:12 -0400)
committerKefu Chai <k.chai@proxmox.com>
Thu, 5 Feb 2026 02:50:07 +0000 (10:50 +0800)
Signed-off-by: John Mulligan <jmulligan@redhat.com>
(cherry picked from commit 2b9cf2453f13eb48e43e4eb06c78365c397c50cd)

 Conflicts:
src/pybind/mgr/mgr_util.py
 - accepted incoming changes

src/pybind/mgr/mgr_util.py

index 94c52e1a399d322358bd83f81fb876dc4cd2ff88..050e6844abfe5016c1866394f96b70709bf008f0 100644 (file)
@@ -24,6 +24,7 @@ else:
 from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
 
 from ceph.deployment.utils import wrap_ipv6
+import ceph.cryptotools.remote
 
 T = TypeVar('T')
 
@@ -540,48 +541,18 @@ def create_self_signed_cert(organisation: str = 'Ceph',
     else:
         dname = {"O": organisation, "CN": common_name}
 
-    import json
-    import subprocess
-
-    private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"],
-                                 capture_output=True)
-
-    pkey = private_key.stdout.strip().decode('utf-8')
-
-    data = {"dname": dname, "private_key": pkey}
-
-    result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"],
-                            input=json.dumps(data).encode("utf-8"),
-                            capture_output=True)
-
-    # Check result with a CompletedProcess
-    if result.returncode != 0 or result.stderr != b'':
-        raise ValueError(result.stderr)
-
-    cert = result.stdout.strip().decode('utf-8')
+    cc = ceph.cryptotools.remote.CryptoCaller()
+    pkey = cc.create_private_key()
+    cert = cc.create_self_signed_cert(dname, pkey)
     return cert, pkey
 
 
-def verify_cacrt_content(crt):
-    # type: (str) -> int
-
+def verify_cacrt_content(crt: str) -> int:
     try:
-        import subprocess
-        result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"],
-                                input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
-                                capture_output=True)
-        # The above script will only produce stdout output.
-        # The only scenarios that produce stderr output are failures to import modules
-        # or syntax errors which test_tls.py will catch
-
-        # Check result of CompletedProcess
-        if result.returncode != 0 or result.stderr != b'':
-            logger.warning(result.stderr)
-            raise ValueError(result.stderr)
-    except (ValueError) as e:
-        raise ServerConfigException(f'Invalid certificate: {e}')
-
-    return int(result.stdout.strip().decode('utf-8'))
+        cc = ceph.cryptotools.remote.CryptoCaller()
+        return cc.verify_cacrt_content(crt)
+    except ValueError as err:
+        raise ServerConfigException(f'Invalid certificate: {err}')
 
 
 def verify_cacrt(cert_fname):
@@ -602,54 +573,21 @@ def verify_cacrt(cert_fname):
 
 def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
     """Basic validation of a ca cert"""
-
+    cc = ceph.cryptotools.remote.CryptoCaller()
     try:
-        import subprocess
-        org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"],
-                                       input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
-                                       capture_output=True)
-
-        # Check result with a CompletedProcess
-        if org_name_proc.returncode != 0 or org_name_proc.stderr != b'':
-            raise ValueError(org_name_proc.stderr)
-
-        cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"],
-                                 input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
-                                 capture_output=True)
-
-        # Check result with a CompletedProcess
-        if cn_proc.returncode != 0 or cn_proc.stderr != b'':
-            raise ValueError(cn_proc.stderr)
-
-        org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8')
-
-    except (ValueError) as e:
-        raise ServerConfigException(f'Invalid certificate key: {e}')
-    return (org_name, cn)
+        return cc.get_cert_issuer_info(crt)
+    except ValueError as err:
+        raise ServerConfigException(f'Invalid certificate key: {err}')
 
 def verify_tls(crt, key):
-    # type: (str, str) -> None
-    verify_cacrt_content(crt)
-
+    # type: (str, str) -> int
+    cc = ceph.cryptotools.remote.CryptoCaller()
+    days_to_expiration = cc.verify_cacrt_content(crt)
     try:
-        import subprocess
-        import json
-
-        data = {
-            "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt,  # type: ignore[attr-defined]
-            "key": key.decode("utf-8") if isinstance(key, bytes) else key   # type: ignore[attr-defined]
-        }
-        result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"],
-                                input=json.dumps(data).encode("utf-8"),
-                                capture_output=True)
-
-        # Check result of CompletedProcess
-        if result.returncode != 0 or result.stdout != b'':
-            logger.warning(result.stdout)
-            raise ServerConfigException(result.stdout)
-    except (ServerConfigException) as e:
-        raise ServerConfigException(f'Invalid certificate: {e}')
-
+        cc.verify_tls(crt, key)
+    except ValueError as err:
+        raise ServerConfigException(str(err))
+    return days_to_expiration
 
 
 def verify_tls_files(cert_fname, pkey_fname):
@@ -888,16 +826,5 @@ def password_hash(password: Optional[str], salt_password: Optional[str] = None)
     if not salt_password:
         salt_password = ''
 
-    import subprocess
-    import json
-
-    data = {"password": password, "salt_password": salt_password}
-    result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"],
-                            input=json.dumps(data).encode("utf-8"),
-                            capture_output=True)
-
-    # Check result with a CompletedProcess
-    if result.returncode != 0 or result.stderr != b'':
-        raise ValueError(result.stderr)
-
-    return result.stdout.strip().decode('utf-8')
+    cc = ceph.cryptotools.remote.CryptoCaller()
+    return cc.password_hash(password, salt_password)