]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
python-common/cryptotools: unify and organize all endpoint functions
authorJohn Mulligan <jmulligan@redhat.com>
Thu, 24 Apr 2025 18:36:58 +0000 (14:36 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Mon, 7 Jul 2025 13:32:24 +0000 (09:32 -0400)
Lightly reorganize and make the "endpoint" functions in cryptotools.py more
consistent and uniform. Use small functions for input and output
handling so that the handling is done the same way throughout. Pass a
pre-constructed crypto caller via the args to then endpoint functions.
Make generating the private key it's own named function rather than
one single (and only) function with overloaded behavior controlled by
a cli switch.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/python-common/ceph/cryptotools/cryptotools.py
src/python-common/ceph/cryptotools/remote.py

index 979e664c1d3fe3db0406fe7425668921ae9fbbda..1466d4b606db0f54c239266781d9d590f79b69e3 100644 (file)
@@ -138,80 +138,88 @@ class InternalCryptoCaller:
             self.fail(f'Invalid cert/key pair: {e}')
 
 
-# subcommand functions
-def password_hash(args: Namespace) -> None:
-    data = json.loads(sys.stdin.read())
+def _read() -> str:
+    return sys.stdin.read()
+
+
+def _load() -> Dict[str, Any]:
+    return json.loads(_read())
+
+
+def _respond(data: Dict[str, Any]) -> None:
+    json.dump(data, sys.stdout)
+
+
+def _write(content: str) -> None:
+    sys.stdout.write(content)
+    sys.stdout.flush()
+
+
+def _fail(msg: str, code: int = 0) -> Any:
+    json.dump({'error': msg}, sys.stdout)
+    sys.exit(code)
+
 
+def password_hash(args: Namespace) -> None:
+    data = _load()
     password = data['password']
     salt_password = data['salt_password']
-
-    hash_str = InternalCryptoCaller().password_hash(password, salt_password)
-    json.dump({'hash': hash_str}, sys.stdout)
+    hash_str = args.crypto.password_hash(password, salt_password)
+    _respond({'hash': hash_str})
 
 
 def verify_password(args: Namespace) -> None:
-    icc = InternalCryptoCaller()
-    data = json.loads(sys.stdin.read())
+    data = _load()
     password = data.get('password', '')
     hashed_password = data.get('hashed_password', '')
     try:
-        icc.verify_password(password, hashed_password)
+        ok = args.crypto.verify_password(password, hashed_password)
     except ValueError as err:
-        _fail_message(str(err))
-    json.dump({'ok': ok}, sys.stdout)
+        _fail(str(err))
+    _respond({'ok': ok})
+
+
+def create_private_key(args: Namespace) -> None:
+    _write(args.crypto.create_private_key())
 
 
 def create_self_signed_cert(args: Namespace) -> None:
-    icc = InternalCryptoCaller()
-    # Generate private key
-    if args.private_key:
-        # create a key pair
-        print(icc.create_private_key())
-        return
-
-    data = json.loads(sys.stdin.read())
+    data = _load()
     dname = data['dname']
-    print(icc.create_self_signed_cert(dname, data['private_key']))
+    private_key = data['private_key']
+    _write(args.crypto.create_self_signed_cert(dname, private_key))
 
 
 def certificate_days_to_expire(args: Namespace) -> None:
-    icc = InternalCryptoCaller()
-    crt = sys.stdin.read()
+    crt = _read()
     try:
-        days_until_exp = icc.certificate_days_to_expire(crt)
+        days_until_exp = args.crypto.certificate_days_to_expire(crt)
     except InternalError as err:
-        print(str(err), file=sys.stderr)
-        sys.exit(1)
-    json.dump({'days_until_expiration': days_until_exp}, sys.stdout)
+        _fail(str(err))
+    _respond({'days_until_expiration': days_until_exp})
 
 
 def get_cert_issuer_info(args: Namespace) -> None:
-    crt = sys.stdin.read()
-    org_name, cn = InternalCryptoCaller().get_cert_issuer_info(crt)
-    json.dump({'org_name': org_name, 'cn': cn}, sys.stdout)
-
-
-def _fail_message(msg: str) -> None:
-    json.dump({'error': msg}, sys.stdout)
-    sys.exit(0)
+    crt = _read()
+    org_name, cn = args.crypto.get_cert_issuer_info(crt)
+    _respond({'org_name': org_name, 'cn': cn})
 
 
 def verify_tls(args: Namespace) -> None:
-    data = json.loads(sys.stdin.read())
-
+    data = _load()
     crt = data['crt']
     key = data['key']
-
     try:
-        InternalCryptoCaller().verify_tls(crt, key)
+        args.crypto.verify_tls(crt, key)
     except ValueError as err:
-        json.dump({'error': str(err)}, sys.stdout)
-    json.dump({'ok': True}, sys.stdout)  # need to emit something on success
+        _fail(str(err))
+    _respond({'ok': True})  # need to emit something on success
 
 
-def main():
+def main() -> None:
     # create the top-level parser
     parser = argparse.ArgumentParser(prog='cryptotools.py')
+    parser.set_defaults(crypto=InternalCryptoCaller())
     subparsers = parser.add_subparsers(required=True)
 
     # create the parser for the "password_hash" command
@@ -220,14 +228,11 @@ def main():
 
     # create the parser for the "create_self_signed_cert" command
     parser_cssc = subparsers.add_parser('create_self_signed_cert')
-    parser_cssc.add_argument(
-        '--private_key', required=False, action='store_true'
-    )
-    parser_cssc.add_argument(
-        '--certificate', required=False, action='store_true'
-    )
     parser_cssc.set_defaults(func=create_self_signed_cert)
 
+    parser_cpk = subparsers.add_parser('create_private_key')
+    parser_cpk.set_defaults(func=create_private_key)
+
     # create the parser for the "certificate_days_to_expire" command
     parser_dte = subparsers.add_parser('certificate_days_to_expire')
     parser_dte.set_defaults(func=certificate_days_to_expire)
index 40e01d199128dbf05df82033bd87b0ff9c430ac6..76438b3d1321c9bd02d6b9f226bb59133d0b5c34 100644 (file)
@@ -97,7 +97,7 @@ class CryptoCaller:
     def create_private_key(self) -> str:
         """Create a new TLS private key, returning it as a string."""
         result = self._run(
-            ['create_self_signed_cert', '--private_key'],
+            ['create_private_key'],
             capture_output=True,
             check=True,
         )