]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
s3: test object ownership 660/head
authorCasey Bodley <cbodley@redhat.com>
Wed, 25 Jun 2025 01:02:30 +0000 (21:02 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 19 Sep 2025 13:34:47 +0000 (09:34 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
pytest.ini
s3tests_boto3/functional/test_s3.py

index 7248af21e715557840a5e6e4b065686167506b02..9b981c62863fb4e3afb8ed8b9fa612903e808556 100644 (file)
@@ -35,6 +35,7 @@ markers =
     lifecycle_transition
     list_objects_v2
     object_lock
+    object_ownership
     role_policy
     session_policy
     s3select
index e5be89b242fdc874af221b583a31f0d91171e0fb..aa4609d28dae968dc393b0372030b9709af35a9d 100644 (file)
@@ -19169,3 +19169,249 @@ def test_delete_objects_version_if_match_size():
 
     response = client.delete_objects(Bucket=bucket, Delete={'Objects': [{'Key': key, 'VersionId': version, 'Size': badsize}]})
     assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+def public_bucket_policy(bucket):
+    return json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [{
+            "Effect": "Allow",
+            "Principal": {"AWS": "*"},
+            "Action": "*",
+            "Resource": [
+                f"arn:aws:s3:::{bucket}",
+                f"arn:aws:s3:::{bucket}/*"
+            ]
+        }]
+    })
+
+def get_object_acl_owner(client, bucket, key):
+    response = client.get_object_acl(Bucket=bucket, Key=key)
+    return response['Owner']['ID'], response['Owner']['DisplayName']
+
+def get_multipart_acl_owner(client, bucket, key, upload_id):
+    response = client.list_parts(Bucket=bucket, Key=key, UploadId=upload_id)
+    return response['Owner']['ID'], response['Owner']['DisplayName']
+
+def _test_object_ownership_bucket_owner_enforced(client, bucket, bucket_owner):
+    # put_object() succeeds without ACL
+    client.put_object(Bucket=bucket, Key='put-object-no-acl')
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-no-acl')
+    # put_object() succeeds with ACL=bucket-owner-full-control
+    client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+    # put_object() fails with other ACL
+    e = assert_raises(ClientError, client.put_object, Bucket=bucket, Key='put-object-private', ACL='private')
+    assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+    # create_multipart_upload() succeeds without ACL
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+    assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+    # create_multipart_upload() succeeds with ACL=bucket-owner-full-control
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+    # create_multipart_upload() fails with other ACL
+    e = assert_raises(ClientError, client.create_multipart_upload, Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+    assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+    # copy_object() succeeds without ACL
+    client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+    # copy_object() succeeds with ACL=bucket-owner-full-control
+    client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+    # copy_object() fails with other ACL
+    e = assert_raises(ClientError, client.copy_object, Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+    assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+    # put_bucket_acl() fails
+    e = assert_raises(ClientError, client.put_bucket_acl, Bucket=bucket, ACL='private')
+    assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+    # put_object_acl() fails
+    e = assert_raises(ClientError, client.put_object_acl, Bucket=bucket, Key='put-object-no-acl', ACL='private')
+    assert (400, 'AccessControlListNotSupported') == _get_status_and_error_code(e.response)
+
+def _test_object_ownership_bucket_owner_preferred(client, bucket, bucket_owner):
+    # put_object() without ACL owned by client
+    client.put_object(Bucket=bucket, Key='put-object-no-acl')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl')
+    # put_object() with ACL=bucket-owner-full-control owned by bucket owner
+    client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+    # put_object() with other ACL owned by client
+    client.put_object(Bucket=bucket, Key='put-object-private', ACL='private')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private')
+
+    # create_multipart_upload() without ACL owned by client
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+    assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+    # create_multipart_upload() with ACL=bucket-owner-full-control owned by bucket owner
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner == get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+    # create_multipart_upload() with other ACL owned by client
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+    assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId'])
+
+    # copy_object() without ACL owned by client
+    client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+    # copy_object() with ACL=bucket-owner-full-control owned by bucket owner
+    client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+    assert bucket_owner == get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+    # copy_object() with other ACL owned by client
+    client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private')
+
+    # put_bucket_acl() and put_object_acl() succeed
+    client.put_bucket_acl(Bucket=bucket, ACL='private')
+    client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private')
+
+def _test_object_ownership_object_writer(client, bucket, bucket_owner):
+    # put_object() without ACL owned by client
+    client.put_object(Bucket=bucket, Key='put-object-no-acl')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-no-acl')
+    # put_object() with ACL=bucket-owner-full-control owned by client
+    client.put_object(Bucket=bucket, Key='put-object-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-bucket-owner-full-control')
+    # put_object() with other ACL owned by client
+    client.put_object(Bucket=bucket, Key='put-object-private', ACL='private')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'put-object-private')
+
+    # create_multipart_upload() without ACL owned by client
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-no-acl')
+    assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-no-acl', response['UploadId'])
+    # create_multipart_upload() with ACL=bucket-owner-full-control owned by client
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-bucket-owner-full-control', ACL='bucket-owner-full-control')
+    assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-bucket-owner-full-control', response['UploadId'])
+    # create_multipart_upload() with other ACL owned by client
+    response = client.create_multipart_upload(Bucket=bucket, Key='create-multipart-upload-private', ACL='private')
+    assert bucket_owner != get_multipart_acl_owner(client, bucket, 'create-multipart-upload-private', response['UploadId'])
+
+    # copy_object() without ACL owned by client
+    client.copy_object(Bucket=bucket, Key='copy-object-no-acl', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'})
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-no-acl')
+    # copy_object() with ACL=bucket-owner-full-control owned by client
+    client.copy_object(Bucket=bucket, Key='copy-object-bucket-owner-full-control', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='bucket-owner-full-control')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-bucket-owner-full-control')
+    # copy_object() with other ACL owned by client
+    client.copy_object(Bucket=bucket, Key='copy-object-private', CopySource={'Bucket': bucket, 'Key': 'put-object-no-acl'}, ACL='private')
+    assert bucket_owner != get_object_acl_owner(client, bucket, 'copy-object-private')
+
+    # put_bucket_acl() and put_object_acl() succeed
+    client.put_bucket_acl(Bucket=bucket, ACL='private')
+    client.put_object_acl(Bucket=bucket, Key='put-object-no-acl', ACL='private')
+
+def get_bucket_ownership(client, bucket):
+    response = client.get_bucket_ownership_controls(Bucket=bucket)
+    assert 1 == len(response['OwnershipControls']['Rules'])
+    return response['OwnershipControls']['Rules'][0]['ObjectOwnership']
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_aws # aws defaults to BucketOwnerEnforced
+def test_create_bucket_no_ownership_controls():
+    client = get_client()
+    bucket = get_new_bucket()
+    e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket)
+    assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_bucket_owner_enforced():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket_name()
+    client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerEnforced')
+    assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_bucket_owner_preferred():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket_name()
+    client.create_bucket(Bucket=bucket, ObjectOwnership='BucketOwnerPreferred')
+    assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket)
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_create_bucket_object_writer():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket_name()
+    client.create_bucket(Bucket=bucket, ObjectOwnership='ObjectWriter')
+    assert 'ObjectWriter' == get_bucket_ownership(client, bucket)
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_bucket_owner_enforced():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket_name()
+    ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
+
+    # expect PutBucketOwnershipControls to fail with public-read ACL
+    client.create_bucket(Bucket=bucket, ACL='public-read')
+    e = assert_raises(ClientError, client.put_bucket_ownership_controls,
+                      Bucket=bucket, OwnershipControls=ownership)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+    assert error_code == 'InvalidBucketAclWithObjectOwnership'
+
+    # expect success with default private ACL
+    client.put_bucket_acl(Bucket=bucket, ACL='private')
+    client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+    assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_bucket_owner_enforced(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_bucket_owner_preferred():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket(client)
+    ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerPreferred'}]}
+    client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+    assert 'BucketOwnerPreferred' == get_bucket_ownership(client, bucket)
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_bucket_owner_preferred(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+@pytest.mark.fails_on_dbstore
+def test_put_bucket_ownership_object_writer():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket(client)
+    ownership = {'Rules': [{'ObjectOwnership': 'ObjectWriter'}]}
+    client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+    assert 'ObjectWriter' == get_bucket_ownership(client, bucket)
+    # add public bucket policy and test with 'alt' user
+    client.put_bucket_policy(Bucket=bucket, Policy=public_bucket_policy(bucket))
+    _test_object_ownership_object_writer(get_alt_client(), bucket, bucket_owner)
+
+@pytest.mark.object_ownership
+def test_bucket_create_delete_bucket_ownership():
+    client = get_client()
+    bucket_owner = (get_main_user_id(), get_main_display_name())
+    bucket = get_new_bucket(client)
+    ownership = {'Rules': [{'ObjectOwnership': 'BucketOwnerEnforced'}]}
+    client.put_bucket_ownership_controls(Bucket=bucket, OwnershipControls=ownership)
+    assert 'BucketOwnerEnforced' == get_bucket_ownership(client, bucket)
+
+    client.delete_bucket_ownership_controls(Bucket=bucket)
+
+    e = assert_raises(ClientError, client.get_bucket_ownership_controls, Bucket=bucket)
+    assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
+
+    client.delete_bucket_ownership_controls(Bucket=bucket)