get_alt_client,
get_iam_client,
get_iam_root_client,
+ get_iam_root_user_id,
+ get_iam_root_email,
+ get_iam_alt_root_client,
+ get_iam_alt_root_user_id,
+ get_iam_alt_root_email,
make_iam_name,
get_iam_path_prefix,
get_new_bucket,
# the policy may take a bit to start working. retry until it returns
# something other than AccessDenied
retry_on('AccessDenied', 10, s3.list_buckets)
+
+
+# fixture for iam alt account root user
+@pytest.fixture
+def iam_alt_root(configfile):
+ client = get_iam_alt_root_client()
+ try:
+ arn = client.get_user()['User']['Arn']
+ if not arn.endswith(':root'):
+ pytest.skip('[iam alt root] user does not have :root arn')
+ except ClientError as e:
+ pytest.skip('[iam alt root] user does not belong to an account')
+
+ yield client
+ nuke_users(client, PathPrefix=get_iam_path_prefix())
+ nuke_roles(client, PathPrefix=get_iam_path_prefix())
+
+
+# test cross-account access, adding user policy before the bucket policy
+def _test_cross_account_user_bucket_policy(roots3, alt_root, alt_name, alt_arn):
+ # add a user policy that allows s3 actions
+ alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ }))
+
+ key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+ alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a bucket policy that allows s3:ListBucket for the iam user's arn
+ roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': alt_arn},
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket}'
+ }]
+ }))
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+# test cross-account access, adding bucket policy before the user policy
+def _test_cross_account_bucket_user_policy(roots3, alt_root, alt_name, alt_arn):
+ key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+ alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ # add a bucket policy that allows s3:ListBucket for the iam user's arn
+ roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': alt_arn},
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket}'
+ }]
+ }))
+
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a user policy that allows s3 actions
+ alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ }))
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_user_arn(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, user_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_user_arn(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, user_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
+ _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_account_arn(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ account_arn = user_arn.replace(f':user{path}{user_name}', ':root')
+ _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_bucket_policy_allow_account_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+ _test_cross_account_user_bucket_policy(roots3, iam_alt_root, user_name, account_id)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_user_policy_allow_account_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ user_arn = response['User']['Arn']
+ account_id = user_arn.removeprefix('arn:aws:iam::').removesuffix(f':user{path}{user_name}')
+ _test_cross_account_bucket_user_policy(roots3, iam_alt_root, user_name, account_id)
+
+
+# test cross-account access, adding user policy before the bucket acl
+def _test_cross_account_user_policy_bucket_acl(roots3, alt_root, alt_name, grantee):
+ # add a user policy that allows s3 actions
+ alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ }))
+
+ key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+ alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a bucket acl that grants READ access
+ roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+# test cross-account access, adding bucket acl before the user policy
+def _test_cross_account_bucket_acl_user_policy(roots3, alt_root, alt_name, grantee):
+ key = alt_root.create_access_key(UserName=alt_name)['AccessKey']
+ alts3 = get_iam_s3client(aws_access_key_id=key['AccessKeyId'],
+ aws_secret_access_key=key['SecretAccessKey'])
+
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ # add a bucket acl that grants READ access
+ roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+ # the access key may take a bit to start working. retry until it returns
+ # something other than InvalidAccessKeyId
+ e = assert_raises(ClientError, retry_on, 'InvalidAccessKeyId', 10, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a user policy that allows s3 actions
+ alt_root.put_user_policy(UserName=alt_name, PolicyName='AllowStar', PolicyDocument=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Action': 's3:*',
+ 'Resource': '*'
+ }]
+ }))
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.fails_on_aws # can't grant to individual users
+def test_cross_account_bucket_acl_user_policy_grant_user_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'id=' + response['User']['UserId']
+ _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+@pytest.mark.fails_on_aws # can't grant to individual users
+def test_cross_account_user_policy_bucket_acl_grant_user_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ response = iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'id=' + response['User']['UserId']
+ _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_acl_user_policy_grant_canonical_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'id=' + get_iam_alt_root_user_id()
+ _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_policy_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'id=' + get_iam_alt_root_user_id()
+ _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_bucket_acl_user_policy_grant_account_email(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'emailAddress=' + get_iam_alt_root_email()
+ _test_cross_account_bucket_acl_user_policy(roots3, iam_alt_root, user_name, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_user_policy_bucket_acl_grant_account_email(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ path = get_iam_path_prefix()
+ user_name = make_iam_name('AltUser')
+ iam_alt_root.create_user(UserName=user_name, Path=path)
+ grantee = 'emailAddress=' + get_iam_alt_root_email()
+ _test_cross_account_user_policy_bucket_acl(roots3, iam_alt_root, user_name, grantee)
+
+
+# test root cross-account access with bucket policy
+def _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn):
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a bucket policy that allows s3:ListBucket for the iam user's arn
+ roots3.put_bucket_policy(Bucket=bucket, Policy=json.dumps({
+ 'Version': '2012-10-17',
+ 'Statement': [{
+ 'Effect': 'Allow',
+ 'Principal': {'AWS': alt_arn},
+ 'Action': 's3:ListBucket',
+ 'Resource': f'arn:aws:s3:::{bucket}'
+ }]
+ }))
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_policy_allow_account_arn(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ alts3 = get_iam_alt_root_client(service_name='s3')
+ alt_arn = iam_alt_root.get_user()['User']['Arn']
+ _test_cross_account_root_bucket_policy(roots3, alts3, alt_arn)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_policy_allow_account_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ alts3 = get_iam_alt_root_client(service_name='s3')
+ alt_arn = iam_alt_root.get_user()['User']['Arn']
+ account_id = alt_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
+ _test_cross_account_root_bucket_policy(roots3, alts3, account_id)
+
+# test root cross-account access with bucket acls
+def _test_cross_account_root_bucket_acl(roots3, alts3, grantee):
+ # create a bucket with the root user
+ bucket = get_new_bucket(roots3)
+ try:
+ e = assert_raises(ClientError, alts3.list_objects, Bucket=bucket)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 403
+ assert error_code == 'AccessDenied'
+
+ # add a bucket acl that grants READ
+ roots3.put_bucket_acl(Bucket=bucket, GrantRead=grantee)
+
+ # verify that the iam user can eventually access it
+ retry_on('AccessDenied', 10, alts3.list_objects, Bucket=bucket)
+ finally:
+ roots3.delete_bucket(Bucket=bucket)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_acl_grant_canonical_id(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ alts3 = get_iam_alt_root_client(service_name='s3')
+ grantee = 'id=' + get_iam_alt_root_user_id()
+ _test_cross_account_root_bucket_acl(roots3, alts3, grantee)
+
+@pytest.mark.iam_account
+@pytest.mark.iam_cross_account
+def test_cross_account_root_bucket_acl_grant_account_email(iam_root, iam_alt_root):
+ roots3 = get_iam_root_client(service_name='s3')
+ alts3 = get_iam_alt_root_client(service_name='s3')
+ grantee = 'emailAddress=' + get_iam_alt_root_email()
+ _test_cross_account_root_bucket_acl(roots3, alts3, grantee)