_check_content_using_range_enc(client, bucket_name, key, data, 1000000, enc_headers=enc_headers)
_check_content_using_range_enc(client, bucket_name, key, data, 10000000, enc_headers=enc_headers)
+@pytest.mark.encryption
+def test_encryption_sse_c_unaligned_multipart_upload():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ key = "multipart_enc"
+ content_type = 'text/plain'
+ objlen = 30 * 1024 * 1024
+ partlen = 1 + 5 * 1024 * 1024 # not a multiple of the 4k encryption block size
+ metadata = {'foo': 'bar'}
+ enc_headers = {
+ 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+ 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
+ 'Content-Type': content_type
+ }
+ resend_parts = []
+
+ (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
+ part_size=partlen, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
+ client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
+ client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+ response = client.head_bucket(Bucket=bucket_name)
+ rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-object-count'])
+ assert rgw_object_count == 1
+ rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders']['x-rgw-bytes-used'])
+ assert rgw_bytes_used == objlen
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
+ client.meta.events.register('before-call.s3.GetObject', lf)
+ response = client.get_object(Bucket=bucket_name, Key=key)
+
+ assert response['Metadata'] == metadata
+ assert response['ResponseMetadata']['HTTPHeaders']['content-type'] == content_type
+
+ body = _get_body(response)
+ assert body == data
+ size = response['ContentLength']
+ assert len(body) == size
+
@pytest.mark.encryption
# TODO: remove this fails_on_rgw when I fix it
@pytest.mark.fails_on_rgw