import json
import os
import sys
-from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-import base64
+# from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+# import base64
import re
-import jwt
+# import jwt
import traceback
import requests
import sched
#from threading import Event
import threading
+from ceph.cryptotools.select import get_crypto_caller
+
class URUploadSnap:
def __init__(self, agent, req: dict):
self._req = req
user_jwt_password = r"{}".format(reg_credentials['password'])
registry_url = reg_credentials['url']
if re.match(self.valid_container_registry, registry_url):
- jwt_jti = jwt.decode(user_jwt_password, options={
- "verify_signature": False})["jti"]
+ cc = get_crypto_caller()
+ jwt_jti = cc.call_home_decrypt_jwt_password(user_jwt_password)
+ # jwt_jti = jwt.decode(user_jwt_password, options={
+ # "verify_signature": False})["jti"]
self.log.info("JWT jti field extracted succesfully")
else:
jti_token_fail = f"url for registry credentials stored in <mgr/cephadm/registry_url> does not match with the expected ones <{self.valid_container_registry}>"
try:
encrypted_keys = self._load_encrypted_keys()
- aes_key = base64.b64decode(decryption_key)
- nonce = base64.b64decode(decyption_nonce)
- aesgcm = AESGCM(aes_key)
- clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+ cc = get_crypto_caller()
+ clear_keys = cc.decrypt_call_home_encrypted_keys(decryption_key, decryption_nonce, encrypted_keys)
+ # aes_key = base64.b64decode(decryption_key)
+ # nonce = base64.b64decode(decyption_nonce)
+ # aesgcm = AESGCM(aes_key)
+ # clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
keys = json.loads(clear_keys)
return keys
except Exception as e:
@abc.abstractmethod
def verify_password(self, password: str, hashed_password: str) -> bool:
"""Return true if a password and hash match."""
+
+ @abc.abstractmethod
+ def decrypt_call_home_encrypted_keys(
+ self,
+ decryption_key: str,
+ decryption_nonce: str,
+ encrypted_keys: bytes
+ ) -> str:
+ """Return call home key decrypted but still as a json string"""
+
+ @abc.abstractmethod
+ def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+ """Decrypt encrypted call home jwt user password"""
_respond({'ok': True}) # need to emit something on success
+def decrypt_call_home_encrypted_keys(args: Namespace) -> None:
+ data = _load()
+ decryption_key = data['decryption_key']
+ decryption_nonce = data['decryption_nonce']
+ encrypted_keys = data['encrypted_keys']
+ decrypted_json_encoded_keys = args.crypto.decrypt_call_home_encrypted_keys(
+ decryption_key, decryption_nonce, encrypted_keys
+ )
+ _respond({'decrypted_json_encoded_keys': decrypted_json_encoded_keys})
+
+
+def call_home_decrypt_jwt_password(args: Namespace) -> None:
+ data = _load()
+ user_jwt_password = data['user_jwt_password']
+ decrypted_user_jwt_password = args.crypto.call_home_decrypt_jwt_password(user_jwt_password)
+ _respond({'decrypted_jwt_user_password': decrypted_user_jwt_password})
+
+
def main() -> None:
# create the top-level parser
parser = argparse.ArgumentParser(prog='cryptotools.py')
parser_verify_password = subparsers.add_parser('verify_password')
parser_verify_password.set_defaults(func=verify_password)
+ # call home specific for decoding secrets
+ parser_call_home_decrypt_secrets = subparsers.add_parser('decrypt_call_home_encrypted_keys')
+ parser_call_home_decrypt_secrets.set_defaults(func=decrypt_call_home_encrypted_keys)
+
+ # call home specific for decoding jwt user password
+ parser_call_home_decrypt_jwt_password = subparsers.add_parser('call_home_decrypt_jwt_password')
+ parser_call_home_decrypt_jwt_password.set_defaults(func=call_home_decrypt_jwt_password)
+
# parse the args and call whatever function was selected
args = parser.parse_args()
args.func(args)
from OpenSSL import crypto, SSL
import bcrypt
+# for call_home_agent
+import base64
+import jwt # type: ignore
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM # type: ignore
from .caller import CryptoCaller, CryptoCallError
)
except SSL.Error as e:
self.fail(f'Invalid cert/key pair: {e}')
+
+ def decrypt_call_home_encrypted_keys(
+ self,
+ decryption_key: str,
+ decryption_nonce: str,
+ encrypted_keys: bytes
+ ) -> str:
+ aes_key = base64.b64decode(decryption_key)
+ nonce = base64.b64decode(decryption_nonce)
+ aesgcm = AESGCM(aes_key)
+ clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+ return clear_keys
+
+ def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+ jwt_jti = jwt.decode(user_jwt_password, options={
+ "verify_signature": False})["jti"]
+ return jwt_jti
result_obj = self._result_json(result)
ok = result_obj.get("ok", False)
return ok
+
+ def decrypt_call_home_encrypted_keys(
+ self,
+ decryption_key: str,
+ decryption_nonce: str,
+ encrypted_keys: bytes
+ ) -> str:
+ key_data = {
+ "decryption_key": decryption_key,
+ "decryption_nonce": decryption_nonce,
+ "encrypted_keys": encrypted_keys
+ }
+ result = self._run(
+ ["decrypt_call_home_encrypted_keys"],
+ input_data=json.dumps(key_data),
+ capture_output=True,
+ check=True,
+ )
+ result_obj = self._result_json(result)
+ decrypted_json_encoded_keys = result_obj.get("decrypted_json_encoded_keys", '')
+ return decrypted_json_encoded_keys
+
+ def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
+ pwd_data = {"user_jwt_password": user_jwt_password}
+ result = self._run(
+ ["call_home_decrypt_jwt_password"],
+ input_data=json.dumps(pwd_data),
+ capture_output=True,
+ check=True,
+ )
+ result_obj = self._result_json(result)
+ decrypted_jwt_user_password = result_obj.get("decrypted_jwt_user_password", '')
+ return decrypted_jwt_user_password