from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
from ceph.deployment.utils import wrap_ipv6
+import ceph.cryptotools.remote
T = TypeVar('T')
else:
dname = {"O": organisation, "CN": common_name}
- import json
- import subprocess
-
- private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"],
- capture_output=True)
-
- pkey = private_key.stdout.strip().decode('utf-8')
-
- data = {"dname": dname, "private_key": pkey}
-
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- raise ValueError(result.stderr)
-
- cert = result.stdout.strip().decode('utf-8')
+ cc = ceph.cryptotools.remote.CryptoCaller()
+ pkey = cc.create_private_key()
+ cert = cc.create_self_signed_cert(dname, pkey)
return cert, pkey
-def verify_cacrt_content(crt):
- # type: (str) -> int
-
+def verify_cacrt_content(crt: str) -> int:
try:
- import subprocess
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
- # The above script will only produce stdout output.
- # The only scenarios that produce stderr output are failures to import modules
- # or syntax errors which test_tls.py will catch
-
- # Check result of CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- logger.warning(result.stderr)
- raise ValueError(result.stderr)
- except (ValueError) as e:
- raise ServerConfigException(f'Invalid certificate: {e}')
-
- return int(result.stdout.strip().decode('utf-8'))
+ cc = ceph.cryptotools.remote.CryptoCaller()
+ return cc.verify_cacrt_content(crt)
+ except ValueError as err:
+ raise ServerConfigException(f'Invalid certificate: {err}')
def verify_cacrt(cert_fname):
def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
"""Basic validation of a ca cert"""
-
+ cc = ceph.cryptotools.remote.CryptoCaller()
try:
- import subprocess
- org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if org_name_proc.returncode != 0 or org_name_proc.stderr != b'':
- raise ValueError(org_name_proc.stderr)
-
- cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if cn_proc.returncode != 0 or cn_proc.stderr != b'':
- raise ValueError(cn_proc.stderr)
-
- org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8')
-
- except (ValueError) as e:
- raise ServerConfigException(f'Invalid certificate key: {e}')
- return (org_name, cn)
+ return cc.get_cert_issuer_info(crt)
+ except ValueError as err:
+ raise ServerConfigException(f'Invalid certificate key: {err}')
def verify_tls(crt, key):
- # type: (str, str) -> None
- verify_cacrt_content(crt)
-
+ # type: (str, str) -> int
+ cc = ceph.cryptotools.remote.CryptoCaller()
+ days_to_expiration = cc.verify_cacrt_content(crt)
try:
- import subprocess
- import json
-
- data = {
- "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt, # type: ignore[attr-defined]
- "key": key.decode("utf-8") if isinstance(key, bytes) else key # type: ignore[attr-defined]
- }
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
-
- # Check result of CompletedProcess
- if result.returncode != 0 or result.stdout != b'':
- logger.warning(result.stdout)
- raise ServerConfigException(result.stdout)
- except (ServerConfigException) as e:
- raise ServerConfigException(f'Invalid certificate: {e}')
-
+ cc.verify_tls(crt, key)
+ except ValueError as err:
+ raise ServerConfigException(str(err))
+ return days_to_expiration
def verify_tls_files(cert_fname, pkey_fname):
if not salt_password:
salt_password = ''
- import subprocess
- import json
-
- data = {"password": password, "salt_password": salt_password}
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- raise ValueError(result.stderr)
-
- return result.stdout.strip().decode('utf-8')
+ cc = ceph.cryptotools.remote.CryptoCaller()
+ return cc.password_hash(password, salt_password)