]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
iam: add account tests for GroupPolicy apis
authorCasey Bodley <cbodley@redhat.com>
Sun, 11 Feb 2024 16:51:17 +0000 (11:51 -0500)
committerCasey Bodley <cbodley@redhat.com>
Sun, 10 Mar 2024 14:45:22 +0000 (10:45 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
pytest.ini
s3tests_boto3/functional/test_iam.py

index 5da379d748caccbe18b1e23126c8d2edb68cf3f8..c7e041808408a596d0465d8a369381fc2b025667 100644 (file)
@@ -17,6 +17,7 @@ markers =
     fails_on_s3
     fails_with_subdomain
     group
+    group_policy
     iam_account
     iam_cross_account
     iam_role
index e873fe14cc3a1519f25eb651cc0155fa7487cd49..313a896b5a6a87abed1006a36f30d03d361273c3 100644 (file)
@@ -928,6 +928,24 @@ def nuke_users(client, **kwargs):
             except:
                 pass
 
+def nuke_group_policies(client, name):
+    p = client.get_paginator('list_group_policies')
+    for response in p.paginate(GroupName=name):
+        for policy in response['PolicyNames']:
+            try:
+                client.delete_group_policy(GroupName=name, PolicyName=policy)
+            except:
+                pass
+
+def nuke_attached_group_policies(client, name):
+    p = client.get_paginator('list_attached_group_policies')
+    for response in p.paginate(GroupName=name):
+        for policy in response['AttachedPolicies']:
+            try:
+                client.detach_group_policy(GroupName=name, PolicyArn=policy['PolicyArn'])
+            except:
+                pass
+
 def nuke_group_users(client, name):
     p = client.get_paginator('get_group')
     for response in p.paginate(GroupName=name):
@@ -939,6 +957,14 @@ def nuke_group_users(client, name):
 
 def nuke_group(client, name):
     # delete group policies and remove all users
+    try:
+        nuke_group_policies(client, name)
+    except:
+        pass
+    try:
+        nuke_attached_group_policies(client, name)
+    except:
+        pass
     try:
         nuke_group_users(client, name)
     except:
@@ -1727,6 +1753,200 @@ def test_account_group_update(iam_root):
     assert group_id == groups[0]['GroupId']
 
 
+# IAM GroupPolicy apis
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_inline_group_policy(iam_root):
+    path = get_iam_path_prefix()
+    name = make_iam_name('name')
+    policy_name = 'List'
+    bucket_name = get_new_bucket_name()
+    policy1 = json.dumps({'Version': '2012-10-17', 'Statement': [
+        {'Effect': 'Deny',
+         'Action': 's3:ListBucket',
+         'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+    policy2 = json.dumps({'Version': '2012-10-17', 'Statement': [
+        {'Effect': 'Allow',
+         'Action': 's3:ListBucket',
+         'Resource': f'arn:aws:s3:::{bucket_name}'}]})
+
+    # Get/Put/Delete fail on nonexistent GroupName
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+    iam_root.create_group(GroupName=name, Path=path)
+
+    # Get/Delete fail on nonexistent PolicyName
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+    iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy1)
+
+    response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+    assert policy1 == json.dumps(response['PolicyDocument'])
+    response = iam_root.list_group_policies(GroupName=name)
+    assert [policy_name] == response['PolicyNames']
+
+    iam_root.put_group_policy(GroupName=name, PolicyName=policy_name, PolicyDocument=policy2)
+
+    response = iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+    assert policy2 == json.dumps(response['PolicyDocument'])
+    response = iam_root.list_group_policies(GroupName=name)
+    assert [policy_name] == response['PolicyNames']
+
+    # DeleteGroup fails while policies are still attached
+    with pytest.raises(iam_root.exceptions.DeleteConflictException):
+        iam_root.delete_group(GroupName=name)
+
+    iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+    # Get/Delete fail after Delete
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_group_policy(GroupName=name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_group_policy(GroupName=name, PolicyName=policy_name)
+
+    response = iam_root.list_group_policies(GroupName=name)
+    assert [] == response['PolicyNames']
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_managed_group_policy(iam_root):
+    path = get_iam_path_prefix()
+    name = make_iam_name('name')
+    policy1 = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
+    policy2 = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
+
+    # Attach/Detach/List fail on nonexistent GroupName
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.list_attached_group_policies(GroupName=name)
+
+    iam_root.create_group(GroupName=name, Path=path)
+
+    # Detach fails on unattached PolicyArn
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.detach_group_policy(GroupName=name, PolicyArn=policy1)
+
+    iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+    iam_root.attach_group_policy(GroupName=name, PolicyArn=policy1)
+
+    response = iam_root.list_attached_group_policies(GroupName=name)
+    assert len(response['AttachedPolicies']) == 1
+    assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName']
+    assert policy1 == response['AttachedPolicies'][0]['PolicyArn']
+
+    iam_root.attach_group_policy(GroupName=name, PolicyArn=policy2)
+
+    response = iam_root.list_attached_group_policies(GroupName=name)
+    policies = response['AttachedPolicies']
+    assert len(policies) == 2
+    names = [p['PolicyName'] for p in policies]
+    arns = [p['PolicyArn'] for p in policies]
+    assert 'AmazonS3FullAccess' in names
+    assert policy1 in arns
+    assert 'AmazonS3ReadOnlyAccess' in names
+    assert policy2 in arns
+
+    iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2)
+
+    # Detach fails after Detach
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.detach_group_policy(GroupName=name, PolicyArn=policy2)
+
+    response = iam_root.list_attached_group_policies(GroupName=name)
+    assert len(response['AttachedPolicies']) == 1
+    assert 'AmazonS3FullAccess' == response['AttachedPolicies'][0]['PolicyName']
+    assert policy1 == response['AttachedPolicies'][0]['PolicyArn']
+
+    # DeleteGroup fails while policies are still attached
+    with pytest.raises(iam_root.exceptions.DeleteConflictException):
+        iam_root.delete_group(GroupName=name)
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_inline_group_policy_allow(iam_root):
+    path = get_iam_path_prefix()
+    username = make_iam_name('User')
+    groupname = make_iam_name('Group')
+    bucket_name = get_new_bucket_name()
+
+    iam_root.create_user(UserName=username, Path=path)
+
+    key = iam_root.create_access_key(UserName=username)['AccessKey']
+    client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                              aws_secret_access_key=key['SecretAccessKey'])
+
+    iam_root.create_group(GroupName=groupname, Path=path)
+    iam_root.add_user_to_group(GroupName=groupname, UserName=username)
+
+    # the access key may take a bit to start working. retry until it returns
+    # something other than InvalidAccessKeyId
+    e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets)
+    # expect AccessDenied because no identity policy allows s3 actions
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    # add a group policy that allows s3 actions
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:*',
+            'Resource': '*'
+            }]
+        })
+    policy_name = 'AllowStar'
+    iam_root.put_group_policy(GroupName=groupname, PolicyName=policy_name, PolicyDocument=policy)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, client.list_buckets)
+
+@pytest.mark.group_policy
+@pytest.mark.iam_account
+def test_account_managed_group_policy_allow(iam_root):
+    path = get_iam_path_prefix()
+    username = make_iam_name('User')
+    groupname = make_iam_name('Group')
+    bucket_name = get_new_bucket_name()
+
+    iam_root.create_user(UserName=username, Path=path)
+
+    key = iam_root.create_access_key(UserName=username)['AccessKey']
+    client = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                              aws_secret_access_key=key['SecretAccessKey'])
+
+    iam_root.create_group(GroupName=groupname, Path=path)
+    iam_root.add_user_to_group(GroupName=groupname, UserName=username)
+
+    # the access key may take a bit to start working. retry until it returns
+    # something other than InvalidAccessKeyId
+    e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, client.list_buckets)
+    # expect AccessDenied because no identity policy allows s3 actions
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    # add a group policy that allows s3 read actions
+    policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
+    iam_root.attach_group_policy(GroupName=groupname, PolicyArn=policy_arn)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, client.list_buckets)
+
+
 assume_role_policy = json.dumps({
     'Version': '2012-10-17',
     'Statement': [{