From 2737c34e48d881bf6d3fe74362e145e86ac1dea2 Mon Sep 17 00:00:00 2001 From: Raja Sharma Date: Thu, 26 Jun 2025 11:28:52 +0530 Subject: [PATCH] rgw/sts: GetCallerIndentity test Signed-off-by: Raja Sharma --- s3tests_boto3/functional/test_sts.py | 95 ++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index b13f56dd..06de13e3 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -31,8 +31,11 @@ from email.header import decode_header from . import( configfile, setup_teardown, + get_iam_root_client, get_iam_client, get_sts_client, + get_iam_path_prefix, + make_iam_name, get_client, get_alt_user_id, get_config_endpoint, @@ -40,6 +43,7 @@ from . import( get_parameter_name, get_main_aws_access_key, get_main_aws_secret_key, + get_iam_root_user_id, get_thumbprint, get_aud, get_token, @@ -2069,3 +2073,94 @@ def test_assume_role_with_web_identity_role_resource_tag(): oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] ) + +def retry_on(code, tries, func, *args, **kwargs): + for i in range(tries): + try: + return func(*args, **kwargs) + except ClientError as e: + err = e.response['Error']['Code'] + if i + 1 < tries and err in code: + print(f'Got {err}, retrying in {i}s..') + time.sleep(i) + continue + raise + +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_get_caller_identity_root(): + root_user_id = get_iam_root_user_id() + user_arn = f'arn:aws:iam::{root_user_id}:root' + sts = get_iam_root_client(service_name='sts') + response = sts.get_caller_identity() + assert response['UserId'] == root_user_id + assert response['Account'] == root_user_id + assert response['Arn'] == user_arn + +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_get_caller_identity_non_root_user(): + iam_root = get_iam_root_client() + path = get_iam_path_prefix() + user_name = make_iam_name('MyUser') + user = iam_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + user_id = user['UserId'] + + key = iam_root.create_access_key(UserName=user_name)['AccessKey'] + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + user_identity = sts.get_caller_identity() + iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId']) + iam_root.delete_user(UserName=user_name) + + assert user_identity['UserId'] == user_id + assert user_identity['Account'] == account_id + assert user_identity['Arn'] == user_arn + +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_get_caller_identity_after_assume_role(): + iam_root = get_iam_root_client() + path = get_iam_path_prefix() + user_name = make_iam_name('MyUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + + user = iam_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}') + key = iam_root.create_access_key(UserName=user_name)['AccessKey'] + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + assume_role = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = assume_role['Credentials'] + + assumed_sts = get_sts_client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + response = assumed_sts.get_caller_identity() + iam_root.delete_role(RoleName=role_name) + iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId']) + iam_root.delete_user(UserName=user_name) + + assert response['UserId'] == f"{role['RoleId']}:{session_name}" + assert response['Account'] == account_id + assert response['Arn'] == assume_role['AssumedRoleUser']['Arn'] + -- 2.47.3