get_iam_s3client,
get_alt_iam_client,
get_alt_user_id,
+ get_sts_client,
)
from .utils import _get_status, _get_status_and_error_code
assert arn == response['Role']['Arn']
assert desc == response['Role']['Description']
assert 43200 == response['Role']['MaxSessionDuration']
+
+
+role_policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ "Resource": "*"
+ }]
+ })
+
+# IAM RolePolicy apis
+@pytest.mark.iam_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy(iam_root):
+ path = get_iam_path_prefix()
+ role_name = make_iam_name('r')
+ policy_name = 'MyPolicy'
+ policy2_name = 'AnotherPolicy'
+
+ # Get/Put/Delete fail on nonexistent RoleName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
+
+ iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy)
+
+ # Get/Delete fail on nonexistent PolicyName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+
+ iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
+
+ response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+ assert role_name == response['RoleName']
+ assert policy_name == response['PolicyName']
+ assert role_policy == json.dumps(response['PolicyDocument'])
+
+ response = iam_root.list_role_policies(RoleName=role_name)
+ assert [policy_name] == response['PolicyNames']
+
+ iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy)
+
+ response = iam_root.list_role_policies(RoleName=role_name)
+ assert [policy2_name, policy_name] == response['PolicyNames']
+
+ iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+ iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name)
+
+ # Get/Delete fail after Delete
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy_allow(iam_root):
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('MyUser')
+ role_name = make_iam_name('MyRole')
+ session_name = 'MySession'
+
+ user = iam_root.create_user(UserName=user_name, Path=path)['User']
+ user_arn = user['Arn']
+
+ trust_policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 'sts:AssumeRole',
+ 'Principal': {'AWS': user_arn}
+ }]
+ })
+ # returns MalformedPolicyDocument until the user arn starts working
+ role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+ RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+ role_arn = role['Arn']
+
+ key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+ sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # returns InvalidClientTokenId or AccessDenied until the access key starts working
+ response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+ RoleArn=role_arn, RoleSessionName=session_name)
+ creds = response['Credentials']
+
+ s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
+ aws_secret_access_key = creds['SecretAccessKey'],
+ aws_session_token = creds['SessionToken'])
+
+ # expect AccessDenied because no identity policy allows s3 actions
+ e = assert_raises(ClientError, s3.list_buckets)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ policy_name = 'AllowListAllMyBuckets'
+ policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:ListAllMyBuckets',
+ 'Resource': '*'
+ }]
+ })
+ iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+ # the policy may take a bit to start working. retry until it returns
+ # something other than AccessDenied
+ retry_on('AccessDenied', 10, s3.list_buckets)