]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
rgw/s3_boto3: Add tests for bucket encryption APIs
authorRahul Dev Parashar <rahul.dev@flipkart.com>
Mon, 26 Jul 2021 10:45:02 +0000 (16:15 +0530)
committerAli Maredia <amaredia@redhat.com>
Thu, 12 Aug 2021 17:45:16 +0000 (13:45 -0400)
Tests are added for GetBucketEncryption, PutBucketEncryption,
and DeleteBucketEncryption APIs.

Related PR: https://github.com/ceph/ceph/pull/42222

Signed-off-by: Rahul Dev Parashar <rahul.dev@flipkart.com>
(cherry picked from commit 44643af0b0e51e6550e508d6854bd576d254141d)

s3tests_boto3/functional/test_s3.py

index ae0452f242f42456162d97442b5c019ad48d7b01..045a43a4bfb20d594eafdf4ffd96b4bef5c3c8c4 100644 (file)
@@ -13184,3 +13184,86 @@ def test_multipart_upload_on_a_bucket_with_policy():
     (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, client=client)
     response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
     eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='put bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_put_bucket_encryption():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    server_side_encryption_conf = {
+        'Rules': [
+            {
+                'ApplyServerSideEncryptionByDefault': {
+                    'SSEAlgorithm': 'AES256'
+                }
+            },
+        ]
+    }
+
+    response = client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+    eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_get_bucket_encryption():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    response_code = ""
+    try:
+        client.get_bucket_encryption(Bucket=bucket_name)
+    except ClientError as e:
+        response_code = e.response['Error']['Code']
+
+    eq(response_code, 'ServerSideEncryptionConfigurationNotFoundError')
+
+    server_side_encryption_conf = {
+        'Rules': [
+            {
+                'ApplyServerSideEncryptionByDefault': {
+                    'SSEAlgorithm': 'AES256'
+                }
+            },
+        ]
+    }
+
+    client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+
+    response = client.get_bucket_encryption(Bucket=bucket_name)
+    eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+    eq(response['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
+        server_side_encryption_conf['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'])
+
+
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='delete bucket encryption on bucket')
+@attr(assertion='succeeds')
+def test_delete_bucket_encryption():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    response = client.delete_bucket_encryption(Bucket=bucket_name)
+    eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
+
+    server_side_encryption_conf = {
+        'Rules': [
+            {
+                'ApplyServerSideEncryptionByDefault': {
+                    'SSEAlgorithm': 'AES256'
+                }
+            },
+        ]
+    }
+
+    client.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=server_side_encryption_conf)
+
+    response = client.delete_bucket_encryption(Bucket=bucket_name)
+    eq(response['ResponseMetadata']['HTTPStatusCode'], 204)