if 'UNITTEST' in os.environ:
import tests
+import bcrypt
import cephfs
import contextlib
import datetime
"""
+ from OpenSSL import crypto
+ from uuid import uuid4
+
# RDN = Relative Distinguished Name
valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']
+ # create a key pair
+ pkey = crypto.PKey()
+ pkey.generate_key(crypto.TYPE_RSA, 2048)
+
+ # Create a "subject" object
+ req = crypto.X509Req()
+ subj = req.get_subject()
+
if dname:
# dname received, so check it contains valid RDNs
if not all(field in valid_RDN_list for field in dname):
else:
dname = {"O": organisation, "CN": common_name}
- import json
- import subprocess
-
- private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"],
- capture_output=True)
-
- pkey = private_key.stdout.strip().decode('utf-8')
+ # populate the subject with the dname settings
+ for k, v in dname.items():
+ setattr(subj, k, v)
- data = {"dname": dname, "private_key": pkey}
+ # create a self-signed cert
+ cert = crypto.X509()
+ cert.set_subject(req.get_subject())
+ cert.set_serial_number(int(uuid4()))
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(pkey)
+ cert.sign(pkey, 'sha512')
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
+ cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+ pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
- # Check result with a CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- raise ValueError(result.stderr)
-
- cert = result.stdout.strip().decode('utf-8')
- return cert, pkey
+ return cert.decode('utf-8'), pkey.decode('utf-8')
def verify_cacrt_content(crt):
- # type: (str) -> int
-
+ # type: (str) -> None
+ from OpenSSL import crypto
try:
- import subprocess
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
- # The above script will only produce stdout output.
- # The only scenarios that produce stderr output are failures to import modules
- # or syntax errors which test_tls.py will catch
-
- # Check result of CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- logger.warning(result.stderr)
- raise ValueError(result.stderr)
- except (ValueError) as e:
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+ if x509.has_expired():
+ org, cn = get_cert_issuer_info(crt)
+ no_after = x509.get_notAfter()
+ end_date = None
+ if no_after is not None:
+ end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
+ msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
+ logger.warning(msg)
+ raise ServerConfigException(msg)
+ except (ValueError, crypto.Error) as e:
raise ServerConfigException(f'Invalid certificate: {e}')
- return int(result.stdout.strip().decode('utf-8'))
-
def verify_cacrt(cert_fname):
# type: (str) -> None
def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
"""Basic validation of a ca cert"""
+ from OpenSSL import crypto, SSL
try:
- import subprocess
- org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if org_name_proc.returncode != 0 or org_name_proc.stderr != b'':
- raise ValueError(org_name_proc.stderr)
-
- cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"],
- input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if cn_proc.returncode != 0 or cn_proc.stderr != b'':
- raise ValueError(cn_proc.stderr)
-
- org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8')
-
- except (ValueError) as e:
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ (org_name, cn) = (None, None)
+ cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+ components = cert.get_issuer().get_components()
+ for c in components:
+ if c[0].decode() == 'O': # org comp
+ org_name = c[1].decode()
+ elif c[0].decode() == 'CN': # common name comp
+ cn = c[1].decode()
+ return (org_name, cn)
+ except (ValueError, crypto.Error) as e:
raise ServerConfigException(f'Invalid certificate key: {e}')
- return (org_name, cn)
def verify_tls(crt, key):
# type: (str, str) -> None
verify_cacrt_content(crt)
+ from OpenSSL import crypto, SSL
try:
- import subprocess
- import json
-
- data = {
- "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt, # type: ignore[attr-defined]
- "key": key.decode("utf-8") if isinstance(key, bytes) else key # type: ignore[attr-defined]
- }
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
-
- # Check result of CompletedProcess
- if result.returncode != 0 or result.stdout != b'':
- logger.warning(result.stdout)
- raise ServerConfigException(result.stdout)
- except (ServerConfigException) as e:
- raise ServerConfigException(f'Invalid certificate: {e}')
+ _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+ _key.check()
+ except (ValueError, crypto.Error) as e:
+ raise ServerConfigException(
+ 'Invalid private key: {}'.format(str(e)))
+ try:
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+ except ValueError as e:
+ raise ServerConfigException(
+ 'Invalid certificate key: {}'.format(str(e))
+ )
+
+ try:
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.use_certificate(_crt)
+ context.use_privatekey(_key)
+ context.check_privatekey()
+ except crypto.Error as e:
+ logger.warning('Private key and certificate do not match up: {}'.format(str(e)))
+ except SSL.Error as e:
+ raise ServerConfigException(f'Invalid cert/key pair: {e}')
if not os.path.isfile(pkey_fname):
raise ServerConfigException('private key %s does not exist' % pkey_fname)
- if not os.path.isfile(cert_fname):
- raise ServerConfigException('certificate %s does not exist' % cert_fname)
+ from OpenSSL import crypto, SSL
try:
- with open(pkey_fname) as key_file, open(cert_fname) as cert_file:
- verify_tls(cert_file.read(), key_file.read())
- except (ServerConfigException) as e:
- raise ServerConfigException({e})
+ with open(pkey_fname) as f:
+ pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
+ pkey.check()
+ except (ValueError, crypto.Error) as e:
+ raise ServerConfigException(
+ 'Invalid private key {}: {}'.format(pkey_fname, str(e)))
+ try:
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
+ context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
+ context.check_privatekey()
+ except crypto.Error as e:
+ logger.warning(
+ 'Private key {} and certificate {} do not match up: {}'.format(
+ pkey_fname, cert_fname, str(e)))
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
return outer
-def parse_combined_pem_file(pem_data: str) -> Tuple[Optional[str], Optional[str]]:
-
- # Extract the certificate
- cert_start = "-----BEGIN CERTIFICATE-----"
- cert_end = "-----END CERTIFICATE-----"
- cert = None
- if cert_start in pem_data and cert_end in pem_data:
- cert = pem_data[pem_data.index(cert_start):pem_data.index(cert_end) + len(cert_end)]
-
- # Extract the private key
- key_start = "-----BEGIN PRIVATE KEY-----"
- key_end = "-----END PRIVATE KEY-----"
- private_key = None
- if key_start in pem_data and key_end in pem_data:
- private_key = pem_data[pem_data.index(key_start):pem_data.index(key_end) + len(key_end)]
-
- return cert, private_key
-
-
def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]:
if not password:
return None
-
if not salt_password:
- salt_password = ''
-
- import subprocess
- import json
-
- data = {"password": password, "salt_password": salt_password}
- result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"],
- input=json.dumps(data).encode("utf-8"),
- capture_output=True)
-
- # Check result with a CompletedProcess
- if result.returncode != 0 or result.stderr != b'':
- raise ValueError(result.stderr)
-
- return result.stdout.strip().decode('utf-8')
+ salt = bcrypt.gensalt()
+ else:
+ salt = salt_password.encode('utf8')
+ return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8')
+++ /dev/null
-"""
-This file has been isolated into a module so that it can be run
-in a subprocess therefore sidestepping the
-`PyO3 modules may only be initialized once per interpreter process` problem.
-"""
-
-import argparse
-import bcrypt
-import datetime
-import json
-import sys
-import warnings
-
-from argparse import Namespace
-from OpenSSL import crypto, SSL
-from uuid import uuid4
-from typing import Tuple, Optional
-
-
-# subcommand functions
-def password_hash(args: Namespace) -> None:
- data = json.loads(sys.stdin.read())
-
- password = data['password']
- salt_password = data['salt_password']
-
- if not salt_password:
- salt = bcrypt.gensalt()
- else:
- salt = salt_password.encode('utf8')
-
- print(bcrypt.hashpw(password.encode('utf8'), salt).decode())
-
-
-def create_self_signed_cert(args: Namespace) -> None:
-
- # Generate private key
- if args.private_key:
- # create a key pair
- pkey = crypto.PKey()
- pkey.generate_key(crypto.TYPE_RSA, 2048)
- print(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode())
- return
-
- data = json.loads(sys.stdin.read())
-
- dname = data['dname']
- pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, data['private_key'])
-
- # Create a "subject" object
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- req = crypto.X509Req()
- subj = req.get_subject()
-
- # populate the subject with the dname settings
- for k, v in dname.items():
- setattr(subj, k, v)
-
- # create a self-signed cert
- cert = crypto.X509()
- cert.set_subject(req.get_subject())
- cert.set_serial_number(int(uuid4()))
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
- cert.set_issuer(cert.get_subject())
- cert.set_pubkey(pkey)
- cert.sign(pkey, 'sha512')
-
- print(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode())
-
-
-def _get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
- """Basic validation of a CA cert
- """
-
- crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
- (org_name, cn) = (None, None)
- cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
- components = cert.get_issuer().get_components()
- for c in components:
- if c[0].decode() == 'O': # org comp
- org_name = c[1].decode()
- elif c[0].decode() == 'CN': # common name comp
- cn = c[1].decode()
-
- return (org_name, cn)
-
-
-def verify_cacrt_content(args: Namespace) -> None:
- crt = sys.stdin.read()
-
- crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
- no_after = x509.get_notAfter()
- if not no_after:
- print("Certificate does not have an expiration date.", file=sys.stderr)
- sys.exit(1)
-
- end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
-
- if x509.has_expired():
- org, cn = _get_cert_issuer_info(crt)
- msg = 'Certificate issued by "%s/%s" expired on %s' % (org, cn, end_date)
- print(msg, file=sys.stderr)
- sys.exit(1)
-
- # Certificate still valid, calculate and return days until expiration
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- print((end_date - datetime.datetime.utcnow()).days)
-
-
-def get_cert_issuer_info(args: Namespace) -> None:
- crt = sys.stdin.read()
-
- crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt
- (org_name, cn) = (None, None)
- cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
- components = cert.get_issuer().get_components()
- for c in components:
- if c[0].decode() == 'O': # org comp
- org_name = c[1].decode()
- elif c[0].decode() == 'CN': # common name comp
- cn = c[1].decode()
-
- if args.org_name:
- print(org_name)
-
- if args.cn:
- print(cn)
-
-
-def verify_tls(args: Namespace) -> None:
-
- data = json.loads(sys.stdin.read())
-
- crt = data['crt']
- key = data['key']
-
- try:
- _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
- _key.check()
- except (ValueError, crypto.Error) as e:
- print('Invalid private key: %s' % str(e))
- try:
- crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
- _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
- except ValueError as e:
- print('Invalid certificate key: %s' % str(e))
-
- try:
- context = SSL.Context(SSL.TLSv1_METHOD)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- context.use_certificate(_crt)
- context.use_privatekey(_key)
-
- context.check_privatekey()
- except crypto.Error as e:
- print('Private key and certificate do not match up: %s' % str(e))
- except SSL.Error as e:
- print(f'Invalid cert/key pair: {e}')
-
-
-if __name__ == "__main__":
- # create the top-level parser
- parser = argparse.ArgumentParser(prog='cryptotools.py')
- subparsers = parser.add_subparsers(required=True)
-
- # create the parser for the "password_hash" command
- parser_foo = subparsers.add_parser('password_hash')
- parser_foo.set_defaults(func=password_hash)
-
- # create the parser for the "create_self_signed_cert" command
- parser_bar = subparsers.add_parser('create_self_signed_cert')
- parser_bar.add_argument('--private_key', required=False, action='store_true')
- parser_bar.add_argument('--certificate', required=False, action='store_true')
- parser_bar.set_defaults(func=create_self_signed_cert)
-
- # create the parser for the "verify_cacrt_content" command
- parser_bar = subparsers.add_parser('verify_cacrt_content')
- parser_bar.set_defaults(func=verify_cacrt_content)
-
- # create the parser for the "get_cert_issuer_info" command
- parser_bar = subparsers.add_parser('get_cert_issuer_info')
- parser_bar.add_argument('--org_name', required=False, action='store_true')
- parser_bar.add_argument('--cn', required=False, action='store_true')
- parser_bar.set_defaults(func=get_cert_issuer_info)
-
- # create the parser for the "verify_tls" command
- parser_bar = subparsers.add_parser('verify_tls')
- parser_bar.set_defaults(func=verify_tls)
-
- # parse the args and call whatever function was selected
- args = parser.parse_args()
- args.func(args)