import boto.exception
import boto.s3.connection
import boto.s3.acl
+import boto.s3.lifecycle
import bunch
import datetime
import time
eq(e.reason, 'Conflict')
eq(e.error_code, 'BucketAlreadyOwnedByYou')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='recreate')
+def test_bucket_configure_recreate():
+ # aws-s3 default region allows recreation of buckets
+ # but all other regions fail with BucketAlreadyOwnedByYou.
+ bucket = get_new_bucket(targets.main.default)
+ try:
+ get_new_bucket(targets.main.default, bucket.name)
+ except boto.exception.S3CreateError, e:
+ eq(e.status, 409)
+ eq(e.reason, 'Conflict')
+ eq(e.error_code, 'BucketAlreadyOwnedByYou')
+
@attr(resource='bucket')
@attr(method='get')
eq(_count_bucket_versioned_objs(bucket), 0)
eq(len(bucket.get_all_keys()), 0)
+
+# Create a lifecycle config. Either days (int) and prefix (string) is given, or rules.
+# Rules is an array of dictionaries, each dict has a 'days' and a 'prefix' key
+def create_lifecycle(days = None, prefix = 'test/', rules = None):
+ lifecycle = boto.s3.lifecycle.Lifecycle()
+ if rules == None:
+ expiration = boto.s3.lifecycle.Expiration(days=days)
+ rule = boto.s3.lifecycle.Rule(id=prefix, prefix=prefix, status='Enabled',
+ expiration=expiration)
+ lifecycle.append(rule)
+ else:
+ for rule in rules:
+ expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
+ rule = boto.s3.lifecycle.Rule(id=rule['prefix'], prefix=rule['prefix'],
+ status='Enabled', expiration=expiration)
+ lifecycle.append(rule)
+ return lifecycle
+
+def set_lifecycle(rules = None):
+ bucket = get_new_bucket()
+ lifecycle = create_lifecycle(rules=rules)
+ bucket.configure_lifecycle(lifecycle)
+ return bucket
+
+
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='set lifecycle config')
+@attr('lifecycle')
+def test_lifecycle_set():
+ bucket = get_new_bucket()
+ lifecycle = create_lifecycle(rules=[{'days': 1, 'prefix': 'test1/'},
+ {'days': 2, 'prefix': 'test2/'}])
+ eq(bucket.configure_lifecycle(lifecycle), True)
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='get lifecycle config')
+@attr('lifecycle')
+def test_lifecycle_get():
+ bucket = set_lifecycle(rules=[{'days': 31, 'prefix': 'test1/'},
+ {'days': 120, 'prefix': 'test2/'}])
+ current = bucket.get_lifecycle_config()
+ eq(current[0].expiration.days, 31)
+ eq(current[0].id, 'test1/')
+ eq(current[0].prefix, 'test1/')
+ eq(current[1].expiration.days, 120)
+ eq(current[1].id, 'test2/')
+ eq(current[1].prefix, 'test2/')
+
+# The test harnass for lifecycle is configured to treat days as 2 second intervals.
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='test lifecycle expiration')
+@attr('lifecycle')
+@attr('fails_on_aws')
+def test_lifecycle_expiration():
+ bucket = set_lifecycle(rules=[{'days': 2, 'prefix': 'expire1/'},
+ {'days': 6, 'prefix': 'expire3/'}])
+ _create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
+ 'keep2/bar', 'expire3/foo', 'expire3/bar'])
+ # Get list of all keys
+ init_keys = bucket.get_all_keys()
+ # Wait for first expiration (plus fudge to handle the timer window)
+ time.sleep(35)
+ expire1_keys = bucket.get_all_keys()
+ # Wait for next expiration cycle
+ time.sleep(15)
+ keep2_keys = bucket.get_all_keys()
+ # Wait for final expiration cycle
+ time.sleep(25)
+ expire3_keys = bucket.get_all_keys()
+
+ eq(len(init_keys), 6)
+ eq(len(expire1_keys), 4)
+ eq(len(keep2_keys), 4)
+ eq(len(expire3_keys), 2)