eq(obj['Owner']['ID'],key_data['ID'])
_compare_dates(obj['LastModified'],key_data['LastModified'])
-# amazon is eventual consistent, retry a bit if failed
+# amazon is eventually consistent, retry a bit if failed
def check_configure_versioning_retry(bucket_name, status, expected_string):
client = get_client()
eq(len(init_objects), 2)
eq(len(expire_objects), 1)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle expiration days 0')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+def test_lifecycle_expiration_days0():
+ bucket_name = _create_objects(keys=['days0/foo', 'days0/bar'])
+ client = get_client()
+
+ rules=[{'ID': 'rule1', 'Expiration': {'Days': 0}, 'Prefix': 'days0/',
+ 'Status':'Enabled'}]
+ lifecycle = {'Rules': rules}
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+ time.sleep(20)
+
+ response = client.list_objects(Bucket=bucket_name)
+ expire_objects = response['Contents']
+
+ eq(len(expire_objects), 0)
+
+def setup_lifecycle_expiration_test(bucket_name, rule_id, delta_days,
+ rule_prefix):
+ """
+ Common setup for lifecycle expiration header checks:
+ """
+ rules=[{'ID': rule_id,
+ 'Expiration': {'Days': delta_days}, 'Prefix': rule_prefix,
+ 'Status':'Enabled'}]
+ lifecycle = {'Rules': rules}
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+
+ key = rule_prefix + '/foo'
+ body = 'bar'
+ response = client.put_object(Bucket=bucket_name, Key=key, Body=bar)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ return response
+
+def check_lifecycle_expiration_header(response, start_time, rule_id,
+ delta_days):
+ exp_header = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration']
+ m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', exp_header)
+
+ expiration = datetime.datetime.strptime(m.group(1),
+ '%a %b %d %H:%M:%S %Y')
+ eq((expiration - start_time).days, delta_days)
+ eq(m.group(2), rule_id)
+
+ return True
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle expiration header put')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+def test_lifecycle_expiration_header_put():
+ """
+ Check for valid x-amz-expiration header after PUT
+ """
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ now = datetime.datetime.now(None)
+ response = setup_lifecycle_expiration_test(
+ bucket_name, 'rule1', 1, 'days1/')
+ eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
+
+@attr(resource='bucket')
+@attr(method='head')
+@attr(operation='test lifecycle expiration header head')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+def test_lifecycle_expiration_header_head():
+ """
+ Check for valid x-amz-expiration header on HEAD request
+ """
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ now = datetime.datetime.now(None)
+ response = setup_lifecycle_expiration_test(
+ bucket_name, 'rule1', 1, 'days1/')
+
+ # stat the object, check header
+ response = client.head_object(Bucket=bucket_name, Key=key)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
+
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')