]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Revert "python-common/cryptotools: add funcs for call_home_agent crypto activities"
authorYaarit Hatuka <yhatuka@ibm.com>
Fri, 31 Oct 2025 21:02:26 +0000 (17:02 -0400)
committerYaarit Hatuka <yhatuka@redhat.com>
Mon, 17 Nov 2025 05:36:44 +0000 (05:36 +0000)
This reverts commit 21230c1d73dd6c684a382f3b19bc043a17ddcc2e.

Resolves: rhbz#2408379

Signed-off-by: Yaarit Hatuka <yhatuka@ibm.com>
src/pybind/mgr/call_home_agent/module.py
src/python-common/ceph/cryptotools/caller.py
src/python-common/ceph/cryptotools/cryptotools.py
src/python-common/ceph/cryptotools/internal.py
src/python-common/ceph/cryptotools/remote.py

index a31f1184d9e046d7497caa38c6af9f777787a474..55e6a90c3147a7f1817cc1df248d89c1ffc87491 100644 (file)
@@ -14,10 +14,10 @@ import datetime
 import json
 import os
 import sys
-from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-import base64
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+import base64
 import re
-import jwt
+import jwt
 import traceback
 import requests
 import sched
@@ -27,8 +27,6 @@ import threading
 import importlib.util
 import pathlib
 
-from ceph.cryptotools.select import get_crypto_caller
-
 class URUploadSnap:
     def __init__(self, agent, req: dict):
         self._req = req
@@ -357,10 +355,8 @@ class CallHomeAgent(MgrModule):
             user_jwt_password = r"{}".format(reg_credentials['password'])
             registry_url = reg_credentials['url']
             if re.match(self.valid_container_registry, registry_url):
-                cc = get_crypto_caller()
-                jwt_jti = cc.call_home_decrypt_jwt_password(user_jwt_password)
-                # jwt_jti = jwt.decode(user_jwt_password, options={
-                #                     "verify_signature": False})["jti"]
+                jwt_jti = jwt.decode(user_jwt_password, options={
+                                    "verify_signature": False})["jti"]
                 self.log.info("JWT jti field extracted succesfully")
             else:
                 jti_token_fail = f"url for registry credentials stored in <mgr/cephadm/registry_url> does not match with the expected ones <{self.valid_container_registry}>"
@@ -631,12 +627,10 @@ class CallHomeAgent(MgrModule):
 
         try:
             encrypted_keys = self._load_encrypted_keys()
-            cc = get_crypto_caller()
-            clear_keys = cc.decrypt_call_home_encrypted_keys(decryption_key, decryption_nonce, encrypted_keys)
-            # aes_key = base64.b64decode(decryption_key)
-            # nonce = base64.b64decode(decyption_nonce)
-            # aesgcm = AESGCM(aes_key)
-            # clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+            aes_key = base64.b64decode(decryption_key)
+            nonce = base64.b64decode(decyption_nonce)
+            aesgcm = AESGCM(aes_key)
+            clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
             keys = json.loads(clear_keys)
             return keys
         except Exception as e:
index 52c5ce606bb7b1de301be754ef10f74f83201bf6..42147e5573b8e95a5a753ce8f0df52f301f2c8fa 100644 (file)
@@ -46,16 +46,3 @@ class CryptoCaller(abc.ABC):
     @abc.abstractmethod
     def verify_password(self, password: str, hashed_password: str) -> bool:
         """Return true if a password and hash match."""
-
-    @abc.abstractmethod
-    def decrypt_call_home_encrypted_keys(
-        self,
-        decryption_key: str,
-        decryption_nonce: str,
-        encrypted_keys: bytes
-    ) -> str:
-        """Return call home key decrypted but still as a json string"""
-
-    @abc.abstractmethod
-    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
-        """Decrypt encrypted call home jwt user password"""
index ff92b5cb72dc3a9c36e94e4cd74dd6216588fc8f..4aae0d8c9336bd32d1c9006563ee8cab5f0e8404 100644 (file)
@@ -93,24 +93,6 @@ def verify_tls(args: Namespace) -> None:
     _respond({'ok': True})  # need to emit something on success
 
 
-def decrypt_call_home_encrypted_keys(args: Namespace) -> None:
-    data = _load()
-    decryption_key = data['decryption_key']
-    decryption_nonce = data['decryption_nonce']
-    encrypted_keys = data['encrypted_keys']
-    decrypted_json_encoded_keys = args.crypto.decrypt_call_home_encrypted_keys(
-        decryption_key, decryption_nonce, encrypted_keys
-    )
-    _respond({'decrypted_json_encoded_keys': decrypted_json_encoded_keys})
-
-
-def call_home_decrypt_jwt_password(args: Namespace) -> None:
-    data = _load()
-    user_jwt_password = data['user_jwt_password']
-    decrypted_user_jwt_password = args.crypto.call_home_decrypt_jwt_password(user_jwt_password)
-    _respond({'decrypted_jwt_user_password': decrypted_user_jwt_password})
-
-
 def main() -> None:
     # create the top-level parser
     parser = argparse.ArgumentParser(prog='cryptotools.py')
@@ -144,14 +126,6 @@ def main() -> None:
     parser_verify_password = subparsers.add_parser('verify_password')
     parser_verify_password.set_defaults(func=verify_password)
 
-    # call home specific for decoding secrets
-    parser_call_home_decrypt_secrets = subparsers.add_parser('decrypt_call_home_encrypted_keys')
-    parser_call_home_decrypt_secrets.set_defaults(func=decrypt_call_home_encrypted_keys)
-
-    # call home specific for decoding jwt user password
-    parser_call_home_decrypt_jwt_password = subparsers.add_parser('call_home_decrypt_jwt_password')
-    parser_call_home_decrypt_jwt_password.set_defaults(func=call_home_decrypt_jwt_password)
-
     # parse the args and call whatever function was selected
     args = parser.parse_args()
     args.func(args)
index 71469cb193e38a78f0ae54484bb517de8ccae011..7d6e0a487ecc940bb050f1622168438b2c3ea7ff 100644 (file)
@@ -10,10 +10,6 @@ import warnings
 from OpenSSL import crypto, SSL
 import bcrypt
 
-# for call_home_agent
-import base64
-import jwt  # type: ignore
-from cryptography.hazmat.primitives.ciphers.aead import AESGCM  # type: ignore
 
 from .caller import CryptoCaller, CryptoCallError
 
@@ -136,20 +132,3 @@ class InternalCryptoCaller(CryptoCaller):
             )
         except SSL.Error as e:
             self.fail(f'Invalid cert/key pair: {e}')
-
-    def decrypt_call_home_encrypted_keys(
-        self,
-        decryption_key: str,
-        decryption_nonce: str,
-        encrypted_keys: bytes
-    ) -> str:
-        aes_key = base64.b64decode(decryption_key)
-        nonce = base64.b64decode(decryption_nonce)
-        aesgcm = AESGCM(aes_key)
-        clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
-        return clear_keys
-
-    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
-        jwt_jti = jwt.decode(user_jwt_password, options={
-                            "verify_signature": False})["jti"]
-        return jwt_jti
index 2ac15f294219f535a30294791ef5adb6a24c8356..2574b4ecdac215624e43312a799564dfed81d4fe 100644 (file)
@@ -181,36 +181,3 @@ class ProcessCryptoCaller(CryptoCaller):
         result_obj = self._result_json(result)
         ok = result_obj.get("ok", False)
         return ok
-
-    def decrypt_call_home_encrypted_keys(
-        self,
-        decryption_key: str,
-        decryption_nonce: str,
-        encrypted_keys: bytes
-    ) -> str:
-        key_data = {
-            "decryption_key": decryption_key,
-            "decryption_nonce": decryption_nonce,
-            "encrypted_keys": encrypted_keys
-        }
-        result = self._run(
-            ["decrypt_call_home_encrypted_keys"],
-            input_data=json.dumps(key_data),
-            capture_output=True,
-            check=True,
-        )
-        result_obj = self._result_json(result)
-        decrypted_json_encoded_keys = result_obj.get("decrypted_json_encoded_keys", '')
-        return decrypted_json_encoded_keys
-
-    def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
-        pwd_data = {"user_jwt_password": user_jwt_password}
-        result = self._run(
-            ["call_home_decrypt_jwt_password"],
-            input_data=json.dumps(pwd_data),
-            capture_output=True,
-            check=True,
-        )
-        result_obj = self._result_json(result)
-        decrypted_jwt_user_password = result_obj.get("decrypted_jwt_user_password", '')
-        return decrypted_jwt_user_password