]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
more tests for GetObjectAttributes
authorCasey Bodley <cbodley@redhat.com>
Thu, 17 Oct 2024 22:26:07 +0000 (18:26 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 7 Jan 2025 20:24:46 +0000 (15:24 -0500)
* multipart upload without checksums
* multipart upload with a single part
* pagination of multipart parts
* non-multipart upload with/without checksum
* versioned object, current and non-current
* sse-c encrypted object

Signed-off-by: Casey Bodley <cbodley@redhat.com>
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
s3tests_boto3/functional/test_s3.py

index 1bb81a10e12b341ef3e31a5cb34b592dc1436010..5e8cad1ef0abfe422013a59f005a8f6ef391b507 100644 (file)
@@ -15367,9 +15367,16 @@ def test_bucket_logging_single_prefix():
         found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
     assert found
 
+def check_parts_count(parts, expected):
+    # AWS docs disagree on the name of this element
+    if 'TotalPartsCount' in parts:
+        assert parts['TotalPartsCount'] == expected
+    else:
+        assert parts['PartsCount'] == expected
+
 @pytest.mark.checksum
 @pytest.mark.fails_on_dbstore
-def test_get_object_attributes():
+def test_get_multipart_checksum_object_attributes():
     bucket_name = get_new_bucket()
     client = get_client()
 
@@ -15398,7 +15405,7 @@ def test_get_object_attributes():
     nparts = len(parts)
     assert response['ObjectSize'] == objlen
     assert response['Checksum']['ChecksumSHA256'] == upload_checksum
-    assert response['ObjectParts']['TotalPartsCount'] == nparts
+    check_parts_count(response['ObjectParts'], nparts)
 
     # check the parts
     partno = 1
@@ -15410,3 +15417,255 @@ def test_get_object_attributes():
             assert obj_part['Size'] == objlen - ((nparts-1) * (5 * 1024 * 1024))
         assert obj_part['ChecksumSHA256'] == checksums[partno - 1]
         partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_multipart_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    key = "multipart"
+    part_size = 5*1024*1024
+    objlen = 30*1024*1024
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
+    response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+                                                UploadId=upload_id,
+                                                MultipartUpload={'Parts': parts})
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key, \
+                                            ObjectAttributes=request_attributes)
+
+    # check overall object
+    nparts = len(parts)
+    assert response['ObjectSize'] == objlen
+    check_parts_count(response['ObjectParts'], len(parts))
+    assert response['ObjectParts']['IsTruncated'] == False
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+
+    # check the parts
+    partno = 1
+    for obj_part in response['ObjectParts']['Parts']:
+        assert obj_part['PartNumber'] == partno
+        assert obj_part['Size'] == part_size
+        assert 'ChecksumSHA256' not in obj_part
+        partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_paginated_multipart_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    key = "multipart"
+    part_size = 5*1024*1024
+    objlen = 30*1024*1024
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
+    response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+                                                UploadId=upload_id,
+                                                MultipartUpload={'Parts': parts})
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes,
+                                            MaxParts=1, PartNumberMarker=3)
+
+    # check overall object
+    assert response['ObjectSize'] == objlen
+    check_parts_count(response['ObjectParts'], len(parts))
+    assert response['ObjectParts']['MaxParts'] == 1
+    assert response['ObjectParts']['PartNumberMarker'] == 3
+    assert response['ObjectParts']['IsTruncated'] == True
+    assert response['ObjectParts']['NextPartNumberMarker'] == 4
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+
+    # check the part
+    assert len(response['ObjectParts']['Parts']) == 1
+    obj_part = response['ObjectParts']['Parts'][0]
+    assert obj_part['PartNumber'] == 4
+    assert obj_part['Size'] == part_size
+    assert 'ChecksumSHA256' not in obj_part
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes,
+                                            MaxParts=10, PartNumberMarker=4)
+
+    # check overall object
+    assert response['ObjectSize'] == objlen
+    check_parts_count(response['ObjectParts'], len(parts))
+    assert response['ObjectParts']['MaxParts'] == 10
+    assert response['ObjectParts']['IsTruncated'] == False
+    assert response['ObjectParts']['PartNumberMarker'] == 4
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+
+    # check the parts
+    assert len(response['ObjectParts']['Parts']) == 2
+    partno = 5
+    for obj_part in response['ObjectParts']['Parts']:
+        assert obj_part['PartNumber'] == partno
+        assert obj_part['Size'] == part_size
+        assert 'ChecksumSHA256' not in obj_part
+        partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_single_multipart_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    key = "multipart"
+    part_size = 5*1024*1024
+    part_sizes = [part_size] # just one part
+    part_count = len(part_sizes)
+    total_size = sum(part_sizes)
+
+    (upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
+    response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+                                                UploadId=upload_id,
+                                                MultipartUpload={'Parts': parts})
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes)
+
+    assert response['ObjectSize'] == total_size
+    check_parts_count(response['ObjectParts'], 1)
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+
+    assert len(response['ObjectParts']['Parts']) == 1
+    obj_part = response['ObjectParts']['Parts'][0]
+    assert obj_part['PartNumber'] == 1
+    assert obj_part['Size'] == part_size
+    assert 'ChecksumSHA256' not in obj_part
+
+def test_get_checksum_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    key = "myobj"
+    size = 1024
+    body = FakeWriteFile(size, 'A')
+    sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+    response = client.put_object(Bucket=bucket_name, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum)
+    assert sha256sum == response['ChecksumSHA256']
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes)
+
+    assert response['ObjectSize'] == size
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+    assert response['Checksum']['ChecksumSHA256'] == sha256sum
+    assert 'ObjectParts' not in response
+
+def test_get_versioned_object_attributes():
+    bucket_name = get_new_bucket()
+    check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
+    client = get_client()
+    key = "obj"
+    objlen = 3
+
+    response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+    version = response['VersionId']
+    assert len(version)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes)
+
+    assert 'DeleteMarker' not in response
+    assert response['VersionId'] == version
+
+    assert response['ObjectSize'] == 3
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+    assert 'ObjectParts' not in response
+
+    # write a new current version
+    client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+
+    # ask for the original version again
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key, VersionId=version,
+                                            ObjectAttributes=request_attributes)
+
+    assert 'DeleteMarker' not in response
+    assert response['VersionId'] == version
+
+    assert response['ObjectSize'] == 3
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+    assert 'ObjectParts' not in response
+
+@pytest.mark.encryption
+def test_get_sse_c_encrypted_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+    key = 'obj'
+    objlen = 1000
+    data = 'A'*objlen
+    sse_args = {
+        'SSECustomerAlgorithm': 'AES256',
+        'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw=='
+    }
+    attrs = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+
+    response = client.put_object(Bucket=bucket_name, Key=key, Body=data, **sse_args)
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    # GetObjectAttributes fails without sse-c headers
+    e = assert_raises(ClientError, client.get_object_attributes,
+                      Bucket=bucket_name, Key=key, ObjectAttributes=attrs)
+    status, error_code = _get_status_and_error_code(e.response)
+    assert status == 400
+
+    # and succeeds sse-c headers
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=attrs, **sse_args)
+
+    assert 'DeleteMarker' not in response
+    assert 'VersionId' not in response
+
+    assert response['ObjectSize'] == objlen
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+    assert 'ObjectParts' not in response
+
+def test_get_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+    key = "obj"
+    objlen = 3
+
+    response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+    etag = response['ETag'].strip('"')
+    assert len(etag)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes)
+
+    assert 'DeleteMarker' not in response
+    assert 'VersionId' not in response
+
+    assert response['ObjectSize'] == 3
+    assert response['ETag'] == etag
+    assert response['StorageClass'] == 'STANDARD'
+    assert 'ObjectParts' not in response