assert status == 400
assert error_code == 'InvalidPart'
+@pytest.mark.fails_on_dbstore
+def test_multipart_single_get_part():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ key = "mymultipart"
+
+ part_size = 5*1024*1024
+ part_sizes = [part_size] # just one part
+ part_count = len(part_sizes)
+ total_size = sum(part_sizes)
+
+ (upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
+
+ # request part before complete
+ e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=1)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 404
+ assert error_code == 'NoSuchKey'
+
+ client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ assert len(parts) == part_count
+
+ for part, size in zip(parts, part_sizes):
+ response = client.head_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
+ assert response['PartsCount'] == part_count
+ assert response['ETag'] == '"{}"'.format(part['ETag'])
+
+ response = client.get_object(Bucket=bucket_name, Key=key, PartNumber=part['PartNumber'])
+ assert response['PartsCount'] == part_count
+ assert response['ETag'] == '"{}"'.format(part['ETag'])
+ assert response['ContentLength'] == size
+ # compare contents
+ for chunk in response['Body'].iter_chunks():
+ assert chunk.decode() == data[0:len(chunk)]
+ data = data[len(chunk):]
+
+ # request PartNumber out of range
+ e = assert_raises(ClientError, client.get_object, Bucket=bucket_name, Key=key, PartNumber=5)
+ status, error_code = _get_status_and_error_code(e.response)
+ assert status == 400
+ assert error_code == 'InvalidPart'
+
@pytest.mark.fails_on_dbstore
def test_non_multipart_get_part():
bucket_name = get_new_bucket()