add tests for public access configuration
authorAbhishek Lekshmanan <abhishek@suse.com>
Tue, 8 Oct 2019 08:46:38 +0000 (10:46 +0200)
committerAbhishek Lekshmanan <abhishek@suse.com>
Wed, 22 Jan 2020 16:01:41 +0000 (17:01 +0100)
Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
s3tests_boto3/functional/test_s3.py

index cedd386c4e2db435753a24e9ab3cbe98da6a0821..85751bc15fc6cfdf282c49faba840bccdbdf1676 100644 (file)
@@ -12540,3 +12540,109 @@ def test_get_nonpublicpolicy_deny_bucket_policy_status():
     client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
     resp = client.get_bucket_policy_status(Bucket=bucket_name)
     eq(resp['PolicyStatus']['IsPublic'],True)
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get public access block on a bucket')
+@attr(assertion='succeeds')
+@attr('policy_status')
+def test_get_default_public_block():
+    #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    resp = client.get_public_access_block(Bucket=bucket_name)
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], False)
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], False)
+    eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], False)
+    eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], False)
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='get public access block on a bucket')
+@attr(assertion='succeeds')
+@attr('policy_status')
+def test_put_public_block():
+    #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    access_conf = {'BlockPublicAcls': True,
+                   'IgnorePublicAcls': True,
+                   'BlockPublicPolicy': True,
+                   'RestrictPublicBuckets': False}
+
+    client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
+
+    resp = client.get_public_access_block(Bucket=bucket_name)
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
+    eq(resp['PublicAccessBlockConfiguration']['IgnorePublicAcls'], access_conf['IgnorePublicAcls'])
+    eq(resp['PublicAccessBlockConfiguration']['RestrictPublicBuckets'], access_conf['RestrictPublicBuckets'])
+
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='get public access block on a bucket')
+@attr(assertion='succeeds')
+@attr('policy_status')
+def test_block_public_put_bucket_acls():
+    #client = get_svc_client(svc='s3control', client_config=Config(s3={'addressing_style': 'path'}))
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    access_conf = {'BlockPublicAcls': True,
+                   'IgnorePublicAcls': False,
+                   'BlockPublicPolicy': True,
+                   'RestrictPublicBuckets': False}
+
+    client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
+
+    resp = client.get_public_access_block(Bucket=bucket_name)
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
+    eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
+
+    e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)
+
+    e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='public-read-write')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)
+
+    e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket_name,ACL='authenticated-read')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)
+
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='block public acls on canned acls')
+@attr(assertion='succeeds')
+@attr('policy_status')
+def test_block_public_object_canned_acls():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    access_conf = {'BlockPublicAcls': True,
+                   'IgnorePublicAcls': False,
+                   'BlockPublicPolicy': False,
+                   'RestrictPublicBuckets': False}
+
+    client.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration=access_conf)
+
+    # resp = client.get_public_access_block(Bucket=bucket_name)
+    # eq(resp['PublicAccessBlockConfiguration']['BlockPublicAcls'], access_conf['BlockPublicAcls'])
+    # eq(resp['PublicAccessBlockConfiguration']['BlockPublicPolicy'], access_conf['BlockPublicPolicy'])
+
+    e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo1', Body='bar', ACL='public-read')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)
+
+    e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo2', Body='bar', ACL='public-read')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)
+
+    e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key='foo3', Body='bar', ACL='authenticated-read')
+    status, error_code = _get_status_and_error_code(e.response)
+    eq(status, 403)