import ssl
from collections import namedtuple
from collections import defaultdict
-from io import StringIO
from io import BytesIO
-from email.header import decode_header
-
from .utils import assert_raises
from .utils import generate_random
from .utils import _get_status_and_error_code
from .policy import Policy, Statement, make_json_policy
+from .iam import iam_root
+
from . import (
configfile,
setup_teardown,
get_alt_user_id,
get_alt_email,
get_alt_client,
+ get_iam_root_client,
get_tenant_client,
get_v2_tenant_client,
get_tenant_iam_client,
get_lc_debug_interval,
get_restore_debug_interval,
get_read_through_days,
+ create_iam_user_s3client,
)
response = alt_client.list_objects_v2(Bucket=bucket_name)
assert len(response['Contents']) == 1
+@pytest.mark.bucket_policy
+@pytest.mark.iam_account
+@pytest.mark.iam_user
+def test_bucket_policy_deny_self_denied_policy(iam_root):
+ root_client = get_iam_root_client(service_name="s3")
+ bucket_name = get_new_bucket(root_client)
+
+ resource1 = "arn:aws:s3:::" + bucket_name
+ resource2 = "arn:aws:s3:::" + bucket_name + "/*"
+ policy_document = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Deny",
+ "Principal": "*",
+ "Action": [
+ "s3:PutBucketPolicy",
+ "s3:GetBucketPolicy",
+ "s3:DeleteBucketPolicy",
+ ],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+
+ root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+ # non-root account should not be able to get, put or delete bucket policy
+ root_alt_client = create_iam_user_s3client(iam_root)
+ check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
+ # root account should be able to get, put or delete bucket policy
+ response = root_client.get_bucket_policy(Bucket=bucket_name)
+ assert response['Policy'] == policy_document
+ root_client.delete_bucket_policy(Bucket=bucket_name)
+ root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+@pytest.mark.bucket_policy
+@pytest.mark.iam_account
+@pytest.mark.iam_user
+def test_bucket_policy_deny_self_denied_policy_confirm_header(iam_root):
+ root_client = get_iam_root_client(service_name="s3")
+ bucket_name = get_new_bucket(root_client)
+
+ resource1 = "arn:aws:s3:::" + bucket_name
+ resource2 = "arn:aws:s3:::" + bucket_name + "/*"
+ policy_document = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Deny",
+ "Principal": "*",
+ "Action": [
+ "s3:PutBucketPolicy",
+ "s3:GetBucketPolicy",
+ "s3:DeleteBucketPolicy",
+ ],
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+
+ root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True)
+
+ # non-root account should not be able to get, put or delete bucket policy
+ root_alt_client = create_iam_user_s3client(iam_root)
+ check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
+ # root account should not be able to get, put or delete bucket policy
+ check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name)
+ check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document)
+
@pytest.mark.bucket_policy
def test_bucket_policy_acl():
bucket_name = get_new_bucket()