found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
assert found
+def check_parts_count(parts, expected):
+ # AWS docs disagree on the name of this element
+ if 'TotalPartsCount' in parts:
+ assert parts['TotalPartsCount'] == expected
+ else:
+ assert parts['PartsCount'] == expected
+
@pytest.mark.checksum
@pytest.mark.fails_on_dbstore
-def test_get_object_attributes():
+def test_get_multipart_checksum_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
nparts = len(parts)
assert response['ObjectSize'] == objlen
assert response['Checksum']['ChecksumSHA256'] == upload_checksum
- assert response['ObjectParts']['TotalPartsCount'] == nparts
+ check_parts_count(response['ObjectParts'], nparts)
# check the parts
partno = 1
assert obj_part['Size'] == objlen - ((nparts-1) * (5 * 1024 * 1024))
assert obj_part['ChecksumSHA256'] == checksums[partno - 1]
partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_multipart_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ key = "multipart"
+ part_size = 5*1024*1024
+ objlen = 30*1024*1024
+
+ (upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
+ response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts})
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key, \
+ ObjectAttributes=request_attributes)
+
+ # check overall object
+ nparts = len(parts)
+ assert response['ObjectSize'] == objlen
+ check_parts_count(response['ObjectParts'], len(parts))
+ assert response['ObjectParts']['IsTruncated'] == False
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+
+ # check the parts
+ partno = 1
+ for obj_part in response['ObjectParts']['Parts']:
+ assert obj_part['PartNumber'] == partno
+ assert obj_part['Size'] == part_size
+ assert 'ChecksumSHA256' not in obj_part
+ partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_paginated_multipart_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ key = "multipart"
+ part_size = 5*1024*1024
+ objlen = 30*1024*1024
+
+ (upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
+ response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts})
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes,
+ MaxParts=1, PartNumberMarker=3)
+
+ # check overall object
+ assert response['ObjectSize'] == objlen
+ check_parts_count(response['ObjectParts'], len(parts))
+ assert response['ObjectParts']['MaxParts'] == 1
+ assert response['ObjectParts']['PartNumberMarker'] == 3
+ assert response['ObjectParts']['IsTruncated'] == True
+ assert response['ObjectParts']['NextPartNumberMarker'] == 4
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+
+ # check the part
+ assert len(response['ObjectParts']['Parts']) == 1
+ obj_part = response['ObjectParts']['Parts'][0]
+ assert obj_part['PartNumber'] == 4
+ assert obj_part['Size'] == part_size
+ assert 'ChecksumSHA256' not in obj_part
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes,
+ MaxParts=10, PartNumberMarker=4)
+
+ # check overall object
+ assert response['ObjectSize'] == objlen
+ check_parts_count(response['ObjectParts'], len(parts))
+ assert response['ObjectParts']['MaxParts'] == 10
+ assert response['ObjectParts']['IsTruncated'] == False
+ assert response['ObjectParts']['PartNumberMarker'] == 4
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+
+ # check the parts
+ assert len(response['ObjectParts']['Parts']) == 2
+ partno = 5
+ for obj_part in response['ObjectParts']['Parts']:
+ assert obj_part['PartNumber'] == partno
+ assert obj_part['Size'] == part_size
+ assert 'ChecksumSHA256' not in obj_part
+ partno += 1
+
+@pytest.mark.fails_on_dbstore
+def test_get_single_multipart_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ key = "multipart"
+ part_size = 5*1024*1024
+ part_sizes = [part_size] # just one part
+ part_count = len(part_sizes)
+ total_size = sum(part_sizes)
+
+ (upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
+ response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts})
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes)
+
+ assert response['ObjectSize'] == total_size
+ check_parts_count(response['ObjectParts'], 1)
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+
+ assert len(response['ObjectParts']['Parts']) == 1
+ obj_part = response['ObjectParts']['Parts'][0]
+ assert obj_part['PartNumber'] == 1
+ assert obj_part['Size'] == part_size
+ assert 'ChecksumSHA256' not in obj_part
+
+def test_get_checksum_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ key = "myobj"
+ size = 1024
+ body = FakeWriteFile(size, 'A')
+ sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum)
+ assert sha256sum == response['ChecksumSHA256']
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes)
+
+ assert response['ObjectSize'] == size
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+ assert response['Checksum']['ChecksumSHA256'] == sha256sum
+ assert 'ObjectParts' not in response
+
+def test_get_versioned_object_attributes():
+ bucket_name = get_new_bucket()
+ check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
+ client = get_client()
+ key = "obj"
+ objlen = 3
+
+ response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+ version = response['VersionId']
+ assert len(version)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes)
+
+ assert 'DeleteMarker' not in response
+ assert response['VersionId'] == version
+
+ assert response['ObjectSize'] == 3
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+ assert 'ObjectParts' not in response
+
+ # write a new current version
+ client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+
+ # ask for the original version again
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key, VersionId=version,
+ ObjectAttributes=request_attributes)
+
+ assert 'DeleteMarker' not in response
+ assert response['VersionId'] == version
+
+ assert response['ObjectSize'] == 3
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+ assert 'ObjectParts' not in response
+
+@pytest.mark.encryption
+def test_get_sse_c_encrypted_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ key = 'obj'
+ objlen = 1000
+ data = 'A'*objlen
+ sse_args = {
+ 'SSECustomerAlgorithm': 'AES256',
+ 'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw=='
+ }
+ attrs = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=data, **sse_args)
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ # GetObjectAttributes fails without sse-c headers
+ e = assert_raises(ClientError, client.get_object_attributes,
+ Bucket=bucket_name, Key=key, ObjectAttributes=attrs)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 400
+
+ # and succeeds sse-c headers
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=attrs, **sse_args)
+
+ assert 'DeleteMarker' not in response
+ assert 'VersionId' not in response
+
+ assert response['ObjectSize'] == objlen
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+ assert 'ObjectParts' not in response
+
+def test_get_object_attributes():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ key = "obj"
+ objlen = 3
+
+ response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
+ etag = response['ETag'].strip('"')
+ assert len(etag)
+
+ request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+ response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+ ObjectAttributes=request_attributes)
+
+ assert 'DeleteMarker' not in response
+ assert 'VersionId' not in response
+
+ assert response['ObjectSize'] == 3
+ assert response['ETag'] == etag
+ assert response['StorageClass'] == 'STANDARD'
+ assert 'ObjectParts' not in response