response = client.delete_objects(Bucket=bucket, Delete={'Objects': [{'Key': key, 'VersionId': version, 'Size': badsize}]})
assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+def public_bucket_policy(bucket):
+ return json.dumps({
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "*",
+ "Resource": [
+ f"arn:aws:s3:::{bucket}",
+ f"arn:aws:s3:::{bucket}/*"
+ ]
+ }]
+ })
+
+def get_object_acl_owner(client, bucket, key):
+ response = client.get_object_acl(Bucket=bucket, Key=key)
+ return response['Owner']['ID'], response['Owner']['DisplayName']
+
+def get_multipart_acl_owner(client, bucket, key, upload_id):
+ response = client.list_parts(Bucket=bucket, Key=key, UploadId=upload_id)
+ return response['Owner']['ID'], response['Owner']['DisplayName']
+
+def _test_object_ownership_bucket_owner_enforced(client, bucket, bucket_owner):
+ # put_object() succeeds without ACL
+ client.put_object(Bucket=bucket, Key='put-object-no-acl')
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-no-acl')
+ # put_object() succeeds with ACL=bucket-owner-full-control
+ client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+ # put_object() fails with other ACL
+ e = assert_raises(ClientError, client.put_object, Bucket=bucket, Key='put-object-private', ACL='private')
+ assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+ # create_multipart_upload() succeeds without ACL
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+ assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+ # create_multipart_upload() succeeds with ACL=bucket-owner-full-control
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+ # create_multipart_upload() fails with other ACL
+ e = assert_raises(ClientError, client.create_multipart_upload, Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+ assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+ # copy_object() succeeds without ACL
+ client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+ # copy_object() succeeds with ACL=bucket-owner-full-control
+ client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+ # copy_object() fails with other ACL
+ e = assert_raises(ClientError, client.copy_object, Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+ assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+ # put_bucket_acl() fails
+ e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket, ACL='private')
+ assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+ # put_object_acl() fails
+ e = assert_raises(ClientError, client.put_object_acl, Bucket=bucket, Key='put-object-no-acl', ACL='private')
+ assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+def _test_object_ownership_bucket_owner_preferred(client, bucket, bucket_owner):
+ # put_object() without ACL owned by client
+ client.put_object(Bucket=bucket, Key='put-object-no-acl')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl')
+ # put_object() with ACL=bucket-owner-full-control owned by bucket owner
+ client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+ # put_object() with other ACL owned by client
+ client.put_object(Bucket=bucket, Key='put-object-private', ACL='private')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private')
+
+ # create_multipart_upload() without ACL owned by client
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+ assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+ # create_multipart_upload() with ACL=bucket-owner-full-control owned by bucket owner
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+ # create_multipart_upload() with other ACL owned by client
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+ assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId'])
+
+ # copy_object() without ACL owned by client
+ client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+ # copy_object() with ACL=bucket-owner-full-control owned by bucket owner
+ client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+ assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+ # copy_object() with other ACL owned by client
+ client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private')
+
+ # put_bucket_acl() and put_object_acl() succeed
+ client.put_bucket_acl(Bucket=bucket, ACL='private')
+ client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private')
+
+def _test_object_ownership_object_writer(client, bucket, bucket_owner):
+ # put_object() without ACL owned by client
+ client.put_object(Bucket=bucket, Key='put-object-no-acl')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl')
+ # put_object() with ACL=bucket-owner-full-control owned by client
+ client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+ # put_object() with other ACL owned by client
+ client.put_object(Bucket=bucket, Key='put-object-private', ACL='private')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private')
+
+ # create_multipart_upload() without ACL owned by client
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+ assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+ # create_multipart_upload() with ACL=bucket-owner-full-control owned by client
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+ assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+ # create_multipart_upload() with other ACL owned by client
+ response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+ assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId'])
+
+ # copy_object() without ACL owned by client
+ client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+ # copy_object() with ACL=bucket-owner-full-control owned by client
+ client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+ # copy_object() with other ACL owned by client
+ client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+ assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private')
+
+ # put_bucket_acl() and put_object_acl() succeed
+ client.put_bucket_acl(Bucket=bucket, ACL='private')
+ client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private')
+
+def get_bucket_ownership(client, bucket):
+ response = client.get_bucket_ownership_controls(Bucket=bucket)
+ assert 1 == len(response['OwnershipControls']['Rules'])
+ return response['OwnershipControls']['Rules'][0]['ObjectOwnership']
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_aws # aws defaults to BucketOwnerEnforced
+def test_create_bucket_no_ownership_controls():
+ client = get_client()
+ bucket = get_new_bucket()
+ e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket)
+ assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_bucket_owner_enforced():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket_name()
+ client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerEnforced')
+ assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_bucket_owner_preferred():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket_name()
+ client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerPreferred')
+ assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket)
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_object_writer():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket_name()
+ client.create_bucket(Bucket=bucket, ObjectOwnership='ObjectWriter')
+ assert 'ObjectWriter' == get_bucket_ownership(client, bucket)
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_bucket_owner_enforced():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket_name()
+ ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
+
+ # expect PutBucketOwnershipControls to fail with public-read ACL
+ client.create_bucket(Bucket=bucket, ACL='public-read')
+ e = assert_raises(ClientError, client.put_bucket_ownership_controls,
+ Bucket=bucket, OwnershipControls=ownership)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 400
+ assert error_code == 'InvalidBucketAclWithObjectOwnership'
+
+ # expect success with default private ACL
+ client.put_bucket_acl(Bucket=bucket, ACL='private')
+ client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+ assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_bucket_owner_preferred():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket(client)
+ ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerPreferred'}]}
+ client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+ assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket)
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_object_writer():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket(client)
+ ownership = {'Rules': [{'ObjectOwnership': 'ObjectWriter'}]}
+ client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+ assert 'ObjectWriter' == get_bucket_ownership(client, bucket)
+ # add public bucket policy and test with 'alt' user
+ client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+ _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+def test_bucket_create_delete_bucket_ownership():
+ client = get_client()
+ bucket_owner = (get_main_user_id(), get_main_display_name())
+ bucket = get_new_bucket(client)
+ ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
+ client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+ assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+
+ client.delete_bucket_ownership_controls(Bucket=bucket)
+
+ e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket)
+ assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
+
+ client.delete_bucket_ownership_controls(Bucket=bucket)