From: Justin Caratzas Date: Mon, 6 Oct 2025 23:25:44 +0000 (-0400) Subject: python-common/cryptotools: unify and organize all endpoint functions X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=93752ef2d228b024dca436b052b7c71bfe48692e;p=ceph-ci.git python-common/cryptotools: unify and organize all endpoint functions Lightly reorganize and make the "endpoint" functions in cryptotools.py more consistent and uniform. Use small functions for input and output handling so that the handling is done the same way throughout. Pass a pre-constructed crypto caller via the args to then endpoint functions. Make generating the private key it's own named function rather than one single (and only) function with overloaded behavior controlled by a cli switch. Signed-off-by: John Mulligan (cherry picked from commit 552d7b4373afa1a93fe47ce234560b9c8485321d) Resolves: rhbz#2401206 --- diff --git a/src/python-common/ceph/cryptotools/cryptotools.py b/src/python-common/ceph/cryptotools/cryptotools.py index 979e664c1d3..1466d4b606d 100644 --- a/src/python-common/ceph/cryptotools/cryptotools.py +++ b/src/python-common/ceph/cryptotools/cryptotools.py @@ -138,80 +138,88 @@ class InternalCryptoCaller: self.fail(f'Invalid cert/key pair: {e}') -# subcommand functions -def password_hash(args: Namespace) -> None: - data = json.loads(sys.stdin.read()) +def _read() -> str: + return sys.stdin.read() + + +def _load() -> Dict[str, Any]: + return json.loads(_read()) + + +def _respond(data: Dict[str, Any]) -> None: + json.dump(data, sys.stdout) + + +def _write(content: str) -> None: + sys.stdout.write(content) + sys.stdout.flush() + + +def _fail(msg: str, code: int = 0) -> Any: + json.dump({'error': msg}, sys.stdout) + sys.exit(code) + +def password_hash(args: Namespace) -> None: + data = _load() password = data['password'] salt_password = data['salt_password'] - - hash_str = InternalCryptoCaller().password_hash(password, salt_password) - json.dump({'hash': hash_str}, sys.stdout) + hash_str = args.crypto.password_hash(password, salt_password) + _respond({'hash': hash_str}) def verify_password(args: Namespace) -> None: - icc = InternalCryptoCaller() - data = json.loads(sys.stdin.read()) + data = _load() password = data.get('password', '') hashed_password = data.get('hashed_password', '') try: - icc.verify_password(password, hashed_password) + ok = args.crypto.verify_password(password, hashed_password) except ValueError as err: - _fail_message(str(err)) - json.dump({'ok': ok}, sys.stdout) + _fail(str(err)) + _respond({'ok': ok}) + + +def create_private_key(args: Namespace) -> None: + _write(args.crypto.create_private_key()) def create_self_signed_cert(args: Namespace) -> None: - icc = InternalCryptoCaller() - # Generate private key - if args.private_key: - # create a key pair - print(icc.create_private_key()) - return - - data = json.loads(sys.stdin.read()) + data = _load() dname = data['dname'] - print(icc.create_self_signed_cert(dname, data['private_key'])) + private_key = data['private_key'] + _write(args.crypto.create_self_signed_cert(dname, private_key)) def certificate_days_to_expire(args: Namespace) -> None: - icc = InternalCryptoCaller() - crt = sys.stdin.read() + crt = _read() try: - days_until_exp = icc.certificate_days_to_expire(crt) + days_until_exp = args.crypto.certificate_days_to_expire(crt) except InternalError as err: - print(str(err), file=sys.stderr) - sys.exit(1) - json.dump({'days_until_expiration': days_until_exp}, sys.stdout) + _fail(str(err)) + _respond({'days_until_expiration': days_until_exp}) def get_cert_issuer_info(args: Namespace) -> None: - crt = sys.stdin.read() - org_name, cn = InternalCryptoCaller().get_cert_issuer_info(crt) - json.dump({'org_name': org_name, 'cn': cn}, sys.stdout) - - -def _fail_message(msg: str) -> None: - json.dump({'error': msg}, sys.stdout) - sys.exit(0) + crt = _read() + org_name, cn = args.crypto.get_cert_issuer_info(crt) + _respond({'org_name': org_name, 'cn': cn}) def verify_tls(args: Namespace) -> None: - data = json.loads(sys.stdin.read()) - + data = _load() crt = data['crt'] key = data['key'] - try: - InternalCryptoCaller().verify_tls(crt, key) + args.crypto.verify_tls(crt, key) except ValueError as err: - json.dump({'error': str(err)}, sys.stdout) - json.dump({'ok': True}, sys.stdout) # need to emit something on success + _fail(str(err)) + _respond({'ok': True}) # need to emit something on success -def main(): +def main() -> None: # create the top-level parser parser = argparse.ArgumentParser(prog='cryptotools.py') + parser.set_defaults(crypto=InternalCryptoCaller()) subparsers = parser.add_subparsers(required=True) # create the parser for the "password_hash" command @@ -220,14 +228,11 @@ def main(): # create the parser for the "create_self_signed_cert" command parser_cssc = subparsers.add_parser('create_self_signed_cert') - parser_cssc.add_argument( - '--private_key', required=False, action='store_true' - ) - parser_cssc.add_argument( - '--certificate', required=False, action='store_true' - ) parser_cssc.set_defaults(func=create_self_signed_cert) + parser_cpk = subparsers.add_parser('create_private_key') + parser_cpk.set_defaults(func=create_private_key) + # create the parser for the "certificate_days_to_expire" command parser_dte = subparsers.add_parser('certificate_days_to_expire') parser_dte.set_defaults(func=certificate_days_to_expire) diff --git a/src/python-common/ceph/cryptotools/remote.py b/src/python-common/ceph/cryptotools/remote.py index 40e01d19912..76438b3d132 100644 --- a/src/python-common/ceph/cryptotools/remote.py +++ b/src/python-common/ceph/cryptotools/remote.py @@ -97,7 +97,7 @@ class CryptoCaller: def create_private_key(self) -> str: """Create a new TLS private key, returning it as a string.""" result = self._run( - ['create_self_signed_cert', '--private_key'], + ['create_private_key'], capture_output=True, check=True, )