bucket.configure_lifecycle(lifecycle)
return bucket
+def configured_storage_classes():
+ sc = [ 'STANDARD' ]
+
+ if 'storage_classes' in config['main']:
+ extra_sc = re.split('\W+', config['main']['storage_classes'])
+
+ for item in extra_sc:
+ if item != 'STANDARD':
+ sc.append(item)
+
+ return sc
+
def lc_transition(days=None, date=None, storage_class=None):
return boto.s3.lifecycle.Transition(days=days, date=date, storage_class=storage_class)
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition():
- bucket = set_lifecycle(rules=[{'id': 'rule1', 'transition': lc_transition(days=1, storage_class='FOOCLASS'), 'prefix': 'expire1/', 'status': 'Enabled'},
- {'id':'rule2', 'transition': lc_transition(days=4, storage_class='BARCLASS'), 'prefix': 'expire3/', 'status': 'Enabled'}])
+ sc = configured_storage_classes()
+ if len(sc) < 3:
+ raise SkipTest
+
+ bucket = set_lifecycle(rules=[{'id': 'rule1', 'transition': lc_transition(days=1, storage_class=sc[1]), 'prefix': 'expire1/', 'status': 'Enabled'},
+ {'id':'rule2', 'transition': lc_transition(days=4, storage_class=sc[2]), 'prefix': 'expire3/', 'status': 'Enabled'}])
_create_keys(bucket=bucket, keys=['expire1/foo', 'expire1/bar', 'keep2/foo',
'keep2/bar', 'expire3/foo', 'expire3/bar'])
# Get list of all keys
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
- eq(len(expire1_keys['FOOCLASS']), 2)
- eq(len(expire1_keys['BARCLASS']), 0)
+ eq(len(expire1_keys[sc[1]]), 2)
+ eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
- eq(len(keep2_keys['FOOCLASS']), 2)
- eq(len(keep2_keys['BARCLASS']), 0)
+ eq(len(keep2_keys[sc[1]]), 2)
+ eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 2)
- eq(len(expire3_keys['FOOCLASS']), 2)
- eq(len(expire3_keys['BARCLASS']), 2)
+ eq(len(expire3_keys[sc[1]]), 2)
+ eq(len(expire3_keys[sc[2]]), 2)
# The test harness for lifecycle is configured to treat days as 10 second intervals.
@attr(resource='bucket')
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_transition_single_rule_multi_trans():
+ sc = configured_storage_classes()
+ if len(sc) < 3:
+ raise SkipTest
+
bucket = set_lifecycle(rules=[
{'id': 'rule1',
'transition': lc_transitions([
- lc_transition(days=1, storage_class='FOOCLASS'),
- lc_transition(days=4, storage_class='BARCLASS')]),
+ lc_transition(days=1, storage_class=sc[1]),
+ lc_transition(days=4, storage_class=sc[2])]),
'prefix': 'expire1/',
'status': 'Enabled'}])
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 4)
- eq(len(expire1_keys['FOOCLASS']), 2)
- eq(len(expire1_keys['BARCLASS']), 0)
+ eq(len(expire1_keys[sc[1]]), 2)
+ eq(len(expire1_keys[sc[2]]), 0)
# Wait for next expiration cycle
time.sleep(10)
keep2_keys = list_bucket_storage_class(bucket)
eq(len(keep2_keys['STANDARD']), 4)
- eq(len(keep2_keys['FOOCLASS']), 2)
- eq(len(keep2_keys['BARCLASS']), 0)
+ eq(len(keep2_keys[sc[1]]), 2)
+ eq(len(keep2_keys[sc[2]]), 0)
# Wait for final expiration cycle
time.sleep(20)
expire3_keys = list_bucket_storage_class(bucket)
eq(len(expire3_keys['STANDARD']), 4)
- eq(len(expire3_keys['FOOCLASS']), 0)
- eq(len(expire3_keys['BARCLASS']), 2)
+ eq(len(expire3_keys[sc[1]]), 0)
+ eq(len(expire3_keys[sc[2]]), 2)
@attr(resource='bucket')
@attr(method='put')
@attr('lifecycle')
@attr('lifecycle_transition')
def test_lifecycle_set_noncurrent_transition():
+ sc = configured_storage_classes()
+ if len(sc) < 3:
+ raise SkipTest
+
bucket = get_new_bucket()
rules = [
{
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 2,
- 'StorageClass': 'FOOCLASS'
+ 'StorageClass': sc[1]
},
{
'NoncurrentDays': 4,
- 'StorageClass': 'BARCLASS'
+ 'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
@attr('lifecycle_transition')
@attr('fails_on_aws')
def test_lifecycle_noncur_transition():
+ sc = configured_storage_classes()
+ if len(sc) < 3:
+ raise SkipTest
+
bucket = get_new_bucket()
check_configure_versioning_retry(bucket, True, "Enabled")
'NoncurrentVersionTransition': [
{
'NoncurrentDays': 1,
- 'StorageClass': 'FOOCLASS'
+ 'StorageClass': sc[1]
},
{
'NoncurrentDays': 3,
- 'StorageClass': 'BARCLASS'
+ 'StorageClass': sc[2]
}
],
'NoncurrentVersionExpiration': {
time.sleep(25)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
- eq(len(expire1_keys['FOOCLASS']), 4)
- eq(len(expire1_keys['BARCLASS']), 0)
+ eq(len(expire1_keys[sc[1]]), 4)
+ eq(len(expire1_keys[sc[2]]), 0)
time.sleep(20)
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
- eq(len(expire1_keys['FOOCLASS']), 0)
- eq(len(expire1_keys['BARCLASS']), 4)
+ eq(len(expire1_keys[sc[1]]), 0)
+ eq(len(expire1_keys[sc[2]]), 4)
time.sleep(20)
expire_keys = bucket.get_all_versions()
expire1_keys = list_bucket_storage_class(bucket)
eq(len(expire1_keys['STANDARD']), 2)
- eq(len(expire1_keys['FOOCLASS']), 0)
- eq(len(expire1_keys['BARCLASS']), 0)
+ eq(len(expire1_keys[sc[1]]), 0)
+ eq(len(expire1_keys[sc[2]]), 0)
@attr(resource='bucket')
@attr(method='put')