From 6e67add3832e46be91ff29817ca2417ccbabcdf9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 24 Jun 2025 21:02:30 -0400 Subject: [PATCH] s3: test object ownership Signed-off-by: Casey Bodley --- pytest.ini | 1 + s3tests_boto3/functional/test_s3.py | 246 ++++++++++++++++++++++++++++ 2 files changed, 247 insertions(+) diff --git a/pytest.ini b/pytest.ini index 7248af21..9b981c62 100644 --- a/pytest.ini +++ b/pytest.ini @@ -35,6 +35,7 @@ markers = lifecycle_transition list_objects_v2 object_lock + object_ownership role_policy session_policy s3select diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index e5be89b2..aa4609d2 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -19169,3 +19169,249 @@ def test_delete_objects_version_if_match_size(): response = client.delete_objects(Bucket=bucket, Delete={'Objects': [{'Key': key, 'VersionId': version, 'Size': badsize}]}) assert 200 == response['ResponseMetadata']['HTTPStatusCode'] + +def public_bucket_policy(bucket): + return json.dumps({ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": "*", + "Resource": [ + f"arn:aws:s3:::{bucket}", + f"arn:aws:s3:::{bucket}/*" + ] + }] + }) + +def get_object_acl_owner(client, bucket, key): + response = client.get_object_acl(Bucket=bucket, Key=key) + return response['Owner']['ID'], response['Owner']['DisplayName'] + +def get_multipart_acl_owner(client, bucket, key, upload_id): + response = client.list_parts(Bucket=bucket, Key=key, UploadId=upload_id) + return response['Owner']['ID'], response['Owner']['DisplayName'] + +def _test_object_ownership_bucket_owner_enforced(client, bucket, bucket_owner): + # put_object() succeeds without ACL + client.put_object(Bucket=bucket, Key='put-object-no-acl') + assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-no-acl') + # put_object() succeeds with ACL=bucket-owner-full-control + client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control') + # put_object() fails with other ACL + e = assert_raises(ClientError, client.put_object, Bucket=bucket, Key='put-object-private', ACL='private') + assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response) + + # create_multipart_upload() succeeds without ACL + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl') + assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId']) + # create_multipart_upload() succeeds with ACL=bucket-owner-full-control + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId']) + # create_multipart_upload() fails with other ACL + e = assert_raises(ClientError, client.create_multipart_upload, Bucket=bucket, Key='create-multipart-upload-private', ACL='private') + assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response) + + # copy_object() succeeds without ACL + client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}) + assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-no-acl') + # copy_object() succeeds with ACL=bucket-owner-full-control + client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control') + assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control') + # copy_object() fails with other ACL + e = assert_raises(ClientError, client.copy_object, Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private') + assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response) + + # put_bucket_acl() fails + e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket, ACL='private') + assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response) + # put_object_acl() fails + e = assert_raises(ClientError, client.put_object_acl, Bucket=bucket, Key='put-object-no-acl', ACL='private') + assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response) + +def _test_object_ownership_bucket_owner_preferred(client, bucket, bucket_owner): + # put_object() without ACL owned by client + client.put_object(Bucket=bucket, Key='put-object-no-acl') + assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl') + # put_object() with ACL=bucket-owner-full-control owned by bucket owner + client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control') + # put_object() with other ACL owned by client + client.put_object(Bucket=bucket, Key='put-object-private', ACL='private') + assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private') + + # create_multipart_upload() without ACL owned by client + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl') + assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId']) + # create_multipart_upload() with ACL=bucket-owner-full-control owned by bucket owner + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId']) + # create_multipart_upload() with other ACL owned by client + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private') + assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId']) + + # copy_object() without ACL owned by client + client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}) + assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl') + # copy_object() with ACL=bucket-owner-full-control owned by bucket owner + client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control') + assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control') + # copy_object() with other ACL owned by client + client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private') + assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private') + + # put_bucket_acl() and put_object_acl() succeed + client.put_bucket_acl(Bucket=bucket, ACL='private') + client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private') + +def _test_object_ownership_object_writer(client, bucket, bucket_owner): + # put_object() without ACL owned by client + client.put_object(Bucket=bucket, Key='put-object-no-acl') + assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl') + # put_object() with ACL=bucket-owner-full-control owned by client + client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control') + # put_object() with other ACL owned by client + client.put_object(Bucket=bucket, Key='put-object-private', ACL='private') + assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private') + + # create_multipart_upload() without ACL owned by client + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl') + assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId']) + # create_multipart_upload() with ACL=bucket-owner-full-control owned by client + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control') + assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId']) + # create_multipart_upload() with other ACL owned by client + response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private') + assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId']) + + # copy_object() without ACL owned by client + client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}) + assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl') + # copy_object() with ACL=bucket-owner-full-control owned by client + client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control') + assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control') + # copy_object() with other ACL owned by client + client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private') + assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private') + + # put_bucket_acl() and put_object_acl() succeed + client.put_bucket_acl(Bucket=bucket, ACL='private') + client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private') + +def get_bucket_ownership(client, bucket): + response = client.get_bucket_ownership_controls(Bucket=bucket) + assert 1 == len(response['OwnershipControls']['Rules']) + return response['OwnershipControls']['Rules'][0]['ObjectOwnership'] + +@pytest.mark.object_ownership +@pytest.mark.fails_on_aws # aws defaults to BucketOwnerEnforced +def test_create_bucket_no_ownership_controls(): + client = get_client() + bucket = get_new_bucket() + e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket) + assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_create_bucket_bucket_owner_enforced(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket_name() + client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerEnforced') + assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket) + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_create_bucket_bucket_owner_preferred(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket_name() + client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerPreferred') + assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket) + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_create_bucket_object_writer(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket_name() + client.create_bucket(Bucket=bucket, ObjectOwnership='ObjectWriter') + assert 'ObjectWriter' == get_bucket_ownership(client, bucket) + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_put_bucket_ownership_bucket_owner_enforced(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket_name() + ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]} + + # expect PutBucketOwnershipControls to fail with public-read ACL + client.create_bucket(Bucket=bucket, ACL='public-read') + e = assert_raises(ClientError, client.put_bucket_ownership_controls, + Bucket=bucket, OwnershipControls=ownership) + status, error_code = _get_status_and_error_code(e.response) + assert status == 400 + assert error_code == 'InvalidBucketAclWithObjectOwnership' + + # expect success with default private ACL + client.put_bucket_acl(Bucket=bucket, ACL='private') + client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership) + assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket) + + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_put_bucket_ownership_bucket_owner_preferred(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket(client) + ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerPreferred'}]} + client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership) + assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket) + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +@pytest.mark.fails_on_dbstore +def test_put_bucket_ownership_object_writer(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket(client) + ownership = {'Rules': [{'ObjectOwnership': 'ObjectWriter'}]} + client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership) + assert 'ObjectWriter' == get_bucket_ownership(client, bucket) + # add public bucket policy and test with 'alt' user + client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket)) + _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner) + +@pytest.mark.object_ownership +def test_bucket_create_delete_bucket_ownership(): + client = get_client() + bucket_owner = (get_main_user_id(), get_main_display_name()) + bucket = get_new_bucket(client) + ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]} + client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership) + assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket) + + client.delete_bucket_ownership_controls(Bucket=bucket) + + e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket) + assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response) + + client.delete_bucket_ownership_controls(Bucket=bucket) -- 2.39.5