]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
pybind/mgr: Hack around the 'ImportError: PyO3 modules may only be initialized once...
authorPaulo E. Castro <pecastro@wormholenet.com>
Sat, 5 Apr 2025 20:47:55 +0000 (21:47 +0100)
committerNizamudeen A <nia@redhat.com>
Tue, 22 Jul 2025 03:47:21 +0000 (09:17 +0530)
Fixes: https://tracker.ceph.com/issues/64213
Signed-off-by: Paulo E. Castro <pecastro@wormholenet.com>
(cherry picked from commit 5b2aa8f8c61d7c2a56e1480c479801079a1ff822)

src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/mgr_util.py
src/pybind/mgr/tests/test_tls.py
src/python-common/ceph/pybind/__init__.py [new file with mode: 0644]
src/python-common/ceph/pybind/mgr/__init__.py [new file with mode: 0644]
src/python-common/ceph/pybind/mgr/cryptotools.py [new file with mode: 0644]

index e38d8f9c10b5bd692b4f957fc7ed0f434dcc433d..22e2936200c66151f999dd6bcdc22746f09088c6 100644 (file)
@@ -2099,12 +2099,10 @@ class TestCephadm(object):
             ), CephadmOrchestrator.apply_container),
         ]
     )
-    @mock.patch("subprocess.run", None)
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
     @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock())
     @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
-    @mock.patch("subprocess.run", mock.MagicMock())
     def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, spec, meth, 'test'):
index 8d71dd69128840928a64e8cb551d72583d49ba1b..94c52e1a399d322358bd83f81fb876dc4cd2ff88 100644 (file)
@@ -3,7 +3,6 @@ import os
 if 'UNITTEST' in os.environ:
     import tests
 
-import bcrypt
 import cephfs
 import contextlib
 import datetime
@@ -531,20 +530,9 @@ def create_self_signed_cert(organisation: str = 'Ceph',
 
     """
 
-    from OpenSSL import crypto
-    from uuid import uuid4
-
     # RDN = Relative Distinguished Name
     valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']
 
-    # create a key pair
-    pkey = crypto.PKey()
-    pkey.generate_key(crypto.TYPE_RSA, 2048)
-
-    # Create a "subject" object
-    req = crypto.X509Req()
-    subj = req.get_subject()
-
     if dname:
         # dname received, so check it contains valid RDNs
         if not all(field in valid_RDN_list for field in dname):
@@ -552,44 +540,49 @@ def create_self_signed_cert(organisation: str = 'Ceph',
     else:
         dname = {"O": organisation, "CN": common_name}
 
-    # populate the subject with the dname settings
-    for k, v in dname.items():
-        setattr(subj, k, v)
+    import json
+    import subprocess
+
+    private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"],
+                                 capture_output=True)
+
+    pkey = private_key.stdout.strip().decode('utf-8')
 
-    # create a self-signed cert
-    cert = crypto.X509()
-    cert.set_subject(req.get_subject())
-    cert.set_serial_number(int(uuid4()))
-    cert.gmtime_adj_notBefore(0)
-    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
-    cert.set_issuer(cert.get_subject())
-    cert.set_pubkey(pkey)
-    cert.sign(pkey, 'sha512')
+    data = {"dname": dname, "private_key": pkey}
 
-    cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
-    pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
+    result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"],
+                            input=json.dumps(data).encode("utf-8"),
+                            capture_output=True)
 
-    return cert.decode('utf-8'), pkey.decode('utf-8')
+    # Check result with a CompletedProcess
+    if result.returncode != 0 or result.stderr != b'':
+        raise ValueError(result.stderr)
+
+    cert = result.stdout.strip().decode('utf-8')
+    return cert, pkey
 
 
 def verify_cacrt_content(crt):
-    # type: (str) -> None
-    from OpenSSL import crypto
+    # type: (str) -> int
+
     try:
-        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
-        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
-        if x509.has_expired():
-            org, cn = get_cert_issuer_info(crt)
-            no_after = x509.get_notAfter()
-            end_date = None
-            if no_after is not None:
-                end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
-            msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
-            logger.warning(msg)
-            raise ServerConfigException(msg)
-    except (ValueError, crypto.Error) as e:
+        import subprocess
+        result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"],
+                                input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
+                                capture_output=True)
+        # The above script will only produce stdout output.
+        # The only scenarios that produce stderr output are failures to import modules
+        # or syntax errors which test_tls.py will catch
+
+        # Check result of CompletedProcess
+        if result.returncode != 0 or result.stderr != b'':
+            logger.warning(result.stderr)
+            raise ValueError(result.stderr)
+    except (ValueError) as e:
         raise ServerConfigException(f'Invalid certificate: {e}')
 
+    return int(result.stdout.strip().decode('utf-8'))
+
 
 def verify_cacrt(cert_fname):
     # type: (str) -> None
@@ -610,49 +603,52 @@ def verify_cacrt(cert_fname):
 def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
     """Basic validation of a ca cert"""
 
-    from OpenSSL import crypto, SSL
     try:
-        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
-        (org_name, cn) = (None, None)
-        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
-        components = cert.get_issuer().get_components()
-        for c in components:
-            if c[0].decode() == 'O':  # org comp
-                org_name = c[1].decode()
-            elif c[0].decode() == 'CN':  # common name comp
-                cn = c[1].decode()
-        return (org_name, cn)
-    except (ValueError, crypto.Error) as e:
+        import subprocess
+        org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"],
+                                       input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
+                                       capture_output=True)
+
+        # Check result with a CompletedProcess
+        if org_name_proc.returncode != 0 or org_name_proc.stderr != b'':
+            raise ValueError(org_name_proc.stderr)
+
+        cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"],
+                                 input=crt if isinstance(crt, bytes) else crt.encode('utf-8'),
+                                 capture_output=True)
+
+        # Check result with a CompletedProcess
+        if cn_proc.returncode != 0 or cn_proc.stderr != b'':
+            raise ValueError(cn_proc.stderr)
+
+        org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8')
+
+    except (ValueError) as e:
         raise ServerConfigException(f'Invalid certificate key: {e}')
+    return (org_name, cn)
 
 def verify_tls(crt, key):
     # type: (str, str) -> None
     verify_cacrt_content(crt)
 
-    from OpenSSL import crypto, SSL
     try:
-        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
-        _key.check()
-    except (ValueError, crypto.Error) as e:
-        raise ServerConfigException(
-            'Invalid private key: {}'.format(str(e)))
-    try:
-        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
-        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
-    except ValueError as e:
-        raise ServerConfigException(
-            'Invalid certificate key: {}'.format(str(e))
-        )
-
-    try:
-        context = SSL.Context(SSL.TLSv1_METHOD)
-        context.use_certificate(_crt)
-        context.use_privatekey(_key)
-        context.check_privatekey()
-    except crypto.Error as e:
-        logger.warning('Private key and certificate do not match up: {}'.format(str(e)))
-    except SSL.Error as e:
-        raise ServerConfigException(f'Invalid cert/key pair: {e}')
+        import subprocess
+        import json
+
+        data = {
+            "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt,  # type: ignore[attr-defined]
+            "key": key.decode("utf-8") if isinstance(key, bytes) else key   # type: ignore[attr-defined]
+        }
+        result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"],
+                                input=json.dumps(data).encode("utf-8"),
+                                capture_output=True)
+
+        # Check result of CompletedProcess
+        if result.returncode != 0 or result.stdout != b'':
+            logger.warning(result.stdout)
+            raise ServerConfigException(result.stdout)
+    except (ServerConfigException) as e:
+        raise ServerConfigException(f'Invalid certificate: {e}')
 
 
 
@@ -681,24 +677,14 @@ def verify_tls_files(cert_fname, pkey_fname):
     if not os.path.isfile(pkey_fname):
         raise ServerConfigException('private key %s does not exist' % pkey_fname)
 
-    from OpenSSL import crypto, SSL
+    if not os.path.isfile(cert_fname):
+        raise ServerConfigException('certificate %s does not exist' % cert_fname)
 
     try:
-        with open(pkey_fname) as f:
-            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
-            pkey.check()
-    except (ValueError, crypto.Error) as e:
-        raise ServerConfigException(
-            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
-    try:
-        context = SSL.Context(SSL.TLSv1_METHOD)
-        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
-        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
-        context.check_privatekey()
-    except crypto.Error as e:
-        logger.warning(
-            'Private key {} and certificate {} do not match up: {}'.format(
-                pkey_fname, cert_fname, str(e)))
+        with open(pkey_fname) as key_file, open(cert_fname) as cert_file:
+            verify_tls(cert_file.read(), key_file.read())
+    except (ServerConfigException) as e:
+        raise ServerConfigException({e})
 
 
 def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
@@ -876,11 +862,42 @@ def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]],
     return outer
 
 
+def parse_combined_pem_file(pem_data: str) -> Tuple[Optional[str], Optional[str]]:
+
+    # Extract the certificate
+    cert_start = "-----BEGIN CERTIFICATE-----"
+    cert_end = "-----END CERTIFICATE-----"
+    cert = None
+    if cert_start in pem_data and cert_end in pem_data:
+        cert = pem_data[pem_data.index(cert_start):pem_data.index(cert_end) + len(cert_end)]
+
+    # Extract the private key
+    key_start = "-----BEGIN PRIVATE KEY-----"
+    key_end = "-----END PRIVATE KEY-----"
+    private_key = None
+    if key_start in pem_data and key_end in pem_data:
+        private_key = pem_data[pem_data.index(key_start):pem_data.index(key_end) + len(key_end)]
+
+    return cert, private_key
+
+
 def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]:
     if not password:
         return None
+
     if not salt_password:
-        salt = bcrypt.gensalt()
-    else:
-        salt = salt_password.encode('utf8')
-    return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8')
+        salt_password = ''
+
+    import subprocess
+    import json
+
+    data = {"password": password, "salt_password": salt_password}
+    result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"],
+                            input=json.dumps(data).encode("utf-8"),
+                            capture_output=True)
+
+    # Check result with a CompletedProcess
+    if result.returncode != 0 or result.stderr != b'':
+        raise ValueError(result.stderr)
+
+    return result.stdout.strip().decode('utf-8')
index 19ce46a93fdd0a1034ca7d83277ba600c3ba6f07..39ba5ae0a03333635b01e2b4e2fcb9cbfb9a5a10 100644 (file)
@@ -1,4 +1,4 @@
-from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info
+from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info, verify_cacrt_content
 from OpenSSL import crypto, SSL
 
 import unittest
@@ -10,6 +10,9 @@ valid_ceph_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0S
 invalid_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEBn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZa\n-----END CERTIFICATE-----\n
 """
 
+expired_cert = """-----BEGIN CERTIFICATE-----\nMIICtjCCAZ4CAQAwDQYJKoZIhvcNAQENBQAwITEQMA4GA1UEAwwHY2VwaGFkbTEN\nMAsGA1UECgwEQ2VwaDAeFw0xNTAyMTYxOTQ4MTdaFw0yMDAyMTUxOTQ4MTdaMCEx\nEDAOBgNVBAMMB2NlcGhhZG0xDTALBgNVBAoMBENlcGgwggEiMA0GCSqGSIb3DQEB\nAQUAA4IBDwAwggEKAoIBAQCxYHJ6RlPeuhZJyAMR1ru01BEGbwhI7vMga8pwyTX8\nNn1ow2asbj7lad+jO5j5Gon8GFwsrKM0S8vmITxd5QkshnHPQRQF8hz4aieNOQiL\nnVRBTHgLihEBJCpyuTmHLn1G374ZObuFqyHcnIrKNdeKb0JxNKbx26/2NrWwFGAe\nAj5KuizMHJMZYVLfYelP4g2hSRPe2JJWI4429LeLWuBQBL9t/IPY0IlmFDP4eL+S\nB2Py8Ig2XY5oyaaxpwI8H/cAY92lsoHPI3ldDn1JEiH5Gwzxf+9fF29cesp8BYcm\naav1jT8ONvsfn7AxKDKcfZIpRNKlOqFIC03gG5R3O1iHAgMBAAEwDQYJKoZIhvcN\nAQENBQADggEBADh9bAMR7RIK3M3u6LoTQQrcoxJ0pEXBrFQGQk2uz2krlDTKRS+2\nubwD8bLNd3dl5RzvVJ1hui8y9JMnqYwgMrjR9B0EDUM/ihYU2zO3ZN9nhhnTN2qT\n+UtFtyilg3U4nQdWGw2jFPu08JPoF/g+7iBH+/o5WOfzOovjLg4BsVlKUP4ND8Dv\nXr8gxZTlaoZvZlhMCdhiT2aKstCA9R3RYBbEo/FtcsHOkO0EFuxCLiVd/eo3F56C\njfVWnvqyz3r2f1G1VafvhhdlMJ4p35Hw1ms6nFTLx5dKwJW+Xve+qBU3Q5I5iV02\nAIXXBaqId/YqKXZd+Ge/XBmluXH929PtUOk=\n-----END CERTIFICATE-----\n
+"""
+
 class TLSchecks(unittest.TestCase):
 
     def test_defaults(self):
@@ -53,3 +56,8 @@ class TLSchecks(unittest.TestCase):
 
         # invalid certificate
         self.assertRaises(ServerConfigException, get_cert_issuer_info, invalid_cert)
+
+        # expired certificate
+        self.assertRaisesRegex(ServerConfigException,
+                               'Certificate issued by "Ceph/cephadm" expired',
+                               verify_cacrt_content, expired_cert)
diff --git a/src/python-common/ceph/pybind/__init__.py b/src/python-common/ceph/pybind/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/python-common/ceph/pybind/mgr/__init__.py b/src/python-common/ceph/pybind/mgr/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/python-common/ceph/pybind/mgr/cryptotools.py b/src/python-common/ceph/pybind/mgr/cryptotools.py
new file mode 100644 (file)
index 0000000..c14f9b2
--- /dev/null
@@ -0,0 +1,197 @@
+"""
+This file has been isolated into a module so that it can be run
+in a subprocess therefore sidestepping the
+`PyO3 modules may only be initialized once per interpreter process` problem.
+"""
+
+import argparse
+import bcrypt
+import datetime
+import json
+import sys
+import warnings
+
+from argparse import Namespace
+from OpenSSL import crypto, SSL
+from uuid import uuid4
+from typing import Tuple, Optional
+
+
+# subcommand functions
+def password_hash(args: Namespace) -> None:
+    data = json.loads(sys.stdin.read())
+
+    password = data['password']
+    salt_password = data['salt_password']
+
+    if not salt_password:
+        salt = bcrypt.gensalt()
+    else:
+        salt = salt_password.encode('utf8')
+
+    print(bcrypt.hashpw(password.encode('utf8'), salt).decode())
+
+
+def create_self_signed_cert(args: Namespace) -> None:
+
+    # Generate private key
+    if args.private_key:
+        # create a key pair
+        pkey = crypto.PKey()
+        pkey.generate_key(crypto.TYPE_RSA, 2048)
+        print(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode())
+        return
+
+    data = json.loads(sys.stdin.read())
+
+    dname = data['dname']
+    pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, data['private_key'])
+
+    # Create a "subject" object
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        req = crypto.X509Req()
+    subj = req.get_subject()
+
+    # populate the subject with the dname settings
+    for k, v in dname.items():
+        setattr(subj, k, v)
+
+    # create a self-signed cert
+    cert = crypto.X509()
+    cert.set_subject(req.get_subject())
+    cert.set_serial_number(int(uuid4()))
+    cert.gmtime_adj_notBefore(0)
+    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
+    cert.set_issuer(cert.get_subject())
+    cert.set_pubkey(pkey)
+    cert.sign(pkey, 'sha512')
+
+    print(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode())
+
+
+def _get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
+    """Basic validation of a CA cert
+    """
+
+    crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+    (org_name, cn) = (None, None)
+    cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+    components = cert.get_issuer().get_components()
+    for c in components:
+        if c[0].decode() == 'O':  # org comp
+            org_name = c[1].decode()
+        elif c[0].decode() == 'CN':  # common name comp
+            cn = c[1].decode()
+
+    return (org_name, cn)
+
+
+def verify_cacrt_content(args: Namespace) -> None:
+    crt = sys.stdin.read()
+
+    crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt
+    x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+    no_after = x509.get_notAfter()
+    if not no_after:
+        print("Certificate does not have an expiration date.", file=sys.stderr)
+        sys.exit(1)
+
+    end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
+
+    if x509.has_expired():
+        org, cn = _get_cert_issuer_info(crt)
+        msg = 'Certificate issued by "%s/%s" expired on %s' % (org, cn, end_date)
+        print(msg, file=sys.stderr)
+        sys.exit(1)
+
+    # Certificate still valid, calculate and return days until expiration
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        print((end_date - datetime.datetime.utcnow()).days)
+
+
+def get_cert_issuer_info(args: Namespace) -> None:
+    crt = sys.stdin.read()
+
+    crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt
+    (org_name, cn) = (None, None)
+    cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+    components = cert.get_issuer().get_components()
+    for c in components:
+        if c[0].decode() == 'O':  # org comp
+            org_name = c[1].decode()
+        elif c[0].decode() == 'CN':  # common name comp
+            cn = c[1].decode()
+
+    if args.org_name:
+        print(org_name)
+
+    if args.cn:
+        print(cn)
+
+
+def verify_tls(args: Namespace) -> None:
+
+    data = json.loads(sys.stdin.read())
+
+    crt = data['crt']
+    key = data['key']
+
+    try:
+        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+        _key.check()
+    except (ValueError, crypto.Error) as e:
+        print('Invalid private key: %s' % str(e))
+    try:
+        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+    except ValueError as e:
+        print('Invalid certificate key: %s' % str(e))
+
+    try:
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            context.use_certificate(_crt)
+            context.use_privatekey(_key)
+
+        context.check_privatekey()
+    except crypto.Error as e:
+        print('Private key and certificate do not match up: %s' % str(e))
+    except SSL.Error as e:
+        print(f'Invalid cert/key pair: {e}')
+
+
+if __name__ == "__main__":
+    # create the top-level parser
+    parser = argparse.ArgumentParser(prog='cryptotools.py')
+    subparsers = parser.add_subparsers(required=True)
+
+    # create the parser for the "password_hash" command
+    parser_foo = subparsers.add_parser('password_hash')
+    parser_foo.set_defaults(func=password_hash)
+
+    # create the parser for the "create_self_signed_cert" command
+    parser_bar = subparsers.add_parser('create_self_signed_cert')
+    parser_bar.add_argument('--private_key', required=False, action='store_true')
+    parser_bar.add_argument('--certificate', required=False, action='store_true')
+    parser_bar.set_defaults(func=create_self_signed_cert)
+
+    # create the parser for the "verify_cacrt_content" command
+    parser_bar = subparsers.add_parser('verify_cacrt_content')
+    parser_bar.set_defaults(func=verify_cacrt_content)
+
+    # create the parser for the "get_cert_issuer_info" command
+    parser_bar = subparsers.add_parser('get_cert_issuer_info')
+    parser_bar.add_argument('--org_name', required=False, action='store_true')
+    parser_bar.add_argument('--cn', required=False, action='store_true')
+    parser_bar.set_defaults(func=get_cert_issuer_info)
+
+    # create the parser for the "verify_tls" command
+    parser_bar = subparsers.add_parser('verify_tls')
+    parser_bar.set_defaults(func=verify_tls)
+
+    # parse the args and call whatever function was selected
+    args = parser.parse_args()
+    args.func(args)