bucket_name = get_new_bucket()
client = get_client()
- deny_incorrect_algo = {
- "StringNotEquals": {
- "s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
- }
- }
-
deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption-customer-algorithm": "true"
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
- s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
- s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
- policy_document = p.add_statement(s1).add_statement(s2).to_json()
+ s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s).to_json()
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
)
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+def test_encryption_sse_c_deny_algo_with_bucket_policy():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
+ }
+ }
+
+ p = Policy()
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+
+ s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ policy_document = p.add_statement(s).to_json()
+
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+ check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar', SSECustomerAlgorithm='AES192')
+
+ client.put_object(
+ Bucket=bucket_name, Key='foo', Body='bar',
+ SSECustomerAlgorithm='AES256',
+ SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+ )
+
+
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
bucket_name = get_new_bucket()
client = get_v2_client()
- deny_incorrect_algo = {
- "StringNotEquals": {
- "s3:x-amz-server-side-encryption": "AES256"
- }
- }
-
deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption": "true"
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
- s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
- s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
- policy_document = p.add_statement(s1).add_statement(s2).to_json()
-
- # boto3.set_stream_logger(name='botocore')
+ s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+ policy_document = p.add_statement(s).to_json()
client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
key1_str ='testobj'
- #response = client.get_bucket_policy(Bucket=bucket_name)
- #print response
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+
+ response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
+ assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'
+
+
+@pytest.mark.encryption
+@pytest.mark.bucket_policy
+@pytest.mark.sse_s3
+@pytest.mark.fails_on_dbstore
+def test_bucket_policy_put_obj_s3_incorrect_algo_sse_s3():
+ bucket_name = get_new_bucket()
+ client = get_v2_client()
+
+ deny_incorrect_algo = {
+ "StringNotEquals": {
+ "s3:x-amz-server-side-encryption": "AES256"
+ }
+ }
+ p = Policy()
+ resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
- # doing this here breaks the next request w/ 400 (non-sse bug). Do it last.
- #check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+ s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+ policy_document = p.add_statement(s).to_json()
- #TODO: why is this a 400 and not passing, it appears boto3 is not parsing the 200 response the rgw sends back properly
- # DEBUGGING: run the boto2 and compare the requests
- # DEBUGGING: try to run this with v2 auth (figure out why get_v2_client isn't working) to make the requests similar to what boto2 is doing
- # DEBUGGING: try to add other options to put_object to see if that makes the response better
+ client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+ key1_str ='testobj'
+
+ check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str, ServerSideEncryption='AES192')
- # first validate that writing a sse-s3 object works
response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
- response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption']
assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'
- # then validate that a non-encrypted object fails.
- # (this also breaks the connection--non-sse bug, probably because the server
- # errors out before it consumes the data...)
- check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
@pytest.mark.encryption
@pytest.mark.bucket_policy