from . import(
configfile,
setup_teardown,
+ get_iam_root_client,
get_iam_client,
get_sts_client,
+ get_iam_path_prefix,
+ make_iam_name,
get_client,
get_alt_user_id,
get_config_endpoint,
get_parameter_name,
get_main_aws_access_key,
get_main_aws_secret_key,
+ get_iam_root_user_id,
get_thumbprint,
get_aud,
get_token,
oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]
)
+
+def retry_on(code, tries, func, *args, **kwargs):
+ for i in range(tries):
+ try:
+ return func(*args, **kwargs)
+ except ClientError as e:
+ err = e.response['Error']['Code']
+ if i + 1 < tries and err in code:
+ print(f'Got {err}, retrying in {i}s..')
+ time.sleep(i)
+ continue
+ raise
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_root():
+ root_user_id = get_iam_root_user_id()
+ user_arn = f'arn:aws:iam::{root_user_id}:root'
+ sts = get_iam_root_client(service_name='sts')
+ response = sts.get_caller_identity()
+ assert response['UserId'] == root_user_id
+ assert response['Account'] == root_user_id
+ assert response['Arn'] == user_arn
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_non_root_user():
+ iam_root = get_iam_root_client()
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('MyUser')
+ user = iam_root.create_user(UserName=user_name, Path=path)['User']
+ user_arn = user['Arn']
+ account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+ user_id = user['UserId']
+
+ key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+ sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ user_identity = sts.get_caller_identity()
+ iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId'])
+ iam_root.delete_user(UserName=user_name)
+
+ assert user_identity['UserId'] == user_id
+ assert user_identity['Account'] == account_id
+ assert user_identity['Arn'] == user_arn
+
+@pytest.mark.test_of_sts
+@pytest.mark.fails_on_dbstore
+def test_get_caller_identity_after_assume_role():
+ iam_root = get_iam_root_client()
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('MyUser')
+ role_name = make_iam_name('MyRole')
+ session_name = 'MySession'
+
+ user = iam_root.create_user(UserName=user_name, Path=path)['User']
+ user_arn = user['Arn']
+ account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+ key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+ sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+ trust_policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 'sts:AssumeRole',
+ 'Principal': {'AWS': user_arn}
+ }]
+ })
+
+ role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+ RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+ role_arn = role['Arn']
+
+ assume_role = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+ RoleArn=role_arn, RoleSessionName=session_name)
+ creds = assume_role['Credentials']
+
+ assumed_sts = get_sts_client(aws_access_key_id = creds['AccessKeyId'],
+ aws_secret_access_key = creds['SecretAccessKey'],
+ aws_session_token = creds['SessionToken'])
+ response = assumed_sts.get_caller_identity()
+ iam_root.delete_role(RoleName=role_name)
+ iam_root.delete_access_key(UserName=user_name, AccessKeyId=key['AccessKeyId'])
+ iam_root.delete_user(UserName=user_name)
+
+ assert response['UserId'] == f"{role['RoleId']}:{session_name}"
+ assert response['Account'] == account_id
+ assert response['Arn'] == assume_role['AssumedRoleUser']['Arn']
+