except:
pass
+def nuke_group_policies(client, name):
+ p = client.get_paginator('list_group_policies')
+ for response in p.paginate(GroupName=name):
+ for policy in response['PolicyNames']:
+ try:
+ client.delete_group_policy(GroupName=name, PolicyName=policy)
+ except:
+ pass
+
+def nuke_attached_group_policies(client, name):
+ p = client.get_paginator('list_attached_group_policies')
+ for response in p.paginate(GroupName=name):
+ for policy in response['AttachedPolicies']:
+ try:
+ client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
+ except:
+ pass
+
def nuke_group_users(client, name):
p = client.get_paginator('get_group')
for response in p.paginate(GroupName=name):
def nuke_group(client, name):
# delete group policies and remove all users
+ try:
+ nuke_group_policies(client, name)
+ except:
+ pass
+ try:
+ nuke_attached_group_policies(client, name)
+ except:
+ pass
try:
nuke_group_users(client, name)
except:
assert group_id == groups[0]['GroupId']
+# IAM GroupPolicy apis
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_inline_group_policy(iam_root):
+ path = get_iam_path_prefix()
+ name = make_iam_name('name')
+ policy_name = 'List'
+ bucket_name = get_new_bucket_name()
+ policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [
+ {'Effect': 'Deny',
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+ policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [
+ {'Effect': 'Allow',
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+
+ # Get/Put/Delete fail on nonexistent GroupName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+ iam_root.create_group(GroupName=name, Path=path)
+
+ # Get/Delete fail on nonexistent PolicyName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+ iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+ response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+ assert policy1 == json.dumps(response['PolicyDocument'])
+ response = iam_root.list_group_policies(GroupName=name)
+ assert [policy_name] == response['PolicyNames']
+
+ iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy2)
+
+ response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+ assert policy2 == json.dumps(response['PolicyDocument'])
+ response = iam_root.list_group_policies(GroupName=name)
+ assert [policy_name] == response['PolicyNames']
+
+ # DeleteGroup fails while policies are still attached
+ with pytest.raises(iam_root.exceptions.DeleteConflictException):
+ iam_root.delete_group(GroupName=name)
+
+ iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+ # Get/Delete fail after Delete
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+ response = iam_root.list_group_policies(GroupName=name)
+ assert [] == response['PolicyNames']
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_managed_group_policy(iam_root):
+ path = get_iam_path_prefix()
+ name = make_iam_name('name')
+ policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
+ policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
+
+ # Attach/Detach/List fail on nonexistent GroupName
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1)
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.list_attached_group_policies(GroupName=name)
+
+ iam_root.create_group(GroupName=name, Path=path)
+
+ # Detach fails on unattached PolicyArn
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1)
+
+ iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+ iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+
+ response = iam_root.list_attached_group_policies(GroupName=name)
+ assert len(response['AttachedPolicies']) == 1
+ assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName']
+ assert policy1 == response['AttachedPolicies'][0]['PolicyArn']
+
+ iam_root.attach_group_policy(GroupName=name, PolicyArn=policy2)
+
+ response = iam_root.list_attached_group_policies(GroupName=name)
+ policies = response['AttachedPolicies']
+ assert len(policies) == 2
+ names = [p['PolicyName'] for p in policies]
+ arns = [p['PolicyArn'] for p in policies]
+ assert 'AmazonS3FullAccess' in names
+ assert policy1 in arns
+ assert 'AmazonS3ReadOnlyAccess' in names
+ assert policy2 in arns
+
+ iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2)
+
+ # Detach fails after Detach
+ with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+ iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2)
+
+ response = iam_root.list_attached_group_policies(GroupName=name)
+ assert len(response['AttachedPolicies']) == 1
+ assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName']
+ assert policy1 == response['AttachedPolicies'][0]['PolicyArn']
+
+ # DeleteGroup fails while policies are still attached
+ with pytest.raises(iam_root.exceptions.DeleteConflictException):
+ iam_root.delete_group(GroupName=name)
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_inline_group_policy_allow(iam_root):
+ path = get_iam_path_prefix()
+ username = make_iam_name('User')
+ groupname = make_iam_name('Group')
+ bucket_name = get_new_bucket_name()
+
+ iam_root.create_user(UserName=username, Path=path)
+
+ key = iam_root.create_access_key(UserName=username)['AccessKey']
+ client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ iam_root.create_group(GroupName=groupname, Path=path)
+ iam_root.add_user_to_group(GroupName=groupname, UserName=username)
+
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets)
+ # expect AccessDenied because no identity policy allows s3 actions
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a group policy that allows s3 actions
+ policy = json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ })
+ policy_name = 'AllowStar'
+ iam_root.put_group_policy(GroupName=groupname, PolicyName=policy_name, PolicyDocument=policy)
+
+ # the policy may take a bit to start working. retry until it returns
+ # something other than AccessDenied
+ retry_on('AccessDenied', 10, client.list_buckets)
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_managed_group_policy_allow(iam_root):
+ path = get_iam_path_prefix()
+ username = make_iam_name('User')
+ groupname = make_iam_name('Group')
+ bucket_name = get_new_bucket_name()
+
+ iam_root.create_user(UserName=username, Path=path)
+
+ key = iam_root.create_access_key(UserName=username)['AccessKey']
+ client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ iam_root.create_group(GroupName=groupname, Path=path)
+ iam_root.add_user_to_group(GroupName=groupname, UserName=username)
+
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets)
+ # expect AccessDenied because no identity policy allows s3 actions
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a group policy that allows s3 read actions
+ policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
+ iam_root.attach_group_policy(GroupName=groupname, PolicyArn=policy_arn)
+
+ # the policy may take a bit to start working. retry until it returns
+ # something other than AccessDenied
+ retry_on('AccessDenied', 10, client.list_buckets)
+
+
assume_role_policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{