(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='put bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_put_bucket_encryption():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ server_side_encryption_conf = {
+ 'Rules': [
+ {
+ 'ApplyServerSideEncryptionByDefault': {
+ 'SSEAlgorithm': 'AES256'
+ }
+ },
+ ]
+ }
+
+ response = client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_get_bucket_encryption():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ response_code = ""
+ try:
+ client.get_bucket_encryption(Bucket=bucket_name)
+ except ClientError as e:
+ response_code = e.response['Error']['Code']
+
+ eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
+
+ server_side_encryption_conf = {
+ 'Rules': [
+ {
+ 'ApplyServerSideEncryptionByDefault': {
+ 'SSEAlgorithm': 'AES256'
+ }
+ },
+ ]
+ }
+
+ client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+
+ response = client.get_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
+ server_side_encryption_conf['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'])
+
+
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='delete bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_delete_bucket_encryption():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ response = client.delete_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
+
+ server_side_encryption_conf = {
+ 'Rules': [
+ {
+ 'ApplyServerSideEncryptionByDefault': {
+ 'SSEAlgorithm': 'AES256'
+ }
+ },
+ ]
+ }
+
+ client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+
+ response = client.delete_bucket_encryption(Bucket=bucket_name)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 204)