get_buckets_list,
get_objects_list,
get_main_kms_keyid,
+ get_secondary_kms_keyid,
nuke_prefixed_buckets,
)
@attr(assertion='empty buckets return no contents')
def test_bucket_list_empty():
bucket = get_new_bucket_resource()
- is_empty = _bucket_is_empty(bucket)
+ is_empty = _bucket_is_empty(bucket)
eq(is_empty, True)
@attr(resource='bucket')
bucket1 = get_new_bucket_resource()
bucket2 = get_new_bucket_resource()
obj = bucket1.put_object(Body='str', Key='asdf')
- is_empty = _bucket_is_empty(bucket2)
+ is_empty = _bucket_is_empty(bucket2)
eq(is_empty, True)
-
+
def _create_objects(bucket=None, bucket_name=None, keys=[]):
"""
Populate a (specified or new) bucket with objects with
response1 = client.list_objects_v2(Bucket=bucket_name)
eq(response1['KeyCount'], 5)
-
-
+
+
@attr(resource='bucket')
@attr(method='get')
@attr(operation='list')
"""
changes ms from datetime1 to 0, compares it to datetime2
"""
- # both times are in datetime format but datetime1 has
+ # both times are in datetime format but datetime1 has
# microseconds and datetime2 does not
datetime1 = datetime1.replace(microsecond=0)
eq(datetime1, datetime2)
@attr(operation='list all objects (anonymous)')
@attr(assertion='succeeds')
def test_bucket_list_objects_anonymous():
- bucket_name = get_new_bucket()
+ bucket_name = get_new_bucket()
client = get_client()
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
@attr(assertion='succeeds')
@attr('list-objects-v2')
def test_bucket_listv2_objects_anonymous():
- bucket_name = get_new_bucket()
+ bucket_name = get_new_bucket()
client = get_client()
client.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
@attr(operation='list all objects (anonymous)')
@attr(assertion='fails')
def test_bucket_list_objects_anonymous_fail():
- bucket_name = get_new_bucket()
+ bucket_name = get_new_bucket()
unauthenticated_client = get_unauthenticated_client()
e = assert_raises(ClientError, unauthenticated_client.list_objects, Bucket=bucket_name)
@attr(assertion='fails')
@attr('list-objects-v2')
def test_bucket_listv2_objects_anonymous_fail():
- bucket_name = get_new_bucket()
+ bucket_name = get_new_bucket()
unauthenticated_client = get_unauthenticated_client()
e = assert_raises(ClientError, unauthenticated_client.list_objects_v2, Bucket=bucket_name)
@attr(operation='non-existant bucket')
@attr(assertion='fails 404')
def test_bucket_notexist():
- bucket_name = get_new_bucket_name()
+ bucket_name = get_new_bucket_name()
client = get_client()
e = assert_raises(ClientError, client.list_objects, Bucket=bucket_name)
@attr(assertion='fails 404')
@attr('list-objects-v2')
def test_bucketv2_notexist():
- bucket_name = get_new_bucket_name()
+ bucket_name = get_new_bucket_name()
client = get_client()
e = assert_raises(ClientError, client.list_objects_v2, Bucket=bucket_name)
@attr(operation='non-existant bucket')
@attr(assertion='fails 404')
def test_bucket_delete_notexist():
- bucket_name = get_new_bucket_name()
+ bucket_name = get_new_bucket_name()
client = get_client()
e = assert_raises(ClientError, client.delete_bucket, Bucket=bucket_name)
http_response = None
def get_http_response(**kwargs):
- global http_response
+ global http_response
http_response = kwargs['http_response'].__dict__
@attr(resource='object')
client = get_client()
response = client.list_objects(Bucket=bucket_name)
eq(len(response['Contents']), 3)
-
+
objs_dict = _make_objs_dict(key_names=key_names)
- response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
+ response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
eq(len(response['Deleted']), 3)
assert 'Errors' not in response
response = client.list_objects(Bucket=bucket_name)
assert 'Contents' not in response
- response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
+ response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
eq(len(response['Deleted']), 3)
assert 'Errors' not in response
response = client.list_objects(Bucket=bucket_name)
client = get_client()
response = client.list_objects_v2(Bucket=bucket_name)
eq(len(response['Contents']), 3)
-
+
objs_dict = _make_objs_dict(key_names=key_names)
- response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
+ response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
eq(len(response['Deleted']), 3)
assert 'Errors' not in response
response = client.list_objects_v2(Bucket=bucket_name)
assert 'Contents' not in response
- response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
+ response = client.delete_objects(Bucket=bucket_name, Delete=objs_dict)
eq(len(response['Deleted']), 3)
assert 'Errors' not in response
response = client.list_objects_v2(Bucket=bucket_name)
client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
response = client.get_object(Bucket=bucket_name, Key='foo')
last_modified = str(response['LastModified'])
-
+
last_modified = last_modified.split('+')[0]
mtime = datetime.datetime.strptime(last_modified, '%Y-%m-%d %H:%M:%S')
client.create_bucket(ACL=bucket_acl, Bucket=bucket_name)
client.put_object(ACL=object_acl, Bucket=bucket_name, Key='foo')
- return bucket_name
+ return bucket_name
def _setup_bucket_acl(bucket_acl=None):
"""
name=name,
)
bucket = get_new_bucket_resource(name=bucket_name)
- is_empty = _bucket_is_empty(bucket)
+ is_empty = _bucket_is_empty(bucket)
eq(is_empty, True)
-
+
# AWS does not enforce all documented bucket restrictions.
# http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/index.html?BucketRestrictions.html
@attr('fails_on_aws')
if location_constraint == "":
location_constraint = None
eq(response['LocationConstraint'], location_constraint)
-
+
@attr(resource='bucket')
@attr(method='put')
@attr(operation='re-create by non-owner')
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
eq(response['Owner']['DisplayName'], display_name)
eq(response['Owner']['ID'], user_id)
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
grants = response['Grants']
check_grants(
grants,
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
grants = response['Grants']
check_grants(
grants,
display_name = get_main_display_name()
user_id = get_main_user_id()
-
grants = response['Grants']
check_grants(
grants,
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
grants = response['Grants']
check_grants(
grants,
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
grants = response['Grants']
check_grants(
grants,
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
grants = response['Grants']
check_grants(
grants,
alt_client = get_alt_client()
main_client.create_bucket(Bucket=bucket_name, ACL='public-read-write')
-
+
alt_client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
bucket_acl_response = main_client.get_bucket_acl(Bucket=bucket_name)
alt_client = get_alt_client()
main_client.create_bucket(Bucket=bucket_name, ACL='public-read-write')
-
+
alt_client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
bucket_acl_response = main_client.get_bucket_acl(Bucket=bucket_name)
alt_client = get_alt_client()
main_client.create_bucket(Bucket=bucket_name, ACL='public-read-write')
-
+
main_client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
alt_user_id = get_alt_user_id()
grant = { 'Grants': [{'Grantee': {'ID': alt_user_id, 'Type': 'CanonicalUser' }, 'Permission': 'FULL_CONTROL'}], 'Owner': {'DisplayName': main_display_name, 'ID': main_user_id}}
main_client.put_object_acl(Bucket=bucket_name, Key='foo', AccessControlPolicy=grant)
-
+
grant = { 'Grants': [{'Grantee': {'ID': alt_user_id, 'Type': 'CanonicalUser' }, 'Permission': 'READ_ACP'}], 'Owner': {'DisplayName': main_display_name, 'ID': main_user_id}}
alt_client.put_object_acl(Bucket=bucket_name, Key='foo', AccessControlPolicy=grant)
owned by the main user, not the alt user
A grant is a dictionary in the form of:
{u'Grantee': {u'Type': 'type', u'DisplayName': 'name', u'ID': 'id'}, u'Permission': 'PERM'}
-
+
"""
client = get_client()
main_user_id = get_main_user_id()
alt_client = get_alt_client()
main_client.create_bucket(Bucket=bucket_name, ACL='public-read-write')
-
+
header = {'x-amz-foo': 'bar'}
# lambda to add any header
add_header = (lambda **kwargs: kwargs['params']['headers'].update(header))
def _check_object_acl(permission):
"""
- Sets the permission on an object then checks to see
+ Sets the permission on an object then checks to see
if it was set
"""
bucket_name = get_new_bucket()
client.put_object(Bucket=bucket_name, Key='foo_key', Body='bar')
response = client.get_object_acl(Bucket=bucket_name, Key='foo_key')
-
+
grants = response['Grants']
check_grants(
grants,
client.create_bucket(Bucket=bucket_name)
response = client.get_bucket_acl(Bucket=bucket_name)
-
+
grants = response['Grants']
alt_user_id = get_alt_user_id()
alt_display_name = get_alt_display_name()
# set bucket acl to public-read-write so that teardown can work
alt_client.put_bucket_acl(Bucket=bucket_name, ACL='public-read-write')
-
+
# This test will fail on DH Objects. DHO allows multiple users with one account, which
# would violate the uniqueness requirement of a user's email. As such, DHO users are
client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy = grant)
response = client.get_bucket_acl(Bucket=bucket_name)
-
+
grants = response['Grants']
check_grants(
grants,
# acled object write fail
check_access_denied(alt_client.put_object, Bucket=bucket_name, Key=key1, Body='barcontent')
- # NOTE: The above put's causes the connection to go bad, therefore the client can't be used
+ # NOTE: The above put's causes the connection to go bad, therefore the client can't be used
# anymore. This can be solved either by:
# 1) putting an empty string ('') in the 'Body' field of those put_object calls
# 2) getting a new client hence the creation of alt_client{2,3} for the tests below
# acled object write fail
check_access_denied(alt_client.put_object, Bucket=bucket_name, Key=key1, Body='barcontent')
- # NOTE: The above put's causes the connection to go bad, therefore the client can't be used
+ # NOTE: The above put's causes the connection to go bad, therefore the client can't be used
# anymore. This can be solved either by:
# 1) putting an empty string ('') in the 'Body' field of those put_object calls
# 2) getting a new client hence the creation of alt_client{2,3} for the tests below
e = assert_raises(ClientError, client.copy, copy_source, bucket_name, 'bar321foo')
status = _get_status(e.response)
eq(status, 404)
-
+
@attr(resource='object')
@attr(method='put')
@attr(operation='copy object to/from versioned bucket')
for start_offset in range(0, size, part_size):
end_offset = min(start_offset + part_size - 1, size - 1)
part_num = i+1
- copy_source_range = 'bytes={start}-{end}'.format(start=start_offset, end=end_offset)
+ copy_source_range = 'bytes={start}-{end}'.format(start=start_offset, end=end_offset)
response = client.upload_part_copy(Bucket=dest_bucket_name, Key=dest_key, CopySource=copy_source, PartNumber=part_num, UploadId=upload_id, CopySourceRange=copy_source_range)
parts.append({'ETag': response['CopyPartResult'][u'ETag'], 'PartNumber': part_num})
i = i+1
upload_id = response['UploadId']
copy_source = {'Bucket': src_bucket_name, 'Key': src_key}
- copy_source_range = 'bytes={start}-{end}'.format(start=0, end=21)
+ copy_source_range = 'bytes={start}-{end}'.format(start=0, end=21)
e = assert_raises(ClientError, client.upload_part_copy,Bucket=src_bucket_name, Key='dest', UploadId=upload_id, CopySource=copy_source, CopySourceRange=copy_source_range, PartNumber=1)
status, error_code = _get_status_and_error_code(e.response)
copy_source = {'Bucket': src_bucket_name, 'Key': src_key}
part_num = 1
- copy_source_range = 'bytes={start}-{end}'.format(start=0, end=9)
+ copy_source_range = 'bytes={start}-{end}'.format(start=0, end=9)
response = client.upload_part_copy(Bucket=dest_bucket_name, Key=dest_key, CopySource=copy_source, PartNumber=part_num, UploadId=upload_id)
response = client.get_object(Bucket=dest_bucket_name, Key=dest_key)
eq(response['ContentLength'], 10)
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
@attr(resource='object')
@attr(method='put')
@attr(operation='check multipart copies with single small part')
objlen = 10*1024*1024
(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
-
+
@attr(assertion='successful')
def test_multipart_copy_multiple_sizes():
src_key = 'foo'
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
size = 5*1024*1024+100*1024
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
size = 5*1024*1024+600*1024
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
size = 10*1024*1024+100*1024
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
size = 10*1024*1024+600*1024
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
-
+
size = 10*1024*1024
(upload_id, parts) = _multipart_copy(src_bucket_name, src_key, dest_bucket_name, dest_key, size)
client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
def _do_test_multipart_upload_contents(bucket_name, key, num_parts):
payload=gen_rand_string(5)*1024*1024
client = get_client()
-
+
response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
upload_id = response['UploadId']
num_parts=2
client.put_object(Bucket=bucket_name, Key=key, Body=payload)
-
+
response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
upload_id = response['UploadId']
cors_config ={
'CORSRules': [
- {'AllowedMethods': allowed_methods,
+ {'AllowedMethods': allowed_methods,
'AllowedOrigins': allowed_origins,
},
]
assert r.headers.get('access-control-allow-origin', None) == expect_allow_origin
assert r.headers.get('access-control-allow-methods', None) == expect_allow_methods
-
+
@attr(resource='bucket')
@attr(method='get')
@attr(operation='check cors response when origin header set')
cors_config ={
'CORSRules': [
- {'AllowedMethods': ['GET'],
+ {'AllowedMethods': ['GET'],
'AllowedOrigins': ['*suffix'],
},
- {'AllowedMethods': ['GET'],
+ {'AllowedMethods': ['GET'],
'AllowedOrigins': ['start*end'],
},
- {'AllowedMethods': ['GET'],
+ {'AllowedMethods': ['GET'],
'AllowedOrigins': ['prefix*'],
},
- {'AllowedMethods': ['PUT'],
+ {'AllowedMethods': ['PUT'],
'AllowedOrigins': ['*.put'],
}
]
cors_config ={
'CORSRules': [
- {'AllowedMethods': ['GET'],
+ {'AllowedMethods': ['GET'],
'AllowedOrigins': ['*'],
},
]
cors_config ={
'CORSRules': [
- {'AllowedMethods': ['GET'],
+ {'AllowedMethods': ['GET'],
'AllowedOrigins': ['*'],
'ExposeHeaders': ['x-amz-meta-header1'],
},
# clear parts
parts[:] = []
-
+
# ok, now for the actual test
fp_b = FakeWriteFile(file_size, 'B')
def upload_fp_b():
version_ids.append(version_id)
if check_versions:
- check_obj_versions(client, bucket_name, key, version_ids, contents)
+ check_obj_versions(client, bucket_name, key, version_ids, contents)
return (version_ids, contents)
version_ids.pop(i)
contents.pop(i)
i += 1
-
+
# add new content with 'null' version id to the end
contents.append(content)
version_ids.append('null')
return (version_ids, contents)
-
+
@attr(resource='object')
@attr(method='create')
version_ids.append(version['VersionId'])
version_ids.reverse()
- check_obj_versions(client, bucket_name, key, version_ids, contents)
+ check_obj_versions(client, bucket_name, key, version_ids, contents)
for idx in xrange(num_versions):
remove_obj_version(client, bucket_name, key, version_ids, contents, idx)
response = client.get_object(Bucket=bucket_name, Key=new_key_name)
body = _get_body(response)
eq(body, contents[i])
-
+
another_bucket_name = get_new_bucket()
for i in xrange(num_versions):
response = client.get_object(Bucket=another_bucket_name, Key=new_key_name)
body = _get_body(response)
eq(body, contents[i])
-
+
new_key_name = 'new_key'
copy_source = {'Bucket': bucket_name, 'Key': key}
client.copy_object(Bucket=another_bucket_name, CopySource=copy_source, Key=new_key_name)
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
eq(response['Owner']['DisplayName'], display_name)
eq(response['Owner']['ID'], user_id)
display_name = get_main_display_name()
user_id = get_main_user_id()
-
+
eq(response['Owner']['DisplayName'], display_name)
eq(response['Owner']['ID'], user_id)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidArgument')
-
+
@attr(resource='bucket')
@attr(method='put')
@attr(operation='same id')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'InvalidArgument')
-
+
@attr(resource='bucket')
@attr(method='put')
@attr(operation='invalid status in lifecycle rule')
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
eq(error_code, 'MalformedXML')
-
+
rules=[{'ID': 'rule1', 'Expiration': {'Days': 2}, 'Prefix': 'test1/', 'Status':'disabled'}]
lifecycle = {'Rules': rules}
}
resend_parts = []
- (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
+ (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=5*1024*1024, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
}
resend_parts = []
- e = assert_raises(ClientError, _multipart_upload_enc, client=client, bucket_name=bucket_name,
+ e = assert_raises(ClientError, _multipart_upload_enc, client=client, bucket_name=bucket_name,
key=key, size=objlen, part_size=5*1024*1024, init_headers=init_headers, part_headers=part_headers, metadata=metadata, resend_parts=resend_parts)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
}
resend_parts = []
- e = assert_raises(ClientError, _multipart_upload_enc, client=client, bucket_name=bucket_name,
+ e = assert_raises(ClientError, _multipart_upload_enc, client=client, bucket_name=bucket_name,
key=key, size=objlen, part_size=5*1024*1024, init_headers=init_headers, part_headers=part_headers, metadata=metadata, resend_parts=resend_parts)
status, error_code = _get_status_and_error_code(e.response)
eq(status, 400)
}
resend_parts = []
- (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
+ (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=5*1024*1024, init_headers=put_headers, part_headers=put_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(put_headers))
eq(body, data)
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='Test SSE-KMS encrypted transfer 1 byte')
-@attr(assertion='success')
-@attr('encryption')
-def test_sse_kms_transfer_1b():
- _test_sse_kms_customer_write(1)
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='Test SSE-KMS encrypted transfer 1KB')
-@attr(assertion='success')
-@attr('encryption')
-def test_sse_kms_transfer_1kb():
- _test_sse_kms_customer_write(1024)
-
-
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='Test SSE-KMS encrypted transfer 1MB')
-@attr(assertion='success')
-@attr('encryption')
-def test_sse_kms_transfer_1MB():
- _test_sse_kms_customer_write(1024*1024)
-@attr(resource='object')
-@attr(method='put')
-@attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
-@attr(assertion='success')
-@attr('encryption')
-def test_sse_kms_transfer_13b():
- _test_sse_kms_customer_write(13)
-
@attr(resource='object')
@attr(method='head')
@attr(operation='Test SSE-KMS encrypted does perform head properly')
@attr(assertion='success')
@attr('encryption')
def test_sse_kms_method_head():
+ kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket()
client = get_client()
sse_kms_client_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
}
data = 'A'*1000
key = 'testobj'
response = client.head_object(Bucket=bucket_name, Key=key)
eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
- eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], 'testkey-1')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_kms_client_headers))
client.meta.events.register('before-call.s3.HeadObject', lf)
@attr(assertion='operation success')
@attr('encryption')
def test_sse_kms_present():
+ kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket()
client = get_client()
sse_kms_client_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
}
data = 'A'*100
key = 'testobj'
@attr(assertion='successful')
@attr('encryption')
def test_sse_kms_multipart_upload():
+ kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket()
client = get_client()
key = "multipart_enc"
metadata = {'foo': 'bar'}
enc_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type
}
resend_parts = []
- (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
+ (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
part_size=5*1024*1024, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
@attr(assertion='successful')
@attr('encryption')
def test_sse_kms_multipart_invalid_chunks_1():
+ kms_keyid = get_main_kms_keyid()
+ kms_keyid2 = get_secondary_kms_keyid()
bucket_name = get_new_bucket()
client = get_client()
key = "multipart_enc"
metadata = {'foo': 'bar'}
init_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type
}
part_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid2
}
resend_parts = []
- _multipart_upload_enc(client, bucket_name, key, objlen, part_size=5*1024*1024,
- init_headers=init_headers, part_headers=part_headers, metadata=metadata,
+ _multipart_upload_enc(client, bucket_name, key, objlen, part_size=5*1024*1024,
+ init_headers=init_headers, part_headers=part_headers, metadata=metadata,
resend_parts=resend_parts)
@attr(assertion='successful')
@attr('encryption')
def test_sse_kms_multipart_invalid_chunks_2():
+ kms_keyid = get_main_kms_keyid()
bucket_name = get_new_bucket()
client = get_client()
key = "multipart_enc"
metadata = {'foo': 'bar'}
init_headers = {
'x-amz-server-side-encryption': 'aws:kms',
- 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
'Content-Type': content_type
}
part_headers = {
}
resend_parts = []
- _multipart_upload_enc(client, bucket_name, key, objlen, part_size=5*1024*1024,
- init_headers=init_headers, part_headers=part_headers, metadata=metadata,
+ _multipart_upload_enc(client, bucket_name, key, objlen, part_size=5*1024*1024,
+ init_headers=init_headers, part_headers=part_headers, metadata=metadata,
resend_parts=resend_parts)
+
@attr(resource='object')
@attr(method='post')
@attr(operation='authenticated KMS browser based upload via POST request')
@attr(operation='Test SSE-KMS encrypted transfer 1 byte')
@attr(assertion='success')
@attr('encryption')
-def test_sse_kms_barb_transfer_1b():
+def test_sse_kms_transfer_1b():
kms_keyid = get_main_kms_keyid()
if kms_keyid is None:
raise SkipTest
@attr(operation='Test SSE-KMS encrypted transfer 1KB')
@attr(assertion='success')
@attr('encryption')
-def test_sse_kms_barb_transfer_1kb():
+def test_sse_kms_transfer_1kb():
kms_keyid = get_main_kms_keyid()
if kms_keyid is None:
raise SkipTest
@attr(operation='Test SSE-KMS encrypted transfer 1MB')
@attr(assertion='success')
@attr('encryption')
-def test_sse_kms_barb_transfer_1MB():
+def test_sse_kms_transfer_1MB():
kms_keyid = get_main_kms_keyid()
if kms_keyid is None:
raise SkipTest
@attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
@attr(assertion='success')
@attr('encryption')
-def test_sse_kms_barb_transfer_13b():
+def test_sse_kms_transfer_13b():
kms_keyid = get_main_kms_keyid()
if kms_keyid is None:
raise SkipTest
response = client.get_object(Bucket=bucket_name, Key=key)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
- # the 'referer' headers need to be removed for this one
+ # the 'referer' headers need to be removed for this one
#response = client.get_object(Bucket=bucket_name, Key=key)
#eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
signature = base64.b64encode(hmac.new(aws_secret_access_key, policy, sha).digest())
- payload = OrderedDict([
+ payload = OrderedDict([
("key" , "foo.txt"),
("AWSAccessKeyId" , aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
response = client.list_object_versions(Bucket=bucket_name)
versions = response['Versions']
for version in versions:
- eq(version['VersionId'], version_id)
+ eq(version['VersionId'], version_id)
# for versioning-default-bucket, no version-id should return.
response = client.list_object_versions(Bucket=bucket_name)
versions = response['Versions']
for version in versions:
- eq(version['VersionId'], version_id)
+ eq(version['VersionId'], version_id)
# for versioning-default-bucket, no version-id should return.
bucket_name = get_new_bucket()
response = alt_client.get_object(Bucket=bucket_name2, Key='new_foo')
body = _get_body(response)
eq(body, 'public/foo')
-
+
copy_source = {'Bucket': bucket_name, 'Key': 'public/bar'}
alt_client.copy_object(Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2')
del kwargs['params']['headers']["x-amz-metadata-directive"]
alt_client.meta.events.register('before-call.s3.CopyObject', remove_header)
-
+
copy_source = {'Bucket': src_bucket_name, 'Key': 'public/bar'}
check_access_denied(alt_client.copy_object, Bucket=bucket_name, CopySource=copy_source, Key='new_foo2', Metadata={"foo": "bar"})
eq(response['ObjectLockMode'], retention['Mode'])
eq(response['ObjectLockRetainUntilDate'], retention['RetainUntilDate'])
eq(response['ObjectLockLegalHoldStatus'], legal_hold['Status'])
-
+
client.put_object_legal_hold(Bucket=bucket_name, Key=key, LegalHold={'Status':'OFF'})
client.delete_object(Bucket=bucket_name, Key=key, VersionId=response['VersionId'], BypassGovernanceRetention=True)