is_empty = _bucket_is_empty(bucket2)
assert is_empty == True
-def _create_objects(bucket=None, bucket_name=None, keys=[]):
+def _create_objects(bucket=None, bucket_name=None, keys=[], put_object_args={}):
"""
Populate a (specified or new) bucket with objects with
specified names (and contents identical to their names).
bucket = get_new_bucket_resource(name=bucket_name)
for key in keys:
- obj = bucket.put_object(Body=key, Key=key)
+ obj = bucket.put_object(Body=key, Key=key, **put_object_args)
return bucket_name
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
},
+ 'get_args': {
+ 'SSECustomerAlgorithm': 'AES256',
+ 'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
+ },
'source_copy_args': {
'CopySourceSSECustomerAlgorithm': 'AES256',
'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
f"{source_storage_class} -> {dest_storage_class} and object size {obj_size}"
)
_test_copy_enc(obj_size, source_mode_key, dest_mode_key, source_storage_class, dest_storage_class)
+
+def generate_lifecycle_transition_params():
+ configure()
+ sc = configured_storage_classes()
+ if len(sc) < 2:
+ return []
+
+ params = []
+ for source_key in _copy_enc_source_modes.keys():
+ source_marks = _copy_enc_source_modes[source_key].get('marks', [])
+
+ for source_sc in sc:
+ for dest_sc in sc:
+ if source_sc == dest_sc:
+ continue
+
+ params.append(pytest.param(
+ source_key,
+ source_sc,
+ dest_sc,
+ marks=source_marks
+ ))
+
+ return params
+
+def _test_lifecycle_transition(source_mode_key, source_sc=None, dest_sc=None):
+ source_args = _copy_enc_source_modes[source_mode_key]
+ args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
+ if source_sc:
+ args['StorageClass'] = source_sc
+
+ bucket_name = _create_objects(keys=['expire1/foo', 'expire1/bar'], put_object_args=args)
+ client = get_client()
+ rules=[{'ID': 'rule1', 'Prefix': '', 'Transitions': [{'Days': 1, 'StorageClass': dest_sc}], 'Status': 'Enabled'}]
+ lifecycle = {'Rules': rules}
+ client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+
+ # Get list of all keys
+ response = client.list_objects(Bucket=bucket_name)
+ init_keys = _get_keys(response)
+ assert len(init_keys) == 2
+
+ lc_interval = get_lc_debug_interval()
+
+ # Wait for expiration
+ time.sleep(4*lc_interval)
+ expire1_keys = list_bucket_storage_class(client, bucket_name)
+ assert len(expire1_keys[source_sc]) == 0
+ assert len(expire1_keys[dest_sc]) == 2
+
+ # retrieve the objects
+ get_args = source_args.get('get_args', {})
+ for key in init_keys:
+ response = client.get_object(Bucket=bucket_name, Key=key, **get_args)
+ body = _get_body(response)
+ assert body == key
+
+@pytest.mark.lifecycle
+@pytest.mark.lifecycle_transition
+@pytest.mark.fails_on_aws
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+@pytest.mark.parametrize(
+ "source_mode_key, source_storage_class, dest_storage_class",
+ generate_lifecycle_transition_params()
+)
+def test_lifecycle_transition_encrypted(source_mode_key, source_storage_class, dest_storage_class):
+ if len(configured_storage_classes()) < 2:
+ pytest.skip('need at least two storage classes to test lifecycle transition')
+
+ print(
+ f"Testing lifecycle transition of {source_mode_key} with storage class {source_storage_class} -> {dest_storage_class}"
+ )
+ _test_lifecycle_transition(source_mode_key, source_storage_class, dest_storage_class)