]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: enable verification of TLS certs without files
authorPatrick Seidensal <pseidensal@suse.com>
Thu, 5 Mar 2020 12:15:16 +0000 (13:15 +0100)
committerPatrick Seidensal <pseidensal@suse.com>
Thu, 5 Mar 2020 12:19:16 +0000 (13:19 +0100)
Signed-off-by: Patrick Seidensal <pseidensal@suse.com>
src/pybind/mgr/mgr_util.py

index 7288c4994e83b4b370089d5dc0209d3a5643b77c..dfa0f0d9022abe157069a5c11adc9d78ebe14efe 100644 (file)
@@ -155,6 +155,18 @@ def create_self_signed_cert(organisation='Ceph', common_name='mgr') -> Tuple[str
     return cert.decode('utf-8'), pkey.decode('utf-8')
 
 
+def verify_cacrt_content(crt):
+    # type: (str) -> None
+    from OpenSSL import crypto
+    try:
+        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+        if x509.has_expired():
+            logger.warning('Certificate has expired: {}'.format(crt))
+    except (ValueError, crypto.Error) as e:
+        raise ServerConfigException(
+            'Invalid certificate: {}'.format(str(e)))
+
+
 def verify_cacrt(cert_fname):
     # type: (str) -> None
     """Basic validation of a ca cert"""
@@ -164,18 +176,42 @@ def verify_cacrt(cert_fname):
     if not os.path.isfile(cert_fname):
         raise ServerConfigException("Certificate {} does not exist".format(cert_fname))
 
-    from OpenSSL import crypto
     try:
         with open(cert_fname) as f:
-            x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
-            if x509.has_expired():
-                logger.warning(
-                    'Certificate {} has expired'.format(cert_fname))
-    except (ValueError, crypto.Error) as e:
+            verify_cacrt_content(f.read())
+    except ValueError as e:
         raise ServerConfigException(
             'Invalid certificate {}: {}'.format(cert_fname, str(e)))
 
 
+def verify_tls(crt, key):
+    # type: (str, str) -> None
+    verify_cacrt_content(crt)
+
+    from OpenSSL import crypto, SSL
+    try:
+        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+        _key.check()
+    except (ValueError, crypto.Error) as e:
+        raise ServerConfigException(
+            'Invalid private key: {}'.format(str(e)))
+    try:
+        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+    except ValueError as e:
+        raise ServerConfigException(
+            'Invalid certificate key: {}'.format(str(e))
+        )
+
+    try:
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.use_certificate(_crt)
+        context.use_privatekey(_key)
+        context.check_privatekey()
+    except crypto.Error as e:
+        logger.warning(
+            'Private key and certificate do not match up: {}'.format(str(e)))
+
+
 def verify_tls_files(cert_fname, pkey_fname):
     # type: (str, str) -> None
     """Basic checks for TLS certificate and key files