import json
import os
import sys
-# from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-# import base64
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+import base64
import re
-# import jwt
+import jwt
import traceback
import requests
import sched
import importlib.util
import pathlib
-from ceph.cryptotools.select import get_crypto_caller
-
class URUploadSnap:
def __init__(self, agent, req: dict):
self._req = req
user_jwt_password = r"{}".format(reg_credentials['password'])
registry_url = reg_credentials['url']
if re.match(self.valid_container_registry, registry_url):
- cc = get_crypto_caller()
- jwt_jti = cc.call_home_decrypt_jwt_password(user_jwt_password)
- # jwt_jti = jwt.decode(user_jwt_password, options={
- # "verify_signature": False})["jti"]
+ jwt_jti = jwt.decode(user_jwt_password, options={
+ "verify_signature": False})["jti"]
self.log.info("JWT jti field extracted succesfully")
else:
jti_token_fail = f"url for registry credentials stored in <mgr/cephadm/registry_url> does not match with the expected ones <{self.valid_container_registry}>"
try:
encrypted_keys = self._load_encrypted_keys()
- cc = get_crypto_caller()
- clear_keys = cc.decrypt_call_home_encrypted_keys(decryption_key, decryption_nonce, encrypted_keys)
- # aes_key = base64.b64decode(decryption_key)
- # nonce = base64.b64decode(decyption_nonce)
- # aesgcm = AESGCM(aes_key)
- # clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
+ aes_key = base64.b64decode(decryption_key)
+ nonce = base64.b64decode(decyption_nonce)
+ aesgcm = AESGCM(aes_key)
+ clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
keys = json.loads(clear_keys)
return keys
except Exception as e:
@abc.abstractmethod
def verify_password(self, password: str, hashed_password: str) -> bool:
"""Return true if a password and hash match."""
-
- @abc.abstractmethod
- def decrypt_call_home_encrypted_keys(
- self,
- decryption_key: str,
- decryption_nonce: str,
- encrypted_keys: bytes
- ) -> str:
- """Return call home key decrypted but still as a json string"""
-
- @abc.abstractmethod
- def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
- """Decrypt encrypted call home jwt user password"""
_respond({'ok': True}) # need to emit something on success
-def decrypt_call_home_encrypted_keys(args: Namespace) -> None:
- data = _load()
- decryption_key = data['decryption_key']
- decryption_nonce = data['decryption_nonce']
- encrypted_keys = data['encrypted_keys']
- decrypted_json_encoded_keys = args.crypto.decrypt_call_home_encrypted_keys(
- decryption_key, decryption_nonce, encrypted_keys
- )
- _respond({'decrypted_json_encoded_keys': decrypted_json_encoded_keys})
-
-
-def call_home_decrypt_jwt_password(args: Namespace) -> None:
- data = _load()
- user_jwt_password = data['user_jwt_password']
- decrypted_user_jwt_password = args.crypto.call_home_decrypt_jwt_password(user_jwt_password)
- _respond({'decrypted_jwt_user_password': decrypted_user_jwt_password})
-
-
def main() -> None:
# create the top-level parser
parser = argparse.ArgumentParser(prog='cryptotools.py')
parser_verify_password = subparsers.add_parser('verify_password')
parser_verify_password.set_defaults(func=verify_password)
- # call home specific for decoding secrets
- parser_call_home_decrypt_secrets = subparsers.add_parser('decrypt_call_home_encrypted_keys')
- parser_call_home_decrypt_secrets.set_defaults(func=decrypt_call_home_encrypted_keys)
-
- # call home specific for decoding jwt user password
- parser_call_home_decrypt_jwt_password = subparsers.add_parser('call_home_decrypt_jwt_password')
- parser_call_home_decrypt_jwt_password.set_defaults(func=call_home_decrypt_jwt_password)
-
# parse the args and call whatever function was selected
args = parser.parse_args()
args.func(args)
from OpenSSL import crypto, SSL
import bcrypt
-# for call_home_agent
-import base64
-import jwt # type: ignore
-from cryptography.hazmat.primitives.ciphers.aead import AESGCM # type: ignore
from .caller import CryptoCaller, CryptoCallError
)
except SSL.Error as e:
self.fail(f'Invalid cert/key pair: {e}')
-
- def decrypt_call_home_encrypted_keys(
- self,
- decryption_key: str,
- decryption_nonce: str,
- encrypted_keys: bytes
- ) -> str:
- aes_key = base64.b64decode(decryption_key)
- nonce = base64.b64decode(decryption_nonce)
- aesgcm = AESGCM(aes_key)
- clear_keys = aesgcm.decrypt(nonce, encrypted_keys, b'')
- return clear_keys
-
- def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
- jwt_jti = jwt.decode(user_jwt_password, options={
- "verify_signature": False})["jti"]
- return jwt_jti
result_obj = self._result_json(result)
ok = result_obj.get("ok", False)
return ok
-
- def decrypt_call_home_encrypted_keys(
- self,
- decryption_key: str,
- decryption_nonce: str,
- encrypted_keys: bytes
- ) -> str:
- key_data = {
- "decryption_key": decryption_key,
- "decryption_nonce": decryption_nonce,
- "encrypted_keys": encrypted_keys
- }
- result = self._run(
- ["decrypt_call_home_encrypted_keys"],
- input_data=json.dumps(key_data),
- capture_output=True,
- check=True,
- )
- result_obj = self._result_json(result)
- decrypted_json_encoded_keys = result_obj.get("decrypted_json_encoded_keys", '')
- return decrypted_json_encoded_keys
-
- def call_home_decrypt_jwt_password(self, user_jwt_password: str) -> str:
- pwd_data = {"user_jwt_password": user_jwt_password}
- result = self._run(
- ["call_home_decrypt_jwt_password"],
- input_data=json.dumps(pwd_data),
- capture_output=True,
- check=True,
- )
- result_obj = self._result_json(result)
- decrypted_jwt_user_password = result_obj.get("decrypted_jwt_user_password", '')
- return decrypted_jwt_user_password