body = _get_body(response)
assert body == data[ofs:end+1]
-@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_multipart_upload():
bucket_name = get_new_bucket()
# check extra client.complete_multipart_upload
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
- assert rgw_bytes_used == objlen
-
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
- assert rgw_object_count == 1
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
response = client.get_object(Bucket=bucket_name, Key=key)
assert response['ContentType'] == content_type
(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen)
client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id)
- response = client.head_bucket(Bucket=bucket_name)
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', 0))
- assert rgw_bytes_used == 0
-
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 0))
- assert rgw_object_count == 0
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert 'Contents' not in response
def test_abort_multipart_upload_not_found():
bucket_name = get_new_bucket()
assert body == data[ofs:end+1]
@pytest.mark.encryption
-@pytest.mark.fails_on_aws
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_upload():
bucket_name = get_new_bucket()
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
- assert rgw_object_count == 1
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
- assert rgw_bytes_used == objlen
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.GetObject', lf)
_check_content_using_range_enc(client, bucket_name, key, data, size, partlen + i, enc_headers=enc_headers)
@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_unaligned_multipart_upload():
bucket_name = get_new_bucket()
client = get_client()
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count'])
- assert rgw_object_count == 1
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used'])
- assert rgw_bytes_used == objlen
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
client.meta.events.register('before-call.s3.GetObject', lf)
assert status == 400
@pytest.mark.encryption
-@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_multipart_bad_download():
bucket_name = get_new_bucket()
client = get_client()
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
- assert rgw_object_count == 1
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
- assert rgw_bytes_used == objlen
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(put_headers))
client.meta.events.register('before-call.s3.GetObject', lf)
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
- assert rgw_object_count == 1
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
- assert rgw_bytes_used == objlen
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(part_headers))
client.meta.events.register('before-call.s3.UploadPart', lf)
client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
- response = client.head_bucket(Bucket=bucket_name)
- rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
- assert rgw_object_count == 1
- rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
- assert rgw_bytes_used == objlen
+ response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
+ assert len(response['Contents']) == 1
+ assert response['Contents'][0]['Size'] == objlen
lf = (lambda **kwargs: kwargs['params']['headers'].update(part_headers))
client.meta.events.register('before-call.s3.UploadPart', lf)