from collections import namedtuple
from collections import defaultdict
from io import StringIO
+from io import BytesIO
from email.header import decode_header
# at T+20, 6 objects should exist (1 current and 5 noncurrent)
assert num_objs == 6
+def get_byte_buffer(nbytes):
+    # return a seekable buffer containing nbytes of filler data
+    return BytesIO(b"b" * nbytes)
+
+@pytest.mark.lifecycle
+@pytest.mark.lifecycle_expiration
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_lifecycle_expiration_size_gt():
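+    # verify that a lifecycle rule with an ObjectSizeGreaterThan filter
+    # expires only the objects above the size threshold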
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
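+    # with versioning enabled, expiring a current version creates a delete
+    # marker rather than removing data, so expired keys drop out of list_objects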
+
+    # create one object smaller than and one larger than 2000 bytes
+ key = "myobject_small"
+ body = get_byte_buffer(1000)
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ key = "myobject_big"
+ body = get_byte_buffer(3000)
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ # add a lifecycle rule which expires objects greater than 2000 bytes
+ days = 1
+ lifecycle_config = {
+ 'Rules': [
+ {
+ 'Expiration': {
+ 'Days': days
+ },
+ 'ID': 'object_gt1',
+ 'Filter': {
+ 'Prefix': '',
+ 'ObjectSizeGreaterThan': 2000
+ },
+ 'Status': 'Enabled',
+ },
+ ]
+ }
+
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ lc_interval = get_lc_debug_interval()
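+    # sleep for two debug intervals to give the lifecycle thread time to
+    # process the rule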
+ time.sleep(2*lc_interval)
+
+ # we should find only the small object present
+ response = client.list_objects(Bucket=bucket_name)
+ objects = response['Contents']
+
+ assert len(objects) == 1
+ assert objects[0]['Key'] == "myobject_small"
+
+@pytest.mark.lifecycle
+@pytest.mark.lifecycle_expiration
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_lifecycle_expiration_size_lt():
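+    # verify that a lifecycle rule with an ObjectSizeLessThan filter
+    # expires only the objects below the size threshold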
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
+
+    # create one object smaller than and one larger than 2000 bytes
+ key = "myobject_small"
+ body = get_byte_buffer(1000)
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ key = "myobject_big"
+ body = get_byte_buffer(3000)
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=body)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    # add a lifecycle rule which expires objects smaller than 2000 bytes
+ days = 1
+ lifecycle_config = {
+ 'Rules': [
+ {
+ 'Expiration': {
+ 'Days': days
+ },
+ 'ID': 'object_lt1',
+ 'Filter': {
+ 'Prefix': '',
+ 'ObjectSizeLessThan': 2000
+ },
+ 'Status': 'Enabled',
+ },
+ ]
+ }
+
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle_config)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ lc_interval = get_lc_debug_interval()
+ time.sleep(2*lc_interval)
+
+ # we should find only the large object present
+ response = client.list_objects(Bucket=bucket_name)
+ objects = response['Contents']
+
+ assert len(objects) == 1
+ assert objects[0]['Key'] == "myobject_big"
+
@pytest.mark.lifecycle
def test_lifecycle_id_too_long():
bucket_name = get_new_bucket()