import string
import random
import socket
+import dateutil.parser
import ssl
from collections import namedtuple
@attr(resource='bucket')
@attr(method='put')
-@attr(operation='test lifecycle expiration on versining enabled bucket')
+@attr(operation='test lifecycle expiration on versioning enabled bucket')
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('fails_on_aws')
def check_lifecycle_expiration_header(response, start_time, rule_id,
delta_days):
- print(response)
- print(response['ResponseMetadata']['HTTPHeaders'])
+ expr_exists = ('x-amz-expiration' in response['ResponseMetadata']['HTTPHeaders'])
+ if (not expr_exists):
+ return False
+ expr_hdr = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration']
- exp_header = response['ResponseMetadata']['HTTPHeaders']['x-amz-expiration']
- m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', exp_header)
- datestr = m.group(1)
- exp_date = datetime.datetime.strptime(datestr, '%a, %d %b %Y %H:%M:%S %Z')
- exp_diff = exp_date - start_time
- rule_id = m.group(2)
+ m = re.search(r'expiry-date="(.+)", rule-id="(.+)"', expr_hdr)
- eq(exp_diff.days, delta_days)
- eq(m.group(2), rule_id)
+ expiration = dateutil.parser.parse(m.group(1))
+ days_to_expire = ((expiration.replace(tzinfo=None) - start_time).days == delta_days)
+ rule_eq_id = (m.group(2) == rule_id)
- return True
+ return days_to_expire and rule_eq_id
@attr(resource='bucket')
@attr(method='put')
eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
eq(check_lifecycle_expiration_header(response, now, 'rule1', 1), True)
+@attr(resource='bucket')
+@attr(method='head')
+@attr(operation='test lifecycle expiration header head with tags')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+def test_lifecycle_expiration_header_tags_head():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ lifecycle={
+ "Rules": [
+ {
+ "Filter": {
+ "Tag": {"Key": "key1", "Value": "tag1"}
+ },
+ "Status": "Enabled",
+ "Expiration": {
+ "Days": 1
+ },
+ "ID": "rule1"
+ },
+ ]
+ }
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+ key1 = "obj_key1"
+ body1 = "obj_key1_body"
+ tags1={'TagSet': [{'Key': 'key1', 'Value': 'tag1'},
+ {'Key': 'key5','Value': 'tag5'}]}
+ response = client.put_object(Bucket=bucket_name, Key=key1, Body=body1)
+ response = client.put_object_tagging(Bucket=bucket_name, Key=key1,Tagging=tags1)
+
+ # stat the object, check header
+ response = client.head_object(Bucket=bucket_name, Key=key1)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), True)
+
+ # test that header is not returning when it should not
+ lifecycle={
+ "Rules": [
+ {
+ "Filter": {
+ "Tag": {"Key": "key2", "Value": "tag1"}
+ },
+ "Status": "Enabled",
+ "Expiration": {
+ "Days": 1
+ },
+ "ID": "rule1"
+ },
+ ]
+ }
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+ # stat the object, check header
+ response = client.head_object(Bucket=bucket_name, Key=key1)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), False)
+
+@attr(resource='bucket')
+@attr(method='head')
+@attr(operation='test lifecycle expiration header head with tags and And')
+@attr('lifecycle')
+@attr('lifecycle_expiration')
+def test_lifecycle_expiration_header_and_tags_head():
+ now = datetime.datetime.now(None)
+ bucket_name = get_new_bucket()
+ client = get_client()
+ lifecycle={
+ "Rules": [
+ {
+ "Filter": {
+ "And": {
+ "Tags": [
+ {
+ "Key": "key1",
+ "Value": "tag1"
+ },
+ {
+ "Key": "key5",
+ "Value": "tag6"
+ }
+ ]
+ }
+ },
+ "Status": "Enabled",
+ "Expiration": {
+ "Days": 1
+ },
+ "ID": "rule1"
+ },
+ ]
+ }
+ response = client.put_bucket_lifecycle_configuration(
+ Bucket=bucket_name, LifecycleConfiguration=lifecycle)
+ key1 = "obj_key1"
+ body1 = "obj_key1_body"
+ tags1={'TagSet': [{'Key': 'key1', 'Value': 'tag1'},
+ {'Key': 'key5','Value': 'tag5'}]}
+ response = client.put_object(Bucket=bucket_name, Key=key1, Body=body1)
+ response = client.put_object_tagging(Bucket=bucket_name, Key=key1,Tagging=tags1)
+
+ # stat the object, check header
+ response = client.head_object(Bucket=bucket_name, Key=key1)
+ eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ eq(check_lifecycle_expiration_header(response, datetime.datetime.now(None), 'rule1', 1), False)
+
@attr(resource='bucket')
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')