From: John Mulligan Date: Thu, 17 Apr 2025 21:12:50 +0000 (-0400) Subject: pybind/mgr: update mgr_util to use cryptotools CryptoCaller class X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bb15cfc9c4f3d4595edd2ce4f8d5b30dc1f6a648;p=ceph-ci.git pybind/mgr: update mgr_util to use cryptotools CryptoCaller class Signed-off-by: John Mulligan (cherry picked from commit 2b9cf2453f13eb48e43e4eb06c78365c397c50cd) Conflicts: src/pybind/mgr/mgr_util.py - accepted incoming changes --- diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 94c52e1a399..050e6844abf 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -24,6 +24,7 @@ else: from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator from ceph.deployment.utils import wrap_ipv6 +import ceph.cryptotools.remote T = TypeVar('T') @@ -540,48 +541,18 @@ def create_self_signed_cert(organisation: str = 'Ceph', else: dname = {"O": organisation, "CN": common_name} - import json - import subprocess - - private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"], - capture_output=True) - - pkey = private_key.stdout.strip().decode('utf-8') - - data = {"dname": dname, "private_key": pkey} - - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result with a CompletedProcess - if result.returncode != 0 or result.stderr != b'': - raise ValueError(result.stderr) - - cert = result.stdout.strip().decode('utf-8') + cc = ceph.cryptotools.remote.CryptoCaller() + pkey = cc.create_private_key() + cert = cc.create_self_signed_cert(dname, pkey) return cert, pkey -def verify_cacrt_content(crt): - # type: (str) -> int - +def verify_cacrt_content(crt: str) -> int: try: - import subprocess - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - # The above script will only produce stdout output. - # The only scenarios that produce stderr output are failures to import modules - # or syntax errors which test_tls.py will catch - - # Check result of CompletedProcess - if result.returncode != 0 or result.stderr != b'': - logger.warning(result.stderr) - raise ValueError(result.stderr) - except (ValueError) as e: - raise ServerConfigException(f'Invalid certificate: {e}') - - return int(result.stdout.strip().decode('utf-8')) + cc = ceph.cryptotools.remote.CryptoCaller() + return cc.verify_cacrt_content(crt) + except ValueError as err: + raise ServerConfigException(f'Invalid certificate: {err}') def verify_cacrt(cert_fname): @@ -602,54 +573,21 @@ def verify_cacrt(cert_fname): def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]: """Basic validation of a ca cert""" - + cc = ceph.cryptotools.remote.CryptoCaller() try: - import subprocess - org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - - # Check result with a CompletedProcess - if org_name_proc.returncode != 0 or org_name_proc.stderr != b'': - raise ValueError(org_name_proc.stderr) - - cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"], - input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), - capture_output=True) - - # Check result with a CompletedProcess - if cn_proc.returncode != 0 or cn_proc.stderr != b'': - raise ValueError(cn_proc.stderr) - - org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8') - - except (ValueError) as e: - raise ServerConfigException(f'Invalid certificate key: {e}') - return (org_name, cn) + return cc.get_cert_issuer_info(crt) + except ValueError as err: + raise ServerConfigException(f'Invalid certificate key: {err}') def verify_tls(crt, key): - # type: (str, str) -> None - verify_cacrt_content(crt) - + # type: (str, str) -> int + cc = ceph.cryptotools.remote.CryptoCaller() + days_to_expiration = cc.verify_cacrt_content(crt) try: - import subprocess - import json - - data = { - "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt, # type: ignore[attr-defined] - "key": key.decode("utf-8") if isinstance(key, bytes) else key # type: ignore[attr-defined] - } - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result of CompletedProcess - if result.returncode != 0 or result.stdout != b'': - logger.warning(result.stdout) - raise ServerConfigException(result.stdout) - except (ServerConfigException) as e: - raise ServerConfigException(f'Invalid certificate: {e}') - + cc.verify_tls(crt, key) + except ValueError as err: + raise ServerConfigException(str(err)) + return days_to_expiration def verify_tls_files(cert_fname, pkey_fname): @@ -888,16 +826,5 @@ def password_hash(password: Optional[str], salt_password: Optional[str] = None) if not salt_password: salt_password = '' - import subprocess - import json - - data = {"password": password, "salt_password": salt_password} - result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"], - input=json.dumps(data).encode("utf-8"), - capture_output=True) - - # Check result with a CompletedProcess - if result.returncode != 0 or result.stderr != b'': - raise ValueError(result.stderr) - - return result.stdout.strip().decode('utf-8') + cc = ceph.cryptotools.remote.CryptoCaller() + return cc.password_hash(password, salt_password)