body = _get_body(response)
assert body == 'bar'
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+def test_encryption_sse_c_enforced_with_bucket_policy():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
+ }
+ }
+
+ deny_unencrypted_obj = {
+ "Null" : {
+ "s3:x-amz-server-side-encryption-customer-algorithm": "true"
+ }
+ }
+
+ p = Policy()
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+
+ s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s1).add_statement(s2).to_json()
+
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+ check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar')
+
+ client.put_object(
+ Bucket=bucket_name, Key='foo', Body='bar',
+ SSECustomerAlgorithm='AES256',
+ SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+ )
+
+
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
assert body == data
-
-
-
-
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_sse_kms_method_head():