intersect = set(unordered_keys_out).intersection(unordered_keys_out2)
assert 0 == len(intersect)
# verify that unordered used with delimiter results in error
e = assert_raises(ClientError,
client.list_objects, Bucket=bucket_name, Delimiter="/")
return (upload_id, s, parts)
+def _multipart_upload_checksum(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[]):
+    """
+    generate a multi-part upload of a random file of specified size, sending
+    a SHA256 checksum with each part; re-send any parts listed in resend_parts
+    return the upload id, the data, the part list, and the per-part checksums
+    """
+    if client is None:
+        client = get_client()
+
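+    # request SHA256 checksums for this upload; each upload_part call below
+    # also passes ChecksumAlgorithm, so the SDK computes and sends a per-part
+    # checksum header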
+    if content_type is None and metadata is None:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key, ChecksumAlgorithm='SHA256')
+    else:
+        response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Metadata=metadata, ContentType=content_type,
+                                                  ChecksumAlgorithm='SHA256')
+
+    upload_id = response['UploadId']
+    s = ''
+    parts = []
+    part_checksums = []
+    for i, part in enumerate(generate_random(size, part_size)):
+        # parts are 1-indexed: PartNumber must start at 1, but enumerate() counts from 0
+        part_num = i+1
+        s += part
+        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part,
+                                      ChecksumAlgorithm='SHA256')
+
+        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num})
+
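+        # S3 reports part checksums base64-encoded; encode ours the same way
+        # so they can be compared with GetObjectAttributes output later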
+        armored_part_cksum = base64.b64encode(hashlib.sha256(part.encode('utf-8')).digest())
+        part_checksums.append(armored_part_cksum.decode())
+
+        if i in resend_parts:
+            client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num, Body=part,
+                               ChecksumAlgorithm='SHA256')
+
+    return (upload_id, s, parts, part_checksums)
+
@pytest.mark.fails_on_dbstore
def test_object_copy_versioning_multipart_upload():
bucket_name = get_new_bucket()
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400
-
def _has_bucket_logging_extension():
src_bucket_name = get_new_bucket_name()
src_bucket = get_new_bucket_resource(name=src_bucket_name)
src_keys = _get_keys(response)
found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
assert found
+
+@pytest.mark.checksum
+@pytest.mark.fails_on_dbstore
+def test_get_object_attributes():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+ key = "multipart_checksum"
+ key_metadata = {'foo': 'bar'}
+ content_type = 'text/plain'
+ objlen = 64 * 1024 * 1024
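+    # 64 MiB with 5 MiB parts yields 12 full parts plus a short final part,
+    # which the per-part size checks below rely on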
+
+    (upload_id, data, parts, checksums) = \
+        _multipart_upload_checksum(bucket_name=bucket_name, key=key, size=objlen,
+                                   content_type=content_type, metadata=key_metadata)
+    response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
+                                                UploadId=upload_id,
+                                                MultipartUpload={'Parts': parts})
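+    # on AWS, a multipart object's ChecksumSHA256 is a checksum of the part
+    # checksums, reported with a "-<parts>" suffix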
+    upload_checksum = response['ChecksumSHA256']
+
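+    # sanity check: the completed object should be readable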
+    response = client.get_object(Bucket=bucket_name, Key=key)
+
+    request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
+
+    response = client.get_object_attributes(Bucket=bucket_name, Key=key,
+                                            ObjectAttributes=request_attributes)
+
+    # check overall object
+    nparts = len(parts)
+    assert response['ObjectSize'] == objlen
+    assert response['Checksum']['ChecksumSHA256'] == upload_checksum
+    assert response['ObjectParts']['TotalPartsCount'] == nparts
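+    # ObjectParts listings can be paginated; with the default MaxParts
+    # (1000 on AWS), all of this upload's parts come back in a single page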
+
+    # check the parts
+    partno = 1
+    for obj_part in response['ObjectParts']['Parts']:
+        assert obj_part['PartNumber'] == partno
+        if partno < nparts:
+            assert obj_part['Size'] == 5 * 1024 * 1024
+        else:
+            assert obj_part['Size'] == objlen - ((nparts-1) * (5 * 1024 * 1024))
+        assert obj_part['ChecksumSHA256'] == checksums[partno - 1]
+        partno += 1