From: Yaarit Hatuka Date: Fri, 31 Oct 2025 21:02:26 +0000 (-0400) Subject: Revert "python-common/cryptotools: add funcs for call_home_agent crypto activities" X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa60965af68637afe50117aede044372eabf945d;p=ceph-ci.git Revert "python-common/cryptotools: add funcs for call_home_agent crypto activities" This reverts commit 21230c1d73dd6c684a382f3b19bc043a17ddcc2e. Resolves: rhbz#2408379 Signed-off-by: Yaarit Hatuka --- diff --git a/src/pybind/mgr/call_home_agent/module.py b/src/pybind/mgr/call_home_agent/module.py index a31f1184d9e..55e6a90c314 100644 --- a/src/pybind/mgr/call_home_agent/module.py +++ b/src/pybind/mgr/call_home_agent/module.py @@ -14,10 +14,10 @@ import datetime import json import os import sys -# from cryptography.hazmat.primitives.ciphers.aead import AESGCM -# import base64 +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +import base64 import re -# import jwt +import jwt import traceback import requests import sched @@ -27,8 +27,6 @@ import threading import importlib.util import pathlib -from ceph.cryptotools.select import get_crypto_caller - class URUploadSnap: def __init__(self, agent, req: dict): self._req = req @@ -357,10 +355,8 @@ class CallHomeAgent(MgrModule): user_jwt_password = r"{}".format(reg_credentials['password']) registry_url = reg_credentials['url'] if re.match(self.valid_container_registry, registry_url): - cc = get_crypto_caller() - jwt_jti = cc.call_home_decrypt_jwt_password(user_jwt_password) - # jwt_jti = jwt.decode(user_jwt_password, options={ - # "verify_signature": False})["jti"] + jwt_jti = jwt.decode(user_jwt_password, options={ + "verify_signature": False})["jti"] self.log.info("JWT jti field extracted succesfully") else: jti_token_fail = f"url for registry credentials stored in does not match with the expected ones <{self.valid_container_registry}>" @@ -631,12 +627,10 @@ class CallHomeAgent(MgrModule): try: encrypted_keys = self._load_encrypted_keys() - cc = get_crypto_caller() - clear_keys = cc.decrypt_call_home_encrypted_keys(decryption_key, decryption_nonce, encrypted_keys) - # aes_key = base64.b64decode(decryption_key) - # nonce = base64.b64decode(decyption_nonce) - # aesgcm = AESGCM(aes_key) - # clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'') + aes_key = base64.b64decode(decryption_key) + nonce = base64.b64decode(decyption_nonce) + aesgcm = AESGCM(aes_key) + clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'') keys = json.loads(clear_keys) return keys except Exception as e: diff --git a/src/python-common/ceph/cryptotools/caller.py b/src/python-common/ceph/cryptotools/caller.py index 52c5ce606bb..42147e5573b 100644 --- a/src/python-common/ceph/cryptotools/caller.py +++ b/src/python-common/ceph/cryptotools/caller.py @@ -46,16 +46,3 @@ class CryptoCaller(abc.ABC): @abc.abstractmethod def verify_password(self, password: str, hashed_password: str) -> bool: """Return true if a password and hash match.""" - - @abc.abstractmethod - def decrypt_call_home_encrypted_keys( - self, - decryption_key: str, - decryption_nonce: str, - encrypted_keys: bytes - ) -> str: - """Return call home key decrypted but still as a json string""" - - @abc.abstractmethod - def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str: - """Decrypt encrypted call home jwt user password""" diff --git a/src/python-common/ceph/cryptotools/cryptotools.py b/src/python-common/ceph/cryptotools/cryptotools.py index ff92b5cb72d..4aae0d8c933 100644 --- a/src/python-common/ceph/cryptotools/cryptotools.py +++ b/src/python-common/ceph/cryptotools/cryptotools.py @@ -93,24 +93,6 @@ def verify_tls(args: Namespace) -> None: _respond({'ok': True}) # need to emit something on success -def decrypt_call_home_encrypted_keys(args: Namespace) -> None: - data = _load() - decryption_key = data['decryption_key'] - decryption_nonce = data['decryption_nonce'] - encrypted_keys = data['encrypted_keys'] - decrypted_json_encoded_keys = args.crypto.decrypt_call_home_encrypted_keys( - decryption_key, decryption_nonce, encrypted_keys - ) - _respond({'decrypted_json_encoded_keys': decrypted_json_encoded_keys}) - - -def call_home_decrypt_jwt_password(args: Namespace) -> None: - data = _load() - user_jwt_password = data['user_jwt_password'] - decrypted_user_jwt_password = args.crypto.call_home_decrypt_jwt_password(user_jwt_password) - _respond({'decrypted_jwt_user_password': decrypted_user_jwt_password}) - - def main() -> None: # create the top-level parser parser = argparse.ArgumentParser(prog='cryptotools.py') @@ -144,14 +126,6 @@ def main() -> None: parser_verify_password = subparsers.add_parser('verify_password') parser_verify_password.set_defaults(func=verify_password) - # call home specific for decoding secrets - parser_call_home_decrypt_secrets = subparsers.add_parser('decrypt_call_home_encrypted_keys') - parser_call_home_decrypt_secrets.set_defaults(func=decrypt_call_home_encrypted_keys) - - # call home specific for decoding jwt user password - parser_call_home_decrypt_jwt_password = subparsers.add_parser('call_home_decrypt_jwt_password') - parser_call_home_decrypt_jwt_password.set_defaults(func=call_home_decrypt_jwt_password) - # parse the args and call whatever function was selected args = parser.parse_args() args.func(args) diff --git a/src/python-common/ceph/cryptotools/internal.py b/src/python-common/ceph/cryptotools/internal.py index 71469cb193e..7d6e0a487ec 100644 --- a/src/python-common/ceph/cryptotools/internal.py +++ b/src/python-common/ceph/cryptotools/internal.py @@ -10,10 +10,6 @@ import warnings from OpenSSL import crypto, SSL import bcrypt -# for call_home_agent -import base64 -import jwt # type: ignore -from cryptography.hazmat.primitives.ciphers.aead import AESGCM # type: ignore from .caller import CryptoCaller, CryptoCallError @@ -136,20 +132,3 @@ class InternalCryptoCaller(CryptoCaller): ) except SSL.Error as e: self.fail(f'Invalid cert/key pair: {e}') - - def decrypt_call_home_encrypted_keys( - self, - decryption_key: str, - decryption_nonce: str, - encrypted_keys: bytes - ) -> str: - aes_key = base64.b64decode(decryption_key) - nonce = base64.b64decode(decryption_nonce) - aesgcm = AESGCM(aes_key) - clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'') - return clear_keys - - def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str: - jwt_jti = jwt.decode(user_jwt_password, options={ - "verify_signature": False})["jti"] - return jwt_jti diff --git a/src/python-common/ceph/cryptotools/remote.py b/src/python-common/ceph/cryptotools/remote.py index 2ac15f29421..2574b4ecdac 100644 --- a/src/python-common/ceph/cryptotools/remote.py +++ b/src/python-common/ceph/cryptotools/remote.py @@ -181,36 +181,3 @@ class ProcessCryptoCaller(CryptoCaller): result_obj = self._result_json(result) ok = result_obj.get("ok", False) return ok - - def decrypt_call_home_encrypted_keys( - self, - decryption_key: str, - decryption_nonce: str, - encrypted_keys: bytes - ) -> str: - key_data = { - "decryption_key": decryption_key, - "decryption_nonce": decryption_nonce, - "encrypted_keys": encrypted_keys - } - result = self._run( - ["decrypt_call_home_encrypted_keys"], - input_data=json.dumps(key_data), - capture_output=True, - check=True, - ) - result_obj = self._result_json(result) - decrypted_json_encoded_keys = result_obj.get("decrypted_json_encoded_keys", '') - return decrypted_json_encoded_keys - - def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str: - pwd_data = {"user_jwt_password": user_jwt_password} - result = self._run( - ["call_home_decrypt_jwt_password"], - input_data=json.dumps(pwd_data), - capture_output=True, - check=True, - ) - result_obj = self._result_json(result) - decrypted_jwt_user_password = result_obj.get("decrypted_jwt_user_password", '') - return decrypted_jwt_user_password