]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
CopyObject: add storage class copy tests
authorSeena Fallah <seenafallah@gmail.com>
Wed, 5 Mar 2025 12:00:58 +0000 (13:00 +0100)
committerCasey Bodley <cbodley@redhat.com>
Tue, 2 Dec 2025 18:49:57 +0000 (13:49 -0500)
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
(cherry picked from commit eed0551f3308778ab561146e3d6e1c8226d8f9dc)

s3tests/functional/test_s3.py

index ce57d1ff1f1a8a7a81376ba46a3b2d4f48e3a71a..9ff1e3705bfa86795169e2b42d862f07775d36c2 100644 (file)
@@ -89,6 +89,7 @@ from . import (
     get_cloud_client,
     nuke_prefixed_buckets,
     configured_storage_classes,
+    configure,
     get_lc_debug_interval,
     get_restore_debug_interval,
     get_restore_processor_period,
@@ -19735,3 +19736,218 @@ def test_bucket_create_delete_bucket_ownership():
     assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
 
     client.delete_bucket_ownership_controls(Bucket=bucket)
+
+#########################
+# COPY ENCRYPTION TESTS #
+#########################
+_copy_enc_source_modes = {
+    'unencrypted': {
+        'marks': [pytest.mark.fails_on_aws],
+    },
+    'sse-s3': {
+        'args': {'ServerSideEncryption': 'AES256'},
+        'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
+        'marks': [pytest.mark.sse_s3],
+    },
+    'sse-c': {
+        'args': {
+            'SSECustomerAlgorithm': 'AES256',
+            'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+            'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
+        },
+        'source_copy_args': {
+            'CopySourceSSECustomerAlgorithm': 'AES256',
+            'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+            'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
+        },
+    },
+    'sse-kms': {
+        'args': {
+            'ServerSideEncryption': 'aws:kms',
+            'SSEKMSKeyId': lambda: get_main_kms_keyid()
+        },
+    }
+}
+_copy_enc_dest_modes = {
+    'unencrypted': {
+        'marks': [pytest.mark.fails_on_aws],
+    },
+    'sse-s3': {
+        'args': {'ServerSideEncryption': 'AES256'},
+        'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
+        'marks': [pytest.mark.sse_s3],
+    },
+    'sse-c': {
+        'args': {
+            'SSECustomerAlgorithm': 'AES256',
+            'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
+            'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
+        },
+        'get_args': {
+            'SSECustomerAlgorithm': 'AES256',
+            'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
+            'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
+        },
+        'assert': lambda r: (
+            r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
+            r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-md5'] == 'arxBvwY2V4SiOne6yppVPQ=='
+        )
+    },
+    'sse-kms': {
+        'args': {
+            'ServerSideEncryption': 'aws:kms',
+            'SSEKMSKeyId': lambda: get_secondary_kms_keyid()
+        },
+        'assert': lambda r: (
+            r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
+            r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid()
+        )
+    }
+}
+
+def _test_copy_enc(file_size, source_mode_key, dest_mode_key, source_sc=None, dest_sc=None):
+    source_args = _copy_enc_source_modes[source_mode_key]
+    dest_args = _copy_enc_dest_modes[dest_mode_key]
+
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    # upload original file with source encryption
+    data = 'A'*file_size
+    args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
+    if source_sc:
+        args['StorageClass'] = source_sc
+    response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args)
+    assert source_args.get('assert', lambda r: True)(response)
+
+    # copy the object to a new key, with destination encryption
+    dest_bucket_name = get_new_bucket()
+    copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()}
+    copy_args.update(source_args.get('source_copy_args', {}))
+    if dest_sc:
+        copy_args['StorageClass'] = dest_sc
+    response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args)
+    assert dest_args.get('assert', lambda r: True)(response)
+
+    # verify the copy is encrypted
+    get_args = dest_args.get('get_args', {})
+    response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args)
+    assert dest_args.get('assert', lambda r: True)(response)
+    body = _get_body(response)
+    assert body == data
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+    pytest.param(
+        source_key,
+        dest_key,
+        marks=[
+            *_copy_enc_source_modes[source_key].get('marks', []),
+            *_copy_enc_dest_modes[dest_key].get('marks', [])
+        ]
+    )
+    for source_key in _copy_enc_source_modes.keys()
+    for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1b(source_mode_key, dest_mode_key):
+    _test_copy_enc(1, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+    pytest.param(
+        source_key,
+        dest_key,
+        marks=[
+            *_copy_enc_source_modes[source_key].get('marks', []),
+            *_copy_enc_dest_modes[dest_key].get('marks', [])
+        ]
+    )
+    for source_key in _copy_enc_source_modes.keys()
+    for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1kb(source_mode_key, dest_mode_key):
+    _test_copy_enc(1024, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+    pytest.param(
+        source_key,
+        dest_key,
+        marks=[
+            *_copy_enc_source_modes[source_key].get('marks', []),
+            *_copy_enc_dest_modes[dest_key].get('marks', [])
+        ]
+    )
+    for source_key in _copy_enc_source_modes.keys()
+    for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1mb(source_mode_key, dest_mode_key):
+    _test_copy_enc(1024*1024, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+    pytest.param(
+        source_key,
+        dest_key,
+        marks=[
+            *_copy_enc_source_modes[source_key].get('marks', []),
+            *_copy_enc_dest_modes[dest_key].get('marks', [])
+        ]
+    )
+    for source_key in _copy_enc_source_modes.keys()
+    for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_8mb(source_mode_key, dest_mode_key):
+    _test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)
+
+def generate_copy_enc_storage_class_params():
+    configure()
+    sc = configured_storage_classes()
+    if len(sc) < 2:
+        return []
+
+    obj_sizes = [1, 1024, 1024*1024, 8*1024*1024]
+
+    params = []
+    for source_key in _copy_enc_source_modes.keys():
+        for dest_key in _copy_enc_dest_modes.keys():
+            source_marks = _copy_enc_source_modes[source_key].get('marks', [])
+            dest_marks = _copy_enc_dest_modes[dest_key].get('marks', [])
+
+            for source_sc in sc:
+                for dest_sc in sc:
+                    if source_sc == dest_sc:
+                        continue
+                    for obj_size in obj_sizes:
+                        param = pytest.param(
+                            source_key,
+                            dest_key,
+                            source_sc,
+                            dest_sc,
+                            obj_size,
+                            marks=[*source_marks, *dest_marks]
+                        )
+                        params.append(param)
+    return params
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.storage_class
+@pytest.mark.fails_on_aws  # storage classes are not there
+@pytest.mark.parametrize(
+    "source_mode_key, dest_mode_key, source_storage_class, dest_storage_class, obj_size",
+    generate_copy_enc_storage_class_params()
+)
+def test_copy_enc_storage_class(source_mode_key, dest_mode_key, source_storage_class, dest_storage_class, obj_size):
+    if len(configured_storage_classes()) < 2:
+        pytest.skip('need at least two storage classes to test copy storage class')
+
+    print(
+        f"Testing copy from {source_mode_key} to {dest_mode_key} with storage class "
+        f"{source_storage_class} -> {dest_storage_class} and object size {obj_size}"
+    )
+    _test_copy_enc(obj_size, source_mode_key, dest_mode_key, source_storage_class, dest_storage_class)