]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
iam: add account test for RolePolicy apis
authorCasey Bodley <cbodley@redhat.com>
Fri, 26 Jan 2024 20:51:55 +0000 (15:51 -0500)
committerCasey Bodley <cbodley@redhat.com>
Sun, 10 Mar 2024 14:45:22 +0000 (10:45 -0400)
adds test cases for the following iam actions:
* PutRolePolicy
* GetRolePolicy
* DeleteRolePolicy
* ListRolePolicies

verified to pass against aws when an account root user's credentials are
provided in the [iam] section of s3tests.conf

Signed-off-by: Casey Bodley <cbodley@redhat.com>
pytest.ini
s3tests_boto3/functional/test_iam.py

index 4a94bc81e059608ca3a3e2e61c953a542beb7804..3ce92cf67ff71985fd035c5d27021cb1101abe6b 100644 (file)
@@ -25,6 +25,7 @@ markers =
     lifecycle_transition
     list_objects_v2
     object_lock
+    role_policy
     session_policy
     s3select
     s3website
index 95fcb806947aec38450e92845ccf941d7872ffb5..b06c5b8be60766dc8e93baade2fda712bbdd0a24 100644 (file)
@@ -20,6 +20,7 @@ from . import (
     get_iam_s3client,
     get_alt_iam_client,
     get_alt_user_id,
+    get_sts_client,
 )
 from .utils import _get_status, _get_status_and_error_code
 
@@ -1718,3 +1719,122 @@ def test_account_role_update(iam_root):
     assert arn == response['Role']['Arn']
     assert desc == response['Role']['Description']
     assert 43200 == response['Role']['MaxSessionDuration']
+
+
+role_policy = json.dumps({
+    'Version': '2012-10-17',
+    'Statement': [{
+        'Effect': 'Allow',
+        'Action': 's3:*',
+        "Resource": "*"
+        }]
+    })
+
+# IAM RolePolicy apis
+@pytest.mark.iam_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy(iam_root):
+    path = get_iam_path_prefix()
+    role_name = make_iam_name('r')
+    policy_name = 'MyPolicy'
+    policy2_name = 'AnotherPolicy'
+
+    # Get/Put/Delete fail on nonexistent RoleName
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
+
+    iam_root.create_role(RoleName=role_name, Path=path, AssumeRolePolicyDocument=assume_role_policy)
+
+    # Get/Delete fail on nonexistent PolicyName
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=role_policy)
+
+    response = iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+    assert role_name == response['RoleName']
+    assert policy_name == response['PolicyName']
+    assert role_policy == json.dumps(response['PolicyDocument'])
+
+    response = iam_root.list_role_policies(RoleName=role_name)
+    assert [policy_name] == response['PolicyNames']
+
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy2_name, PolicyDocument=role_policy)
+
+    response = iam_root.list_role_policies(RoleName=role_name)
+    assert [policy2_name, policy_name] == response['PolicyNames']
+
+    iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+    iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy2_name)
+
+    # Get/Delete fail after Delete
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.get_role_policy(RoleName=role_name, PolicyName=policy_name)
+    with pytest.raises(iam_root.exceptions.NoSuchEntityException):
+        iam_root.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy_allow(iam_root):
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('MyUser')
+    role_name = make_iam_name('MyRole')
+    session_name = 'MySession'
+
+    user = iam_root.create_user(UserName=user_name, Path=path)['User']
+    user_arn = user['Arn']
+
+    trust_policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'sts:AssumeRole',
+            'Principal': {'AWS': user_arn}
+            }]
+        })
+    # returns MalformedPolicyDocument until the user arn starts working
+    role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                    RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+    role_arn = role['Arn']
+
+    key = iam_root.create_access_key(UserName=user_name)['AccessKey']
+    sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                         aws_secret_access_key=key['SecretAccessKey'])
+
+    # returns InvalidClientTokenId or AccessDenied until the access key starts working
+    response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                        RoleArn=role_arn, RoleSessionName=session_name)
+    creds = response['Credentials']
+
+    s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
+                          aws_secret_access_key = creds['SecretAccessKey'],
+                          aws_session_token = creds['SessionToken'])
+
+    # expect AccessDenied because no identity policy allows s3 actions
+    e = assert_raises(ClientError, s3.list_buckets)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    policy_name = 'AllowListAllMyBuckets'
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:ListAllMyBuckets',
+            'Resource': '*'
+            }]
+        })
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, s3.list_buckets)