eq(obj['Owner']['ID'],key_data['ID'])
_compare_dates(obj['LastModified'],key_data['LastModified'])
-# amazon is eventually consistent, retry a bit if failed
-def check_configure_versioning_retry(bucket_name, status, expected_string):
- client = get_client()
-
- response = client.put_bucket_versioning(Bucket=bucket_name, VersioningConfiguration={'MFADelete': 'Disabled','Status': status})
-
- read_status = None
-
- for i in range(5):
- try:
- response = client.get_bucket_versioning(Bucket=bucket_name)
- read_status = response['Status']
- except KeyError:
- read_status = None
-
- if (expected_string == read_status):
- break
-
- time.sleep(1)
-
- eq(expected_string, read_status)
-
@attr(resource='object')
@attr(method='head')
t.append(thr)
return t
-def _do_wait_completion(t):
- for thr in t:
- thr.join()
-
@attr(resource='object')
@attr(method='put')
@attr(operation='concurrent creation of objects, concurrent removal')
}
]
}''' % bucket_name
- boto3.set_stream_logger(name='botocore')
+ # boto3.set_stream_logger(name='botocore')
client.put_bucket_policy(Bucket=bucket_name, Policy=policy)
request_headers={'referer': 'http://www.example.com/'}
@attr(resource='object')
@attr(method='put')
-@attr(operation='Deny put obj requests without encryption')
+@attr(operation='Deny put obj specifying both sse-c and sse-s3')
+@attr(assertion='success')
+@attr('encryption')
+def test_put_obj_enc_conflict_c_s3():
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
+
+ # boto3.set_stream_logger(name='botocore')
+
+ key1_str ='testobj'
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption' : 'AES256',
+ 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+ 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key1_str)
+ status, error_code = _get_status_and_error_code(e.response)
+ eq(status, 400)
+ eq(error_code, 'InvalidArgument')
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj specifying both sse-c and sse-kms')
+@attr(assertion='success')
+@attr('encryption')
+def test_put_obj_enc_conflict_c_kms():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-once'
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
+
+ # boto3.set_stream_logger(name='botocore')
+
+ key1_str ='testobj'
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption' : 'aws:kms',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid,
+ 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+ 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key1_str)
+ status, error_code = _get_status_and_error_code(e.response)
+ eq(status, 400)
+ eq(error_code, 'InvalidArgument')
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj specifying sse-s3 with kms key id')
+@attr(assertion='success')
+@attr('encryption')
+def test_put_obj_enc_conflict_s3_kms():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-once'
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
+
+ # boto3.set_stream_logger(name='botocore')
+
+ key1_str ='testobj'
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption' : 'AES256',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key1_str)
+ status, error_code = _get_status_and_error_code(e.response)
+ eq(status, 400)
+ eq(error_code, 'InvalidArgument')
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj specifying invalid algorithm' )
+@attr(assertion='success')
+@attr('encryption')
+def test_put_obj_enc_conflict_bad_enc_kms():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-once'
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
+
+ # boto3.set_stream_logger(name='botocore')
+
+ key1_str ='testobj'
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption' : 'aes:kms', # aes != aws
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ e = assert_raises(ClientError, client.put_object, Bucket=bucket_name, Key=key1_str)
+ status, error_code = _get_status_and_error_code(e.response)
+ eq(status, 400)
+ eq(error_code, 'InvalidArgument')
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj requests if not sse-s3: without encryption')
@attr(assertion='success')
@attr('encryption')
@attr('bucket-policy')
-# TODO: remove this 'fails_on_rgw' once I get the test passing
-@attr('fails_on_rgw')
-def test_bucket_policy_put_obj_enc():
+@attr('sse-s3')
+def test_bucket_policy_put_obj_s3_noenc():
bucket_name = get_new_bucket()
client = get_v2_client()
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s1).add_statement(s2).to_json()
- boto3.set_stream_logger(name='botocore')
+ # boto3.set_stream_logger(name='botocore')
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
key1_str ='testobj'
#response = client.get_bucket_policy(Bucket=bucket_name)
#print response
- check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
- sse_client_headers = {
- 'x-amz-server-side-encryption' : 'AES256',
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
- 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
- }
+ # doing this here breaks the next request w/ 400 (non-sse bug). Do it last.
+ #check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
- lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
- client.meta.events.register('before-call.s3.PutObject', lf)
#TODO: why is this a 400 and not passing, it appears boto3 is not parsing the 200 response the rgw sends back properly
# DEBUGGING: run the boto2 and compare the requests
# DEBUGGING: try to run this with v2 auth (figure out why get_v2_client isn't working) to make the requests similar to what boto2 is doing
# DEBUGGING: try to add other options to put_object to see if that makes the response better
- client.put_object(Bucket=bucket_name, Key=key1_str)
+
+ # first validate that writing a sse-s3 object works
+ response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
+ response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption']
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+
+ # then validate that a non-encrypted object fails.
+ # (this also breaks the connection--non-sse bug, probably because the server
+ # errors out before it consumes the data...)
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
@attr(resource='object')
@attr(method='put')
-@attr(operation='put obj with RequestObjectTag')
+@attr(operation='Deny put obj requests if not sse-s3: kms')
@attr(assertion='success')
-@attr('tagging')
+@attr('encryption')
@attr('bucket-policy')
-# TODO: remove this fails_on_rgw when I fix it
-@attr('fails_on_rgw')
-def test_bucket_policy_put_obj_request_obj_tag():
+@attr('sse-s3')
+def test_bucket_policy_put_obj_s3_kms():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-twice'
bucket_name = get_new_bucket()
- client = get_client()
+ client = get_v2_client()
- tag_conditional = {"StringEquals": {
- "s3:RequestObjectTag/security" : "public"
- }}
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption": "AES256"
+ }
+ }
+
+ deny_unencrypted_obj = {
+ "Null" : {
+ "s3:x-amz-server-side-encryption": "true"
+ }
+ }
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
- s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
- policy_document = p.add_statement(s1).to_json()
+ s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s1).add_statement(s2).to_json()
- client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+ # boto3.set_stream_logger(name='botocore')
- alt_client = get_alt_client()
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
key1_str ='testobj'
- check_access_denied(alt_client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
- headers = {"x-amz-tagging" : "security=public"}
- lf = (lambda **kwargs: kwargs['params']['headers'].update(headers))
+ #response = client.get_bucket_policy(Bucket=bucket_name)
+ #print response
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption': 'aws:kms',
+ 'x-amz-server-side-encryption-aws-kms-key-id': kms_keyid
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
client.meta.events.register('before-call.s3.PutObject', lf)
- #TODO: why is this a 400 and not passing
- alt_client.put_object(Bucket=bucket_name, Key=key1_str, Body=key1_str)
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
@attr(resource='object')
-@attr(method='get')
-@attr(operation='Test ExistingObjectTag conditional on get object acl')
+@attr(method='put')
+@attr(operation='Deny put obj requests if not sse-kms: without encryption')
@attr(assertion='success')
-@attr('tagging')
+@attr('encryption')
@attr('bucket-policy')
-@attr('fails_on_dbstore')
-def test_bucket_policy_get_obj_acl_existing_tag():
- bucket_name = _create_objects(keys=['publictag', 'privatetag', 'invalidtag'])
- client = get_client()
+def test_bucket_policy_put_obj_kms_noenc():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ raise SkipTest
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
- tag_conditional = {"StringEquals": {
- "s3:ExistingObjectTag/security" : "public"
- }}
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption": "aws:kms"
+ }
+ }
+
+ deny_unencrypted_obj = {
+ "Null" : {
+ "s3:x-amz-server-side-encryption": "true"
+ }
+ }
+ p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
- policy_document = make_json_policy("s3:GetObjectAcl",
- resource,
- conditions=tag_conditional)
+ s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s1).add_statement(s2).to_json()
+
+ # boto3.set_stream_logger(name='botocore')
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
- tagset = []
- tagset.append({'Key': 'security', 'Value': 'public'})
- tagset.append({'Key': 'foo', 'Value': 'bar'})
+ key1_str ='testobj'
+ key2_str ='unicorn'
- input_tagset = {'TagSet': tagset}
+ #response = client.get_bucket_policy(Bucket=bucket_name)
+ #print response
- response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tagset)
- eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ # must do check_access_denied last - otherwise, pending data
+ # breaks next call...
+ response = client.put_object(Bucket=bucket_name, Key=key1_str,
+ ServerSideEncryption='aws:kms', SSEKMSKeyId=kms_keyid)
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
- tagset2 = []
- tagset2.append({'Key': 'security', 'Value': 'private'})
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key2_str, Body=key2_str)
- input_tagset = {'TagSet': tagset2}
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Deny put obj requests if not sse-kms: s3')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-policy')
+def test_bucket_policy_put_obj_kms_s3():
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
- response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tagset)
- eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption": "aws:kms"
+ }
+ }
- tagset3 = []
- tagset3.append({'Key': 'security1', 'Value': 'public'})
+ deny_unencrypted_obj = {
+ "Null" : {
+ "s3:x-amz-server-side-encryption": "true"
+ }
+ }
- input_tagset = {'TagSet': tagset3}
+ p = Policy()
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+
+ s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s1).add_statement(s2).to_json()
+
+ # boto3.set_stream_logger(name='botocore')
+
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+ key1_str ='testobj'
+
+ #response = client.get_bucket_policy(Bucket=bucket_name)
+ #print response
+
+ sse_client_headers = {
+ 'x-amz-server-side-encryption' : 'AES256',
+ }
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_client_headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='put obj with RequestObjectTag')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+# TODO: remove this fails_on_rgw when I fix it
+@attr('fails_on_rgw')
+def test_bucket_policy_put_obj_request_obj_tag():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ tag_conditional = {"StringEquals": {
+ "s3:RequestObjectTag/security" : "public"
+ }}
+
+ p = Policy()
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+
+ s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
+ policy_document = p.add_statement(s1).to_json()
+
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+ alt_client = get_alt_client()
+ key1_str ='testobj'
+ check_access_denied(alt_client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+
+ headers = {"x-amz-tagging" : "security=public"}
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(headers))
+ client.meta.events.register('before-call.s3.PutObject', lf)
+ #TODO: why is this a 400 and not passing
+ alt_client.put_object(Bucket=bucket_name, Key=key1_str, Body=key1_str)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test ExistingObjectTag conditional on get object acl')
+@attr(assertion='success')
+@attr('tagging')
+@attr('bucket-policy')
+@attr('fails_on_dbstore')
+def test_bucket_policy_get_obj_acl_existing_tag():
+ bucket_name = _create_objects(keys=['publictag', 'privatetag', 'invalidtag'])
+ client = get_client()
+
+ tag_conditional = {"StringEquals": {
+ "s3:ExistingObjectTag/security" : "public"
+ }}
+
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+ policy_document = make_json_policy("s3:GetObjectAcl",
+ resource,
+ conditions=tag_conditional)
+
+
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+ tagset = []
+ tagset.append({'Key': 'security', 'Value': 'public'})
+ tagset.append({'Key': 'foo', 'Value': 'bar'})
+
+ input_tagset = {'TagSet': tagset}
+
+ response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tagset)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+ tagset2 = []
+ tagset2.append({'Key': 'security', 'Value': 'private'})
+
+ input_tagset = {'TagSet': tagset2}
+
+ response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tagset)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+ tagset3 = []
+ tagset3.append({'Key': 'security1', 'Value': 'public'})
+
+ input_tagset = {'TagSet': tagset3}
response = client.put_object_tagging(Bucket=bucket_name, Key='invalidtag', Tagging=input_tagset)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
@attr(assertion='fails')
@attr('object-lock')
@attr('fails_on_dbstore')
-def test_object_lock_put_obj_lock_invalid_years():
+def test_object_lock_put_obj_lock_invalid_mode():
bucket_name = get_new_bucket_name()
client = get_client()
client.create_bucket(Bucket=bucket_name, ObjectLockEnabledForBucket=True)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
-
-@attr(resource='bucket')
-@attr(method='put')
-@attr(operation='put bucket encryption on bucket')
-@attr(assertion='succeeds')
-def test_put_bucket_encryption():
- bucket_name = get_new_bucket()
- client = get_client()
-
+def _put_bucket_encryption_s3(client, bucket_name):
+ """
+ enable a default encryption policy on the given bucket
+ """
server_side_encryption_conf = {
'Rules': [
{
},
]
}
+ response = client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+def _put_bucket_encryption_kms(client, bucket_name):
+ """
+ enable a default encryption policy on the given bucket
+ """
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-again'
+ server_side_encryption_conf = {
+ 'Rules': [
+ {
+ 'ApplyServerSideEncryptionByDefault': {
+ 'SSEAlgorithm': 'aws:kms',
+ 'KMSMasterKeyID': kms_keyid
+ }
+ },
+ ]
+ }
response = client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='put bucket encryption on bucket - s3')
+@attr(assertion='succeeds')
+@attr('sse-s3')
+def test_put_bucket_encryption_s3():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_s3(client, bucket_name)
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='put bucket encryption on bucket - kms')
+@attr(assertion='succeeds')
+@attr('encryption')
+def test_put_bucket_encryption_kms():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_kms(client, bucket_name)
+
+
@attr(resource='bucket')
@attr(method='get')
-@attr(operation='get bucket encryption on bucket')
+@attr(operation='get bucket encryption on bucket - s3')
@attr(assertion='succeeds')
-def test_get_bucket_encryption():
+@attr('sse-s3')
+def test_get_bucket_encryption_s3():
bucket_name = get_new_bucket()
client = get_client()
eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
- server_side_encryption_conf = {
- 'Rules': [
- {
- 'ApplyServerSideEncryptionByDefault': {
- 'SSEAlgorithm': 'AES256'
- }
- },
- ]
- }
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ response = client.get_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'], 'AES256')
+
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get bucket encryption on bucket - kms')
+@attr(assertion='succeeds')
+@attr('encryption')
+def test_get_bucket_encryption_kms():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ kms_keyid = 'fool-me-again'
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ response_code = ""
+ try:
+ client.get_bucket_encryption(Bucket=bucket_name)
+ except ClientError as e:
+ response_code = e.response['Error']['Code']
- client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+ eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
+
+ _put_bucket_encryption_kms(client, bucket_name)
response = client.get_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
- eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
- server_side_encryption_conf['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'])
+ eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'], 'aws:kms')
+ eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'], kms_keyid)
@attr(resource='bucket')
@attr(method='delete')
-@attr(operation='delete bucket encryption on bucket')
+@attr(operation='delete bucket encryption on bucket - s3')
@attr(assertion='succeeds')
-def test_delete_bucket_encryption():
+@attr('sse-s3')
+def test_delete_bucket_encryption_s3():
bucket_name = get_new_bucket()
client = get_client()
response = client.delete_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
- server_side_encryption_conf = {
- 'Rules': [
- {
- 'ApplyServerSideEncryptionByDefault': {
- 'SSEAlgorithm': 'AES256'
- }
- },
- ]
- }
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ response = client.delete_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
+
+ response_code = ""
+ try:
+ client.get_bucket_encryption(Bucket=bucket_name)
+ except ClientError as e:
+ response_code = e.response['Error']['Code']
- client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+ eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
+
+
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='delete bucket encryption on bucket - kms')
+@attr(assertion='succeeds')
+@attr('encryption')
+def test_delete_bucket_encryption_kms():
+ bucket_name = get_new_bucket()
+ client = get_client()
response = client.delete_bucket_encryption(Bucket=bucket_name)
eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
+
+ _put_bucket_encryption_kms(client, bucket_name)
+
+ response = client.delete_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
+
+ response_code = ""
+ try:
+ client.get_bucket_encryption(Bucket=bucket_name)
+ except ClientError as e:
+ response_code = e.response['Error']['Code']
+
+ eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
+
+def _test_sse_s3_default_upload(file_size):
+ """
+ Test enables bucket encryption.
+ Create a file of A's of certain size, and use it to set_contents_from_file.
+ Re-read the contents, and confirm we get same content as input i.e., A's
+ """
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ data = 'A'*file_size
+ response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data)
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+
+ response = client.get_object(Bucket=bucket_name, Key='testobj')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+ body = _get_body(response)
+ eq(body, data)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1 byte upload to SSE-S3 default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_upload_1b():
+ _test_sse_s3_default_upload(1)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1KB upload to SSE-S3 default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_upload_1kb():
+ _test_sse_s3_default_upload(1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1MB upload to SSE-S3 default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_upload_1mb():
+ _test_sse_s3_default_upload(1024*1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 8MB upload to SSE-S3 default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_upload_8mb():
+ _test_sse_s3_default_upload(8*1024*1024)
+
+def _test_sse_kms_default_upload(file_size):
+ """
+ Test enables bucket encryption.
+ Create a file of A's of certain size, and use it to set_contents_from_file.
+ Re-read the contents, and confirm we get same content as input i.e., A's
+ """
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ raise SkipTest
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_kms(client, bucket_name)
+
+ data = 'A'*file_size
+ response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data)
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
+
+ response = client.get_object(Bucket=bucket_name, Key='testobj')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
+ body = _get_body(response)
+ eq(body, data)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1 byte upload to SSE-KMS default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_kms_default_upload_1b():
+ _test_sse_kms_default_upload(1)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1KB upload to SSE-KMS default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_kms_default_upload_1kb():
+ _test_sse_kms_default_upload(1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1MB upload to SSE-KMS default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_kms_default_upload_1mb():
+ _test_sse_kms_default_upload(1024*1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 8MB upload to SSE-KMS default-encrypted bucket')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_kms_default_upload_8mb():
+ _test_sse_kms_default_upload(8*1024*1024)
+
+
+
+@attr(resource='object')
+@attr(method='head')
+@attr(operation='Test head operation on SSE-S3 default-encrypted object')
+@attr(assertion='success')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_method_head():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ data = 'A'*1000
+ key = 'testobj'
+ client.put_object(Bucket=bucket_name, Key=key, Body=data)
+
+ response = client.head_object(Bucket=bucket_name, Key=key)
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+
+ sse_s3_headers = {
+ 'x-amz-server-side-encryption': 'AES256',
+ }
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(sse_s3_headers))
+ client.meta.events.register('before-call.s3.HeadObject', lf)
+ e = assert_raises(ClientError, client.head_object, Bucket=bucket_name, Key=key)
+ status, error_code = _get_status_and_error_code(e.response)
+ eq(status, 400)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='complete SSE-S3 multi-part upload')
+@attr(assertion='successful')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_multipart_upload():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ key = "multipart_enc"
+ content_type = 'text/plain'
+ objlen = 30 * 1024 * 1024
+ metadata = {'foo': 'bar'}
+ enc_headers = {
+ 'Content-Type': content_type
+ }
+ resend_parts = []
+
+ (upload_id, data, parts) = _multipart_upload_enc(client, bucket_name, key, objlen,
+ part_size=5*1024*1024, init_headers=enc_headers, part_headers=enc_headers, metadata=metadata, resend_parts=resend_parts)
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(enc_headers))
+ client.meta.events.register('before-call.s3.CompleteMultipartUpload', lf)
+ client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+ response = client.head_bucket(Bucket=bucket_name)
+ rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
+ eq(rgw_object_count, 1)
+ rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', objlen))
+ eq(rgw_bytes_used, objlen)
+
+ lf = (lambda **kwargs: kwargs['params']['headers'].update(part_headers))
+ client.meta.events.register('before-call.s3.UploadPart', lf)
+
+ response = client.get_object(Bucket=bucket_name, Key=key)
+
+ eq(response['Metadata'], metadata)
+ eq(response['ResponseMetadata']['HTTPHeaders']['content-type'], content_type)
+
+ body = _get_body(response)
+ eq(body, data)
+ size = response['ContentLength']
+ eq(len(body), size)
+
+ _check_content_using_range(key, bucket_name, data, 1000000)
+ _check_content_using_range(key, bucket_name, data, 10000000)
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated SSE-S3 browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('sse-s3')
+def test_sse_s3_default_post_object_authenticated_request():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_s3(client, bucket_name)
+
+ url = _get_post_url(bucket_name)
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {
+ "expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
+ "conditions": [
+ {"bucket": bucket_name},
+ ["starts-with", "$key", "foo"],
+ {"acl": "private"},
+ ["starts-with", "$Content-Type", "text/plain"],
+ ["starts-with", "$x-amz-server-side-encryption", ""],
+ ["content-length-range", 0, 1024]
+ ]
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
+ policy = base64.b64encode(bytes_json_policy_document)
+ aws_secret_access_key = get_main_aws_secret_key()
+ aws_access_key_id = get_main_aws_access_key()
+
+ signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),
+ ('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+
+ response = client.get_object(Bucket=bucket_name, Key='foo.txt')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+ body = _get_body(response)
+ eq(body, 'bar')
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated SSE-kMS browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+@attr('encryption')
+@attr('bucket-encryption')
+@attr('encryption')
+def test_sse_kms_default_post_object_authenticated_request():
+ kms_keyid = get_main_kms_keyid()
+ if kms_keyid is None:
+ raise SkipTest
+ bucket_name = get_new_bucket()
+ client = get_client()
+ _put_bucket_encryption_kms(client, bucket_name)
+
+ url = _get_post_url(bucket_name)
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {
+ "expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
+ "conditions": [
+ {"bucket": bucket_name},
+ ["starts-with", "$key", "foo"],
+ {"acl": "private"},
+ ["starts-with", "$Content-Type", "text/plain"],
+ ["starts-with", "$x-amz-server-side-encryption", ""],
+ ["content-length-range", 0, 1024]
+ ]
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
+ policy = base64.b64encode(bytes_json_policy_document)
+ aws_secret_access_key = get_main_aws_secret_key()
+ aws_access_key_id = get_main_aws_access_key()
+
+ signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),
+ ('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+
+ response = client.get_object(Bucket=bucket_name, Key='foo.txt')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'aws:kms')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'], kms_keyid)
+ body = _get_body(response)
+ eq(body, 'bar')
+
+
+def _test_sse_s3_encrypted_upload(file_size):
+ """
+ Test upload of the given size, specifically requesting sse-s3 encryption.
+ """
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ data = 'A'*file_size
+ response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, ServerSideEncryption='AES256')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+
+ response = client.get_object(Bucket=bucket_name, Key='testobj')
+ eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'], 'AES256')
+ body = _get_body(response)
+ eq(body, data)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1 byte upload with SSE-S3 encryption')
+@attr(assertion='success')
+@attr('encryption')
+@attr('sse-s3')
+def test_sse_s3_encrypted_upload_1b():
+ _test_sse_s3_encrypted_upload(1)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1Kb upload with SSE-S3 encryption')
+@attr(assertion='success')
+@attr('encryption')
+@attr('sse-s3')
+def test_sse_s3_encrypted_upload_1kb():
+ _test_sse_s3_encrypted_upload(1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 1MB upload with SSE-S3 encryption')
+@attr(assertion='success')
+@attr('encryption')
+@attr('sse-s3')
+def test_sse_s3_encrypted_upload_1mb():
+ _test_sse_s3_encrypted_upload(1024*1024)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test 8MB upload with SSE-S3 encryption')
+@attr(assertion='success')
+@attr('encryption')
+@attr('sse-s3')
+def test_sse_s3_encrypted_upload_8mb():
+ _test_sse_s3_encrypted_upload(8*1024*1024)