]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
python-common/cryptotools: add funcs for call_home_agent crypto activities
authorAdam King <adking@redhat.com>
Fri, 10 Oct 2025 20:46:03 +0000 (16:46 -0400)
committerJustin Caratzas <jcaratza@redhat.com>
Wed, 15 Oct 2025 13:48:28 +0000 (09:48 -0400)
So that cephadm and the call_home_agent modules aren't both attempting
to import cryptography libraries that cause https://tracker.ceph.com/issues/64213

Resolves: rhbz#2402769

Signed-off-by: Adam King <adking@redhat.com>
src/pybind/mgr/call_home_agent/module.py
src/python-common/ceph/cryptotools/caller.py
src/python-common/ceph/cryptotools/cryptotools.py
src/python-common/ceph/cryptotools/internal.py
src/python-common/ceph/cryptotools/remote.py

index 21657f960de6f6168c7e604c51e9d4681a73ac6f..96c3f7be651d0281e681f1389b7467f541528ef3 100644 (file)
@@ -14,10 +14,10 @@ import datetime
 import json
 import os
 import sys
-from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-import base64
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+import base64
 import re
-import jwt
+import jwt
 import traceback
 import requests
 import sched
@@ -25,6 +25,8 @@ import time
 #from threading import Event
 import threading
 
+from ceph.cryptotools.select import get_crypto_caller
+
 class URUploadSnap:
     def __init__(self, agent, req: dict):
         self._req = req
@@ -346,8 +348,10 @@ class CallHomeAgent(MgrModule):
             user_jwt_password = r"{}".format(reg_credentials['password'])
             registry_url = reg_credentials['url']
             if re.match(self.valid_container_registry, registry_url):
-                jwt_jti = jwt.decode(user_jwt_password, options={
-                                    "verify_signature": False})["jti"]
+                cc = get_crypto_caller()
+                jwt_jti = cc.call_home_decrypt_jwt_password(user_jwt_password)
+                # jwt_jti = jwt.decode(user_jwt_password, options={
+                #                     "verify_signature": False})["jti"]
                 self.log.info("JWT jti field extracted succesfully")
             else:
                 jti_token_fail = f"url for registry credentials stored in <mgr/cephadm/registry_url> does not match with the expected ones <{self.valid_container_registry}>"
@@ -525,10 +529,12 @@ class CallHomeAgent(MgrModule):
 
         try:
             encrypted_keys = self._load_encrypted_keys()
-            aes_key = base64.b64decode(decryption_key)
-            nonce = base64.b64decode(decyption_nonce)
-            aesgcm = AESGCM(aes_key)
-            clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+            cc = get_crypto_caller()
+            clear_keys = cc.decrypt_call_home_encrypted_keys(decryption_key, decryption_nonce, encrypted_keys)
+            # aes_key = base64.b64decode(decryption_key)
+            # nonce = base64.b64decode(decyption_nonce)
+            # aesgcm = AESGCM(aes_key)
+            # clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
             keys = json.loads(clear_keys)
             return keys
         except Exception as e:
index 42147e5573b8e95a5a753ce8f0df52f301f2c8fa..52c5ce606bb7b1de301be754ef10f74f83201bf6 100644 (file)
@@ -46,3 +46,16 @@ class CryptoCaller(abc.ABC):
     @abc.abstractmethod
     def verify_password(self, password: str, hashed_password: str) -> bool:
         """Return true if a password and hash match."""
+
+    @abc.abstractmethod
+    def decrypt_call_home_encrypted_keys(
+        self,
+        decryption_key: str,
+        decryption_nonce: str,
+        encrypted_keys: bytes
+    ) -> str:
+        """Return call home key decrypted but still as a json string"""
+
+    @abc.abstractmethod
+    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+        """Decrypt encrypted call home jwt user password"""
index 4aae0d8c9336bd32d1c9006563ee8cab5f0e8404..ff92b5cb72dc3a9c36e94e4cd74dd6216588fc8f 100644 (file)
@@ -93,6 +93,24 @@ def verify_tls(args: Namespace) -> None:
     _respond({'ok': True})  # need to emit something on success
 
 
+def decrypt_call_home_encrypted_keys(args: Namespace) -> None:
+    data = _load()
+    decryption_key = data['decryption_key']
+    decryption_nonce = data['decryption_nonce']
+    encrypted_keys = data['encrypted_keys']
+    decrypted_json_encoded_keys = args.crypto.decrypt_call_home_encrypted_keys(
+        decryption_key, decryption_nonce, encrypted_keys
+    )
+    _respond({'decrypted_json_encoded_keys': decrypted_json_encoded_keys})
+
+
+def call_home_decrypt_jwt_password(args: Namespace) -> None:
+    data = _load()
+    user_jwt_password = data['user_jwt_password']
+    decrypted_user_jwt_password = args.crypto.call_home_decrypt_jwt_password(user_jwt_password)
+    _respond({'decrypted_jwt_user_password': decrypted_user_jwt_password})
+
+
 def main() -> None:
     # create the top-level parser
     parser = argparse.ArgumentParser(prog='cryptotools.py')
@@ -126,6 +144,14 @@ def main() -> None:
     parser_verify_password = subparsers.add_parser('verify_password')
     parser_verify_password.set_defaults(func=verify_password)
 
+    # call home specific for decoding secrets
+    parser_call_home_decrypt_secrets = subparsers.add_parser('decrypt_call_home_encrypted_keys')
+    parser_call_home_decrypt_secrets.set_defaults(func=decrypt_call_home_encrypted_keys)
+
+    # call home specific for decoding jwt user password
+    parser_call_home_decrypt_jwt_password = subparsers.add_parser('call_home_decrypt_jwt_password')
+    parser_call_home_decrypt_jwt_password.set_defaults(func=call_home_decrypt_jwt_password)
+
     # parse the args and call whatever function was selected
     args = parser.parse_args()
     args.func(args)
index 7d6e0a487ecc940bb050f1622168438b2c3ea7ff..71469cb193e38a78f0ae54484bb517de8ccae011 100644 (file)
@@ -10,6 +10,10 @@ import warnings
 from OpenSSL import crypto, SSL
 import bcrypt
 
+# for call_home_agent
+import base64
+import jwt  # type: ignore
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM  # type: ignore
 
 from .caller import CryptoCaller, CryptoCallError
 
@@ -132,3 +136,20 @@ class InternalCryptoCaller(CryptoCaller):
             )
         except SSL.Error as e:
             self.fail(f'Invalid cert/key pair: {e}')
+
+    def decrypt_call_home_encrypted_keys(
+        self,
+        decryption_key: str,
+        decryption_nonce: str,
+        encrypted_keys: bytes
+    ) -> str:
+        aes_key = base64.b64decode(decryption_key)
+        nonce = base64.b64decode(decryption_nonce)
+        aesgcm = AESGCM(aes_key)
+        clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+        return clear_keys
+
+    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+        jwt_jti = jwt.decode(user_jwt_password, options={
+                            "verify_signature": False})["jti"]
+        return jwt_jti
index 2574b4ecdac215624e43312a799564dfed81d4fe..2ac15f294219f535a30294791ef5adb6a24c8356 100644 (file)
@@ -181,3 +181,36 @@ class ProcessCryptoCaller(CryptoCaller):
         result_obj = self._result_json(result)
         ok = result_obj.get("ok", False)
         return ok
+
+    def decrypt_call_home_encrypted_keys(
+        self,
+        decryption_key: str,
+        decryption_nonce: str,
+        encrypted_keys: bytes
+    ) -> str:
+        key_data = {
+            "decryption_key": decryption_key,
+            "decryption_nonce": decryption_nonce,
+            "encrypted_keys": encrypted_keys
+        }
+        result = self._run(
+            ["decrypt_call_home_encrypted_keys"],
+            input_data=json.dumps(key_data),
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        decrypted_json_encoded_keys = result_obj.get("decrypted_json_encoded_keys", '')
+        return decrypted_json_encoded_keys
+
+    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+        pwd_data = {"user_jwt_password": user_jwt_password}
+        result = self._run(
+            ["call_home_decrypt_jwt_password"],
+            input_data=json.dumps(pwd_data),
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        decrypted_jwt_user_password = result_obj.get("decrypted_jwt_user_password", '')
+        return decrypted_jwt_user_password