get_cloud_client,
nuke_prefixed_buckets,
configured_storage_classes,
+ configure,
get_lc_debug_interval,
get_restore_debug_interval,
get_restore_processor_period,
assert (404, 'OwnershipControlsNotFoundError') == _get_status_and_error_code(e.response)
client.delete_bucket_ownership_controls(Bucket=bucket)
+
+#########################
+# COPY ENCRYPTION TESTS #
+#########################
+_copy_enc_source_modes = {
+ 'unencrypted': {
+ 'marks': [pytest.mark.fails_on_aws],
+ },
+ 'sse-s3': {
+ 'args': {'ServerSideEncryption': 'AES256'},
+ 'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
+ 'marks': [pytest.mark.sse_s3],
+ },
+ 'sse-c': {
+ 'args': {
+ 'SSECustomerAlgorithm': 'AES256',
+ 'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
+ },
+ 'source_copy_args': {
+ 'CopySourceSSECustomerAlgorithm': 'AES256',
+ 'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
+ },
+ },
+ 'sse-kms': {
+ 'args': {
+ 'ServerSideEncryption': 'aws:kms',
+ 'SSEKMSKeyId': lambda: get_main_kms_keyid()
+ },
+ }
+}
+_copy_enc_dest_modes = {
+ 'unencrypted': {
+ 'marks': [pytest.mark.fails_on_aws],
+ },
+ 'sse-s3': {
+ 'args': {'ServerSideEncryption': 'AES256'},
+ 'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
+ 'marks': [pytest.mark.sse_s3],
+ },
+ 'sse-c': {
+ 'args': {
+ 'SSECustomerAlgorithm': 'AES256',
+ 'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
+ 'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
+ },
+ 'get_args': {
+ 'SSECustomerAlgorithm': 'AES256',
+ 'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
+ 'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
+ },
+ 'assert': lambda r: (
+ r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
+ r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-md5'] == 'arxBvwY2V4SiOne6yppVPQ=='
+ )
+ },
+ 'sse-kms': {
+ 'args': {
+ 'ServerSideEncryption': 'aws:kms',
+ 'SSEKMSKeyId': lambda: get_secondary_kms_keyid()
+ },
+ 'assert': lambda r: (
+ r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
+ r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid()
+ )
+ }
+}
+
+def _test_copy_enc(file_size, source_mode_key, dest_mode_key, source_sc=None, dest_sc=None):
+ source_args = _copy_enc_source_modes[source_mode_key]
+ dest_args = _copy_enc_dest_modes[dest_mode_key]
+
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ # upload original file with source encryption
+ data = 'A'*file_size
+ args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
+ if source_sc:
+ args['StorageClass'] = source_sc
+ response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args)
+ assert source_args.get('assert', lambda r: True)(response)
+
+ # copy the object to a new key, with destination encryption
+ dest_bucket_name = get_new_bucket()
+ copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()}
+ copy_args.update(source_args.get('source_copy_args', {}))
+ if dest_sc:
+ copy_args['StorageClass'] = dest_sc
+ response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args)
+ assert dest_args.get('assert', lambda r: True)(response)
+
+ # verify the copy is encrypted
+ get_args = dest_args.get('get_args', {})
+ response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args)
+ assert dest_args.get('assert', lambda r: True)(response)
+ body = _get_body(response)
+ assert body == data
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+ pytest.param(
+ source_key,
+ dest_key,
+ marks=[
+ *_copy_enc_source_modes[source_key].get('marks', []),
+ *_copy_enc_dest_modes[dest_key].get('marks', [])
+ ]
+ )
+ for source_key in _copy_enc_source_modes.keys()
+ for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1b(source_mode_key, dest_mode_key):
+ _test_copy_enc(1, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+ pytest.param(
+ source_key,
+ dest_key,
+ marks=[
+ *_copy_enc_source_modes[source_key].get('marks', []),
+ *_copy_enc_dest_modes[dest_key].get('marks', [])
+ ]
+ )
+ for source_key in _copy_enc_source_modes.keys()
+ for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1kb(source_mode_key, dest_mode_key):
+ _test_copy_enc(1024, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+ pytest.param(
+ source_key,
+ dest_key,
+ marks=[
+ *_copy_enc_source_modes[source_key].get('marks', []),
+ *_copy_enc_dest_modes[dest_key].get('marks', [])
+ ]
+ )
+ for source_key in _copy_enc_source_modes.keys()
+ for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_1mb(source_mode_key, dest_mode_key):
+ _test_copy_enc(1024*1024, source_mode_key, dest_mode_key)
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
+ pytest.param(
+ source_key,
+ dest_key,
+ marks=[
+ *_copy_enc_source_modes[source_key].get('marks', []),
+ *_copy_enc_dest_modes[dest_key].get('marks', [])
+ ]
+ )
+ for source_key in _copy_enc_source_modes.keys()
+ for dest_key in _copy_enc_dest_modes.keys()
+])
+def test_copy_enc_8mb(source_mode_key, dest_mode_key):
+ _test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)
+
+def generate_copy_enc_storage_class_params():
+ configure()
+ sc = configured_storage_classes()
+ if len(sc) < 2:
+ return []
+
+ obj_sizes = [1, 1024, 1024*1024, 8*1024*1024]
+
+ params = []
+ for source_key in _copy_enc_source_modes.keys():
+ for dest_key in _copy_enc_dest_modes.keys():
+ source_marks = _copy_enc_source_modes[source_key].get('marks', [])
+ dest_marks = _copy_enc_dest_modes[dest_key].get('marks', [])
+
+ for source_sc in sc:
+ for dest_sc in sc:
+ if source_sc == dest_sc:
+ continue
+ for obj_size in obj_sizes:
+ param = pytest.param(
+ source_key,
+ dest_key,
+ source_sc,
+ dest_sc,
+ obj_size,
+ marks=[*source_marks, *dest_marks]
+ )
+ params.append(param)
+ return params
+
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.storage_class
+@pytest.mark.fails_on_aws # storage classes are not there
+@pytest.mark.parametrize(
+ "source_mode_key, dest_mode_key, source_storage_class, dest_storage_class, obj_size",
+ generate_copy_enc_storage_class_params()
+)
+def test_copy_enc_storage_class(source_mode_key, dest_mode_key, source_storage_class, dest_storage_class, obj_size):
+ if len(configured_storage_classes()) < 2:
+ pytest.skip('need at least two storage classes to test copy storage class')
+
+ print(
+ f"Testing copy from {source_mode_key} to {dest_mode_key} with storage class "
+ f"{source_storage_class} -> {dest_storage_class} and object size {obj_size}"
+ )
+ _test_copy_enc(obj_size, source_mode_key, dest_mode_key, source_storage_class, dest_storage_class)