client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+def test_block_public_restrict_public_buckets():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ # remove any existing public access block configuration
+ resp = client.delete_public_access_block(Bucket=bucket_name)
+ assert resp['ResponseMetadata']['HTTPStatusCode'] == 204
+
+ # upload an object
+ response = client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ # upload a bucket policy that allows public access
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+ policy_document = make_json_policy("s3:GetObject",
+ resource)
+ resp = client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+ assert resp['ResponseMetadata']['HTTPStatusCode'] == 204
+
+ # check if the object is accessible publicly
+ unauthenticated_client = get_unauthenticated_client()
+ response = unauthenticated_client.get_object(Bucket=bucket_name, Key='foo')
+ assert _get_body(response) == 'bar'
+
+ # put a public access block configuration that restricts public buckets
+ access_conf = {'BlockPublicAcls': False,
+ 'IgnorePublicAcls': False,
+ 'BlockPublicPolicy': False,
+ 'RestrictPublicBuckets': True}
+ resp = client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
+ assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ # check if the object is no longer accessible publicly
+ check_access_denied(unauthenticated_client.get_object, Bucket=bucket_name, Key='foo')
+
+ # check if the object is still accessible by the owner
+ response = client.get_object(Bucket=bucket_name, Key='foo')
+ assert _get_body(response) == 'bar'
+
+
def test_ignore_public_acls():
bucket_name = get_new_bucket()
client = get_client()