]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
iam: test cross-account policy with assumed role
authorCasey Bodley <cbodley@redhat.com>
Wed, 21 Feb 2024 14:36:06 +0000 (09:36 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Apr 2024 15:24:49 +0000 (11:24 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit a3a16eb66a020ce079a9ce1cc9398921e264ca3d)

s3tests_boto3/functional/test_iam.py

index 313a896b5a6a87abed1006a36f30d03d361273c3..101cfb8fdd4d638d824227923d01feaad73f9f67 100644 (file)
@@ -2301,6 +2301,288 @@ def test_account_role_policy_allow(iam_root):
     # something other than AccessDenied
     retry_on('AccessDenied', 10, s3.list_buckets)
 
+# alt account user assumes main account role to access main account bucket
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_same_account_role_policy_allow(iam_root, iam_alt_root):
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    role_name = make_iam_name('MyRole')
+    session_name = 'MySession'
+    bucket_name = get_new_bucket_name()
+
+    user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
+    user_arn = user['Arn']
+    key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
+
+    s3_main = get_iam_root_client(service_name='s3')
+    s3_main.create_bucket(Bucket=bucket_name)
+
+    trust_policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'sts:AssumeRole',
+            'Principal': {'AWS': user_arn}
+            }]
+        })
+    # returns MalformedPolicyDocument until the user arn starts working
+    role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                    RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+    role_arn = role['Arn']
+
+    sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                         aws_secret_access_key=key['SecretAccessKey'])
+
+    # returns InvalidClientTokenId or AccessDenied until the access key starts working
+    response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                        RoleArn=role_arn, RoleSessionName=session_name)
+    creds = response['Credentials']
+
+    s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
+                          aws_secret_access_key = creds['SecretAccessKey'],
+                          aws_session_token = creds['SessionToken'])
+
+    # expect AccessDenied because no identity policy allows s3 actions
+    e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    policy_name = 'AllowListBucket'
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:ListBucket',
+            'Resource': '*'
+            }]
+        })
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name)
+
+# alt account user assumes main account role to access alt account bucket
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_cross_account_role_policy_allow(iam_root, iam_alt_root):
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    role_name = make_iam_name('MyRole')
+    session_name = 'MySession'
+    bucket_name = get_new_bucket_name()
+
+    user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
+    user_arn = user['Arn']
+    key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
+
+    s3_alt = get_iam_alt_root_client(service_name='s3')
+    s3_alt.create_bucket(Bucket=bucket_name)
+
+    trust_policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'sts:AssumeRole',
+            'Principal': {'AWS': user_arn}
+            }]
+        })
+    # returns MalformedPolicyDocument until the user arn starts working
+    role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                    RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+    role_arn = role['Arn']
+
+    sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                         aws_secret_access_key=key['SecretAccessKey'])
+
+    # returns InvalidClientTokenId or AccessDenied until the access key starts working
+    response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                        RoleArn=role_arn, RoleSessionName=session_name)
+    creds = response['Credentials']
+
+    s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
+                          aws_secret_access_key = creds['SecretAccessKey'],
+                          aws_session_token = creds['SessionToken'])
+
+    # expect AccessDenied because no identity policy allows s3 actions
+    e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    policy_name = 'AllowListBucket'
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:ListBucket',
+            'Resource': '*'
+            }]
+        })
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+    # expect AccessDenied because no resource policy allows the main account
+    e = assert_raises(ClientError, s3.list_objects, Bucket=bucket_name)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    # add a bucket policy that allows s3:ListBucket for the main account's arn
+    main_arn = iam_root.get_user()['User']['Arn']
+    s3_alt.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Principal': {'AWS': main_arn},
+            'Action': 's3:ListBucket',
+            'Resource': f'arn:aws:s3:::{bucket_name}'
+            }]
+        }))
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, s3.list_objects, Bucket=bucket_name)
+
+# alt account user assumes main account role to create a bucket
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy_allow_create_bucket(iam_root, iam_alt_root):
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    role_name = make_iam_name('MyRole')
+    session_name = 'MySession'
+    bucket_name = get_new_bucket_name()
+
+    user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
+    user_arn = user['Arn']
+    key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
+
+    trust_policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'sts:AssumeRole',
+            'Principal': {'AWS': user_arn}
+            }]
+        })
+    # returns MalformedPolicyDocument until the user arn starts working
+    role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                    RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+    role_arn = role['Arn']
+
+    sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                         aws_secret_access_key=key['SecretAccessKey'])
+
+    # returns InvalidClientTokenId or AccessDenied until the access key starts working
+    response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                        RoleArn=role_arn, RoleSessionName=session_name)
+    creds = response['Credentials']
+
+    s3 = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
+                          aws_secret_access_key = creds['SecretAccessKey'],
+                          aws_session_token = creds['SessionToken'])
+
+    # expect AccessDenied because no identity policy allows s3 actions
+    e = assert_raises(ClientError, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private')
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    policy_name = 'AllowCreateBucket'
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': ['s3:CreateBucket', 's3:PutBucketAcl'],
+            'Resource': '*'
+            }]
+        })
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, s3.create_bucket, Bucket=bucket_name, ObjectOwnership='ObjectWriter', ACL='private')
+
+    # verify that the bucket is owned by the role's account
+    s3_main = get_iam_root_client(service_name='s3')
+    response = s3_main.get_bucket_acl(Bucket=bucket_name)
+
+    main_arn = iam_root.get_user()['User']['Arn']
+    account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
+    assert response['Owner']['ID'] == account_id
+    assert response['Grants'][0]['Grantee']['ID'] == account_id
+
+# alt account user assumes main account role to read the role info
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.iam_role
+@pytest.mark.role_policy
+def test_account_role_policy_allow_get_role(iam_root, iam_alt_root):
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    role_name = make_iam_name('MyRole')
+    session_name = 'MySession'
+    bucket_name = get_new_bucket_name()
+
+    user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
+    user_arn = user['Arn']
+    key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']
+
+    trust_policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'sts:AssumeRole',
+            'Principal': {'AWS': user_arn}
+            }]
+        })
+    # returns MalformedPolicyDocument until the user arn starts working
+    role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
+                    RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
+    role_arn = role['Arn']
+
+    sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
+                         aws_secret_access_key=key['SecretAccessKey'])
+
+    # returns InvalidClientTokenId or AccessDenied until the access key starts working
+    response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
+                        RoleArn=role_arn, RoleSessionName=session_name)
+    creds = response['Credentials']
+
+    iam = get_iam_root_client(service_name='iam',
+                              aws_access_key_id = creds['AccessKeyId'],
+                              aws_secret_access_key = creds['SecretAccessKey'],
+                              aws_session_token = creds['SessionToken'])
+
+    # expect AccessDenied because no identity policy allows iam actions
+    e = assert_raises(ClientError, iam.get_role, RoleName=role_name)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 403
+    assert error_code == 'AccessDenied'
+
+    policy_name = 'AllowGetRole'
+    policy = json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 'iam:GetRole',
+            'Resource': '*'
+            }]
+        })
+    iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)
+
+    # the policy may take a bit to start working. retry until it returns
+    # something other than AccessDenied
+    retry_on('AccessDenied', 10, iam.get_role, RoleName=role_name)
+
 
 # IAM OpenIDConnectProvider apis
 @pytest.mark.iam_account