from typing import Any, Tuple, IO
import ipaddress
import tempfile
-import os
import logging
from datetime import datetime, timedelta
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
-from OpenSSL import crypto, SSL
+from mgr_util import verify_tls_files
from orchestrator import OrchestratorError
self.root_cert = given_cert
self.root_key = serialization.load_pem_private_key(
data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
-
-
-def verify_tls(crt, key):
- # type: (str, str) -> None
- verify_cacrt_content(crt)
-
- try:
- _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
- _key.check()
- except (ValueError, crypto.Error) as e:
- raise SSLConfigException('Invalid private key: {}'.format(str(e)))
- try:
- _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
- except ValueError as e:
- raise SSLConfigException('Invalid certificate key: {}'.format(str(e))
- )
-
- try:
- context = SSL.Context(SSL.TLSv1_METHOD)
- context.use_certificate(_crt)
- context.use_privatekey(_key)
- context.check_privatekey()
- except crypto.Error as e:
- logger.warning(f'Private key and certificate do not match up: {e}')
-
-
-def verify_tls_files(cert_fname, pkey_fname):
- # type: (str, str) -> None
- """Basic checks for TLS certificate and key files
-
- Do some validations to the private key and certificate:
- - Check the type and format
- - Check the certificate expiration date
- - Check the consistency of the private key
- - Check that the private key and certificate match up
-
- :param cert_fname: Name of the certificate file
- :param pkey_fname: name of the certificate public key file
-
- :raises SSLConfigException: An error with a message
-
- """
- if not cert_fname or not pkey_fname:
- raise SSLConfigException('no certificate configured')
-
- verify_cacrt(cert_fname)
-
- if not os.path.isfile(pkey_fname):
- raise SSLConfigException('private key %s does not exist' % pkey_fname)
-
- try:
- with open(pkey_fname) as f:
- pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
- pkey.check()
- except (ValueError, crypto.Error) as e:
- raise SSLConfigException(
- 'Invalid private key {}: {}'.format(pkey_fname, str(e)))
- try:
- context = SSL.Context(SSL.TLSv1_METHOD)
- context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
- context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
- context.check_privatekey()
- except crypto.Error as e:
- logger.warning(
- f'Private key {pkey_fname} and certificate {cert_fname} do not match up: {e}')
-
-
-def verify_cacrt(cert_fname):
- # type: (str) -> None
- """Basic validation of a ca cert"""
-
- if not cert_fname:
- raise SSLConfigException("CA cert not configured")
- if not os.path.isfile(cert_fname):
- raise SSLConfigException("Certificate {} does not exist".format(cert_fname))
-
- try:
- with open(cert_fname) as f:
- verify_cacrt_content(f.read())
- except ValueError as e:
- raise SSLConfigException(
- 'Invalid certificate {}: {}'.format(cert_fname, str(e)))
-
-
-def verify_cacrt_content(crt):
- # type: (str) -> None
- try:
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
- if x509.has_expired():
- logger.warning(f'Certificate has expired: {crt}')
- except (ValueError, crypto.Error) as e:
- raise SSLConfigException(f'Invalid certificate: {e}')