From 982edc02b2ab5c5cf49c1171db98f6b87c5956f7 Mon Sep 17 00:00:00 2001 From: "Paulo E. Castro" Date: Sat, 5 Apr 2025 21:47:55 +0100 Subject: [PATCH] pybind/mgr: Hack around the 'ImportError: PyO3 modules may only be initialized once per interpreter process' issue. Fixes: https://tracker.ceph.com/issues/64213 Signed-off-by: Paulo E. Castro (cherry picked from commit 5b2aa8f8c61d7c2a56e1480c479801079a1ff822) --- src/pybind/mgr/cephadm/tests/test_cephadm.py | 2 - src/pybind/mgr/mgr_util.py | 209 ++++++++++-------- src/pybind/mgr/tests/test_tls.py | 10 +- src/python-common/ceph/pybind/__init__.py | 0 src/python-common/ceph/pybind/mgr/__init__.py | 0 .../ceph/pybind/mgr/cryptotools.py | 197 +++++++++++++++++ 6 files changed, 319 insertions(+), 99 deletions(-) create mode 100644 src/python-common/ceph/pybind/__init__.py create mode 100644 src/python-common/ceph/pybind/mgr/__init__.py create mode 100644 src/python-common/ceph/pybind/mgr/cryptotools.py diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index e38d8f9c10b..22e2936200c 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -2099,12 +2099,10 @@ class TestCephadm(object): ), CephadmOrchestrator.apply_container), ] ) - @mock.patch("subprocess.run", None) @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) @mock.patch("cephadm.services.nfs.NFSService.create_rados_config_obj", mock.MagicMock()) @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) - @mock.patch("subprocess.run", mock.MagicMock()) def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'test'): with with_service(cephadm_module, spec, meth, 'test'): diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 8d71dd69128..94c52e1a399 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -3,7 +3,6 @@ import os if 'UNITTEST' in os.environ: import tests -import bcrypt import cephfs import contextlib import datetime @@ -531,20 +530,9 @@ def create_self_signed_cert(organisation: str = 'Ceph', """ - from OpenSSL import crypto - from uuid import uuid4 - # RDN = Relative Distinguished Name valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress'] - # create a key pair - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 2048) - - # Create a "subject" object - req = crypto.X509Req() - subj = req.get_subject() - if dname: # dname received, so check it contains valid RDNs if not all(field in valid_RDN_list for field in dname): @@ -552,44 +540,49 @@ def create_self_signed_cert(organisation: str = 'Ceph', else: dname = {"O": organisation, "CN": common_name} - # populate the subject with the dname settings - for k, v in dname.items(): - setattr(subj, k, v) + import json + import subprocess + + private_key = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--private_key"], + capture_output=True) + + pkey = private_key.stdout.strip().decode('utf-8') - # create a self-signed cert - cert = crypto.X509() - cert.set_subject(req.get_subject()) - cert.set_serial_number(int(uuid4())) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(pkey) - cert.sign(pkey, 'sha512') + data = {"dname": dname, "private_key": pkey} - cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) - pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) + result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "create_self_signed_cert", "--certificate"], + input=json.dumps(data).encode("utf-8"), + capture_output=True) - return cert.decode('utf-8'), pkey.decode('utf-8') + # Check result with a CompletedProcess + if result.returncode != 0 or result.stderr != b'': + raise ValueError(result.stderr) + + cert = result.stdout.strip().decode('utf-8') + return cert, pkey def verify_cacrt_content(crt): - # type: (str) -> None - from OpenSSL import crypto + # type: (str) -> int + try: - crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) - if x509.has_expired(): - org, cn = get_cert_issuer_info(crt) - no_after = x509.get_notAfter() - end_date = None - if no_after is not None: - end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ') - msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}' - logger.warning(msg) - raise ServerConfigException(msg) - except (ValueError, crypto.Error) as e: + import subprocess + result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_cacrt_content"], + input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), + capture_output=True) + # The above script will only produce stdout output. + # The only scenarios that produce stderr output are failures to import modules + # or syntax errors which test_tls.py will catch + + # Check result of CompletedProcess + if result.returncode != 0 or result.stderr != b'': + logger.warning(result.stderr) + raise ValueError(result.stderr) + except (ValueError) as e: raise ServerConfigException(f'Invalid certificate: {e}') + return int(result.stdout.strip().decode('utf-8')) + def verify_cacrt(cert_fname): # type: (str) -> None @@ -610,49 +603,52 @@ def verify_cacrt(cert_fname): def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]: """Basic validation of a ca cert""" - from OpenSSL import crypto, SSL try: - crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt - (org_name, cn) = (None, None) - cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) - components = cert.get_issuer().get_components() - for c in components: - if c[0].decode() == 'O': # org comp - org_name = c[1].decode() - elif c[0].decode() == 'CN': # common name comp - cn = c[1].decode() - return (org_name, cn) - except (ValueError, crypto.Error) as e: + import subprocess + org_name_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--org_name"], + input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), + capture_output=True) + + # Check result with a CompletedProcess + if org_name_proc.returncode != 0 or org_name_proc.stderr != b'': + raise ValueError(org_name_proc.stderr) + + cn_proc = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "get_cert_issuer_info", "--cn"], + input=crt if isinstance(crt, bytes) else crt.encode('utf-8'), + capture_output=True) + + # Check result with a CompletedProcess + if cn_proc.returncode != 0 or cn_proc.stderr != b'': + raise ValueError(cn_proc.stderr) + + org_name, cn = org_name_proc.stdout.strip().decode('utf-8'), cn_proc.stdout.strip().decode('utf-8') + + except (ValueError) as e: raise ServerConfigException(f'Invalid certificate key: {e}') + return (org_name, cn) def verify_tls(crt, key): # type: (str, str) -> None verify_cacrt_content(crt) - from OpenSSL import crypto, SSL try: - _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key) - _key.check() - except (ValueError, crypto.Error) as e: - raise ServerConfigException( - 'Invalid private key: {}'.format(str(e))) - try: - crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt - _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) - except ValueError as e: - raise ServerConfigException( - 'Invalid certificate key: {}'.format(str(e)) - ) - - try: - context = SSL.Context(SSL.TLSv1_METHOD) - context.use_certificate(_crt) - context.use_privatekey(_key) - context.check_privatekey() - except crypto.Error as e: - logger.warning('Private key and certificate do not match up: {}'.format(str(e))) - except SSL.Error as e: - raise ServerConfigException(f'Invalid cert/key pair: {e}') + import subprocess + import json + + data = { + "crt": crt.decode("utf-8") if isinstance(crt, bytes) else crt, # type: ignore[attr-defined] + "key": key.decode("utf-8") if isinstance(key, bytes) else key # type: ignore[attr-defined] + } + result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "verify_tls"], + input=json.dumps(data).encode("utf-8"), + capture_output=True) + + # Check result of CompletedProcess + if result.returncode != 0 or result.stdout != b'': + logger.warning(result.stdout) + raise ServerConfigException(result.stdout) + except (ServerConfigException) as e: + raise ServerConfigException(f'Invalid certificate: {e}') @@ -681,24 +677,14 @@ def verify_tls_files(cert_fname, pkey_fname): if not os.path.isfile(pkey_fname): raise ServerConfigException('private key %s does not exist' % pkey_fname) - from OpenSSL import crypto, SSL + if not os.path.isfile(cert_fname): + raise ServerConfigException('certificate %s does not exist' % cert_fname) try: - with open(pkey_fname) as f: - pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read()) - pkey.check() - except (ValueError, crypto.Error) as e: - raise ServerConfigException( - 'Invalid private key {}: {}'.format(pkey_fname, str(e))) - try: - context = SSL.Context(SSL.TLSv1_METHOD) - context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM) - context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM) - context.check_privatekey() - except crypto.Error as e: - logger.warning( - 'Private key {} and certificate {} do not match up: {}'.format( - pkey_fname, cert_fname, str(e))) + with open(pkey_fname) as key_file, open(cert_fname) as cert_file: + verify_tls(cert_file.read(), key_file.read()) + except (ServerConfigException) as e: + raise ServerConfigException({e}) def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float: @@ -876,11 +862,42 @@ def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], return outer +def parse_combined_pem_file(pem_data: str) -> Tuple[Optional[str], Optional[str]]: + + # Extract the certificate + cert_start = "-----BEGIN CERTIFICATE-----" + cert_end = "-----END CERTIFICATE-----" + cert = None + if cert_start in pem_data and cert_end in pem_data: + cert = pem_data[pem_data.index(cert_start):pem_data.index(cert_end) + len(cert_end)] + + # Extract the private key + key_start = "-----BEGIN PRIVATE KEY-----" + key_end = "-----END PRIVATE KEY-----" + private_key = None + if key_start in pem_data and key_end in pem_data: + private_key = pem_data[pem_data.index(key_start):pem_data.index(key_end) + len(key_end)] + + return cert, private_key + + def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]: if not password: return None + if not salt_password: - salt = bcrypt.gensalt() - else: - salt = salt_password.encode('utf8') - return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8') + salt_password = '' + + import subprocess + import json + + data = {"password": password, "salt_password": salt_password} + result = subprocess.run(["python3", "-m", "ceph.pybind.mgr.cryptotools", "password_hash"], + input=json.dumps(data).encode("utf-8"), + capture_output=True) + + # Check result with a CompletedProcess + if result.returncode != 0 or result.stderr != b'': + raise ValueError(result.stderr) + + return result.stdout.strip().decode('utf-8') diff --git a/src/pybind/mgr/tests/test_tls.py b/src/pybind/mgr/tests/test_tls.py index 19ce46a93fd..39ba5ae0a03 100644 --- a/src/pybind/mgr/tests/test_tls.py +++ b/src/pybind/mgr/tests/test_tls.py @@ -1,4 +1,4 @@ -from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info +from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info, verify_cacrt_content from OpenSSL import crypto, SSL import unittest @@ -10,6 +10,9 @@ valid_ceph_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0S invalid_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEBn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZa\n-----END CERTIFICATE-----\n """ +expired_cert = """-----BEGIN CERTIFICATE-----\nMIICtjCCAZ4CAQAwDQYJKoZIhvcNAQENBQAwITEQMA4GA1UEAwwHY2VwaGFkbTEN\nMAsGA1UECgwEQ2VwaDAeFw0xNTAyMTYxOTQ4MTdaFw0yMDAyMTUxOTQ4MTdaMCEx\nEDAOBgNVBAMMB2NlcGhhZG0xDTALBgNVBAoMBENlcGgwggEiMA0GCSqGSIb3DQEB\nAQUAA4IBDwAwggEKAoIBAQCxYHJ6RlPeuhZJyAMR1ru01BEGbwhI7vMga8pwyTX8\nNn1ow2asbj7lad+jO5j5Gon8GFwsrKM0S8vmITxd5QkshnHPQRQF8hz4aieNOQiL\nnVRBTHgLihEBJCpyuTmHLn1G374ZObuFqyHcnIrKNdeKb0JxNKbx26/2NrWwFGAe\nAj5KuizMHJMZYVLfYelP4g2hSRPe2JJWI4429LeLWuBQBL9t/IPY0IlmFDP4eL+S\nB2Py8Ig2XY5oyaaxpwI8H/cAY92lsoHPI3ldDn1JEiH5Gwzxf+9fF29cesp8BYcm\naav1jT8ONvsfn7AxKDKcfZIpRNKlOqFIC03gG5R3O1iHAgMBAAEwDQYJKoZIhvcN\nAQENBQADggEBADh9bAMR7RIK3M3u6LoTQQrcoxJ0pEXBrFQGQk2uz2krlDTKRS+2\nubwD8bLNd3dl5RzvVJ1hui8y9JMnqYwgMrjR9B0EDUM/ihYU2zO3ZN9nhhnTN2qT\n+UtFtyilg3U4nQdWGw2jFPu08JPoF/g+7iBH+/o5WOfzOovjLg4BsVlKUP4ND8Dv\nXr8gxZTlaoZvZlhMCdhiT2aKstCA9R3RYBbEo/FtcsHOkO0EFuxCLiVd/eo3F56C\njfVWnvqyz3r2f1G1VafvhhdlMJ4p35Hw1ms6nFTLx5dKwJW+Xve+qBU3Q5I5iV02\nAIXXBaqId/YqKXZd+Ge/XBmluXH929PtUOk=\n-----END CERTIFICATE-----\n +""" + class TLSchecks(unittest.TestCase): def test_defaults(self): @@ -53,3 +56,8 @@ class TLSchecks(unittest.TestCase): # invalid certificate self.assertRaises(ServerConfigException, get_cert_issuer_info, invalid_cert) + + # expired certificate + self.assertRaisesRegex(ServerConfigException, + 'Certificate issued by "Ceph/cephadm" expired', + verify_cacrt_content, expired_cert) diff --git a/src/python-common/ceph/pybind/__init__.py b/src/python-common/ceph/pybind/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/python-common/ceph/pybind/mgr/__init__.py b/src/python-common/ceph/pybind/mgr/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/python-common/ceph/pybind/mgr/cryptotools.py b/src/python-common/ceph/pybind/mgr/cryptotools.py new file mode 100644 index 00000000000..c14f9b2a453 --- /dev/null +++ b/src/python-common/ceph/pybind/mgr/cryptotools.py @@ -0,0 +1,197 @@ +""" +This file has been isolated into a module so that it can be run +in a subprocess therefore sidestepping the +`PyO3 modules may only be initialized once per interpreter process` problem. +""" + +import argparse +import bcrypt +import datetime +import json +import sys +import warnings + +from argparse import Namespace +from OpenSSL import crypto, SSL +from uuid import uuid4 +from typing import Tuple, Optional + + +# subcommand functions +def password_hash(args: Namespace) -> None: + data = json.loads(sys.stdin.read()) + + password = data['password'] + salt_password = data['salt_password'] + + if not salt_password: + salt = bcrypt.gensalt() + else: + salt = salt_password.encode('utf8') + + print(bcrypt.hashpw(password.encode('utf8'), salt).decode()) + + +def create_self_signed_cert(args: Namespace) -> None: + + # Generate private key + if args.private_key: + # create a key pair + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, 2048) + print(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey).decode()) + return + + data = json.loads(sys.stdin.read()) + + dname = data['dname'] + pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, data['private_key']) + + # Create a "subject" object + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + req = crypto.X509Req() + subj = req.get_subject() + + # populate the subject with the dname settings + for k, v in dname.items(): + setattr(subj, k, v) + + # create a self-signed cert + cert = crypto.X509() + cert.set_subject(req.get_subject()) + cert.set_serial_number(int(uuid4())) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(pkey) + cert.sign(pkey, 'sha512') + + print(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode()) + + +def _get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]: + """Basic validation of a CA cert + """ + + crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt + (org_name, cn) = (None, None) + cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) + components = cert.get_issuer().get_components() + for c in components: + if c[0].decode() == 'O': # org comp + org_name = c[1].decode() + elif c[0].decode() == 'CN': # common name comp + cn = c[1].decode() + + return (org_name, cn) + + +def verify_cacrt_content(args: Namespace) -> None: + crt = sys.stdin.read() + + crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt + x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) + no_after = x509.get_notAfter() + if not no_after: + print("Certificate does not have an expiration date.", file=sys.stderr) + sys.exit(1) + + end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ') + + if x509.has_expired(): + org, cn = _get_cert_issuer_info(crt) + msg = 'Certificate issued by "%s/%s" expired on %s' % (org, cn, end_date) + print(msg, file=sys.stderr) + sys.exit(1) + + # Certificate still valid, calculate and return days until expiration + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + print((end_date - datetime.datetime.utcnow()).days) + + +def get_cert_issuer_info(args: Namespace) -> None: + crt = sys.stdin.read() + + crt_buffer = crt.encode("utf-8") if isinstance(crt, str) else crt + (org_name, cn) = (None, None) + cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) + components = cert.get_issuer().get_components() + for c in components: + if c[0].decode() == 'O': # org comp + org_name = c[1].decode() + elif c[0].decode() == 'CN': # common name comp + cn = c[1].decode() + + if args.org_name: + print(org_name) + + if args.cn: + print(cn) + + +def verify_tls(args: Namespace) -> None: + + data = json.loads(sys.stdin.read()) + + crt = data['crt'] + key = data['key'] + + try: + _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key) + _key.check() + except (ValueError, crypto.Error) as e: + print('Invalid private key: %s' % str(e)) + try: + crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt + _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer) + except ValueError as e: + print('Invalid certificate key: %s' % str(e)) + + try: + context = SSL.Context(SSL.TLSv1_METHOD) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + context.use_certificate(_crt) + context.use_privatekey(_key) + + context.check_privatekey() + except crypto.Error as e: + print('Private key and certificate do not match up: %s' % str(e)) + except SSL.Error as e: + print(f'Invalid cert/key pair: {e}') + + +if __name__ == "__main__": + # create the top-level parser + parser = argparse.ArgumentParser(prog='cryptotools.py') + subparsers = parser.add_subparsers(required=True) + + # create the parser for the "password_hash" command + parser_foo = subparsers.add_parser('password_hash') + parser_foo.set_defaults(func=password_hash) + + # create the parser for the "create_self_signed_cert" command + parser_bar = subparsers.add_parser('create_self_signed_cert') + parser_bar.add_argument('--private_key', required=False, action='store_true') + parser_bar.add_argument('--certificate', required=False, action='store_true') + parser_bar.set_defaults(func=create_self_signed_cert) + + # create the parser for the "verify_cacrt_content" command + parser_bar = subparsers.add_parser('verify_cacrt_content') + parser_bar.set_defaults(func=verify_cacrt_content) + + # create the parser for the "get_cert_issuer_info" command + parser_bar = subparsers.add_parser('get_cert_issuer_info') + parser_bar.add_argument('--org_name', required=False, action='store_true') + parser_bar.add_argument('--cn', required=False, action='store_true') + parser_bar.set_defaults(func=get_cert_issuer_info) + + # create the parser for the "verify_tls" command + parser_bar = subparsers.add_parser('verify_tls') + parser_bar.set_defaults(func=verify_tls) + + # parse the args and call whatever function was selected + args = parser.parse_args() + args.func(args) -- 2.39.5