From 2b9cf2453f13eb48e43e4eb06c78365c397c50cd Mon Sep 17 00:00:00 2001 From: John Mulligan Date: Thu, 17 Apr 2025 17:12:50 -0400 Subject: [PATCH] pybind/mgr: update mgr_util to use cryptotools CryptoCaller class Signed-off-by: John Mulligan --- src/pybind/mgr/mgr_util.py | 114 +++++++------------------------------ 1 file changed, 20 insertions(+), 94 deletions(-) diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 95194d6943d4e..d3fb5ae70192c 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -32,6 +32,7 @@ else: from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator from ceph.deployment.utils import wrap_ipv6 +import ceph.cryptotools.remote T = TypeVar('T') @@ -636,48 +637,18 @@ def create_self_signed_cert(organisation: str = 'Ceph', else: dname = {"O": organisation, "CN": common_name} - import json - import subprocess - - private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"], - capture_output=True) - - pkey = private_key.stdout.strip().decode('utf-8') - - data = {"dname": dname, "private_key": pkey} - - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result with a CompletedProcess - if result.returncode != 0 or result.stderr != b'': - raise ValueError(result.stderr) - - cert = result.stdout.strip().decode('utf-8') + cc = ceph.cryptotools.remote.CryptoCaller() + pkey = cc.create_private_key() + cert = cc.create_self_signed_cert(dname, pkey) return cert, pkey -def verify_cacrt_content(crt): - # type: (str) -> int - +def verify_cacrt_content(crt: str) -> int: try: - import subprocess - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - # The above script will only produce stdout output. - # The only scenarios that produce stderr output are failures to import modules - # or syntax errors which test_tls.py will catch - - # Check result of CompletedProcess - if result.returncode != 0 or result.stderr != b'': - logger.warning(result.stderr) - raise ValueError(result.stderr) - except (ValueError) as e: - raise ServerConfigException(f'Invalid certificate: {e}') - - return int(result.stdout.strip().decode('utf-8')) + cc = ceph.cryptotools.remote.CryptoCaller() + return cc.verify_cacrt_content(crt) + except ValueError as err: + raise ServerConfigException(f'Invalid certificate: {err}') def verify_cacrt(cert_fname): @@ -699,55 +670,21 @@ def verify_cacrt(cert_fname): def get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]: """Basic validation of a ca cert""" - + cc = ceph.cryptotools.remote.CryptoCaller() try: - import subprocess - org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - - # Check result with a CompletedProcess - if org_name_proc.returncode != 0 or org_name_proc.stderr != b'': - raise ValueError(org_name_proc.stderr) - - cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - - # Check result with a CompletedProcess - if cn_proc.returncode != 0 or cn_proc.stderr != b'': - raise ValueError(cn_proc.stderr) - - org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8') - - except (ValueError) as e: - raise ServerConfigException(f'Invalid certificate key: {e}') - return (org_name, cn) + return cc.get_cert_issuer_info(crt) + except ValueError as err: + raise ServerConfigException(f'Invalid certificate key: {err}') def verify_tls(crt, key): # type: (str, str) -> int - days_to_expiration = verify_cacrt_content(crt) - + cc = ceph.cryptotools.remote.CryptoCaller() + days_to_expiration = cc.verify_cacrt_content(crt) try: - import subprocess - import json - - data = { - "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt, # type: ignore[attr-defined] - "key": key.decode("utf-8") if isinstance(key, bytes) else key # type: ignore[attr-defined] - } - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result of CompletedProcess - if result.returncode != 0 or result.stdout != b'': - logger.warning(result.stdout) - raise ServerConfigException(result.stdout) - except (ServerConfigException) as e: - raise ServerConfigException(f'Invalid certificate: {e}') - + cc.verify_tls(crt, key) + except ValueError as err: + raise ServerConfigException(str(err)) return days_to_expiration @@ -989,16 +926,5 @@ def password_hash(password: Optional[str], salt_password: Optional[str] = None) if not salt_password: salt_password = '' - import subprocess - import json - - data = {"password": password, "salt_password": salt_password} - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result with a CompletedProcess - if result.returncode != 0 or result.stderr != b'': - raise ValueError(result.stderr) - - return result.stdout.strip().decode('utf-8') + cc = ceph.cryptotools.remote.CryptoCaller() + return cc.password_hash(password, salt_password) -- 2.39.5