]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
iam: test cross-account permissions
authorCasey Bodley <cbodley@redhat.com>
Tue, 30 Jan 2024 23:17:17 +0000 (18:17 -0500)
committerCasey Bodley <cbodley@redhat.com>
Sun, 10 Mar 2024 14:45:22 +0000 (10:45 -0400)
test the [iam alt root] user's access to buckets owned by [iam root]
using various policy principals and acl grantees

Signed-off-by: Casey Bodley <cbodley@redhat.com>
pytest.ini
s3tests_boto3/functional/test_iam.py

index 3ce92cf67ff71985fd035c5d27021cb1101abe6b..4c456be86705219832472c131346c13e157c3859 100644 (file)
@@ -17,6 +17,7 @@ markers =
     fails_on_s3
     fails_with_subdomain
     iam_account
+    iam_cross_account
     iam_role
     iam_tenant
     iam_user
index a49f1291378bb27b841a29202e17128afffddcb2..4cfbfad341d56230465561be62f2e3e42b409dfd 100644 (file)
@@ -13,6 +13,11 @@ from . import (
     get_alt_client,
     get_iam_client,
     get_iam_root_client,
+    get_iam_root_user_id,
+    get_iam_root_email,
+    get_iam_alt_root_client,
+    get_iam_alt_root_user_id,
+    get_iam_alt_root_email,
     make_iam_name,
     get_iam_path_prefix,
     get_new_bucket,
@@ -1908,3 +1913,377 @@ def test_account_role_policy_allow(iam_root):
     # the policy may take a bit to start working. retry until it returns
     # something other than AccessDenied
     retry_on('AccessDenied', 10, s3.list_buckets)
+
+
+# fixture for iam alt account root user
+@pytest.fixture
+def iam_alt_root(configfile):
+    client = get_iam_alt_root_client()
+    try:
+        arn = client.get_user()['User']['Arn']
+        if not arn.endswith(':root'):
+            pytest.skip('[iam alt root] user does not have :root arn')
+    except ClientError as e:
+        pytest.skip('[iam alt root] user does not belong to an account')
+
+    yield client
+    nuke_users(client, PathPrefix=get_iam_path_prefix())
+    nuke_roles(client, PathPrefix=get_iam_path_prefix())
+
+
+# test cross-account access, adding user policy before the bucket policy
+def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn):
+    # add a user policy that allows s3 actions
+    alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:*',
+            'Resource': '*'
+            }]
+        }))
+
+    key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+    alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                             aws_secret_access_key=key['SecretAccessKey'])
+
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        # the access key may take a bit to start working. retry until it returns
+        # something other than InvalidAccessKeyId
+        e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a bucket policy that allows s3:ListBucket for the iam user's arn
+        roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+            'Version': '2012-10-17',
+            'Statement': [{
+                'Effect': 'Allow',
+                'Principal': {'AWS': alt_arn},
+                'Action': 's3:ListBucket',
+                'Resource': f'arn:aws:s3:::{bucket}'
+                }]
+            }))
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+# test cross-account access, adding bucket policy before the user policy
+def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn):
+    key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+    alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                             aws_secret_access_key=key['SecretAccessKey'])
+
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        # add a bucket policy that allows s3:ListBucket for the iam user's arn
+        roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+            'Version': '2012-10-17',
+            'Statement': [{
+                'Effect': 'Allow',
+                'Principal': {'AWS': alt_arn},
+                'Action': 's3:ListBucket',
+                'Resource': f'arn:aws:s3:::{bucket}'
+                }]
+            }))
+
+        # the access key may take a bit to start working. retry until it returns
+        # something other than InvalidAccessKeyId
+        e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a user policy that allows s3 actions
+        alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+            'Version': '2012-10-17',
+            'Statement': [{
+                'Effect': 'Allow',
+                'Action': 's3:*',
+                'Resource': '*'
+                }]
+            }))
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
+    _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
+    _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+    _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    user_arn = response['User']['Arn']
+    account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+    _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id)
+
+
+# test cross-account access, adding user policy before the bucket acl
+def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee):
+    # add a user policy that allows s3 actions
+    alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+        'Version': '2012-10-17',
+        'Statement': [{
+            'Effect': 'Allow',
+            'Action': 's3:*',
+            'Resource': '*'
+            }]
+        }))
+
+    key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+    alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                             aws_secret_access_key=key['SecretAccessKey'])
+
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        # the access key may take a bit to start working. retry until it returns
+        # something other than InvalidAccessKeyId
+        e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a bucket acl that grants READ access
+        roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+# test cross-account access, adding bucket acl before the user policy
+def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee):
+    key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+    alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+                             aws_secret_access_key=key['SecretAccessKey'])
+
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        # add a bucket acl that grants READ access
+        roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+        # the access key may take a bit to start working. retry until it returns
+        # something other than InvalidAccessKeyId
+        e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a user policy that allows s3 actions
+        alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+            'Version': '2012-10-17',
+            'Statement': [{
+                'Effect': 'Allow',
+                'Action': 's3:*',
+                'Resource': '*'
+                }]
+            }))
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.fails_on_aws # can't grant to individual users
+def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'id=' + response['User']['UserId']
+    _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.fails_on_aws # can't grant to individual users
+def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    response = iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'id=' + response['User']['UserId']
+    _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'id=' + get_iam_alt_root_user_id()
+    _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'id=' + get_iam_alt_root_user_id()
+    _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'emailAddress=' + get_iam_alt_root_email()
+    _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    path = get_iam_path_prefix()
+    user_name = make_iam_name('AltUser')
+    iam_alt_root.create_user(UserName=user_name, Path=path)
+    grantee = 'emailAddress=' + get_iam_alt_root_email()
+    _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+
+# test root cross-account access with bucket policy
+def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn):
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a bucket policy that allows s3:ListBucket for the iam user's arn
+        roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+            'Version': '2012-10-17',
+            'Statement': [{
+                'Effect': 'Allow',
+                'Principal': {'AWS': alt_arn},
+                'Action': 's3:ListBucket',
+                'Resource': f'arn:aws:s3:::{bucket}'
+                }]
+            }))
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    alts3 = get_iam_alt_root_client(service_name='s3')
+    alt_arn = iam_alt_root.get_user()['User']['Arn']
+    _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    alts3 = get_iam_alt_root_client(service_name='s3')
+    alt_arn = iam_alt_root.get_user()['User']['Arn']
+    account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
+    _test_cross_account_root_bucket_policy(roots3, alts3, account_id)
+
+# test root cross-account access with bucket acls
+def _test_cross_account_root_bucket_acl(roots3, alts3, grantee):
+    # create a bucket with the root user
+    bucket = get_new_bucket(roots3)
+    try:
+        e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
+        status, error_code = _get_status_and_error_code(e.response)
+        assert status == 403
+        assert error_code == 'AccessDenied'
+
+        # add a bucket acl that grants READ
+        roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+        # verify that the iam user can eventually access it
+        retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+    finally:
+        roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    alts3 = get_iam_alt_root_client(service_name='s3')
+    grantee = 'id=' + get_iam_alt_root_user_id()
+    _test_cross_account_root_bucket_acl(roots3, alts3, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root):
+    roots3 = get_iam_root_client(service_name='s3')
+    alts3 = get_iam_alt_root_client(service_name='s3')
+    grantee = 'emailAddress=' + get_iam_alt_root_email()
+    _test_cross_account_root_bucket_acl(roots3, alts3, grantee)