body = _get_body(response)
assert body == content
-def verify_transition(client, bucket, key, sc=None):
- response = client.head_object(Bucket=bucket, Key=key)
+def verify_transition(client, bucket, key, sc=None, version=None):
+ if (version != None):
+ response = client.head_object(Bucket=bucket, Key=key, VersionId=version)
+ else:
+ response = client.head_object(Bucket=bucket, Key=key)
# Iterate over the contents to find the StorageClass
if 'StorageClass' in response:
response = client.get_object(Bucket=bucket, Key=key)
assert e.exception.response['Error']['Code'] == '403'
+@pytest.mark.cloud_restore
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_restore_noncur_obj():
+ cloud_sc = get_cloud_storage_class()
+ if cloud_sc == None:
+ pytest.skip('[s3 cloud] section missing cloud_storage_class')
+
+ retain_head_object = get_cloud_retain_head_object()
+ target_path = get_cloud_target_path()
+ target_sc = get_cloud_target_storage_class()
+
+ sc = ['STANDARD', cloud_sc]
+
+ bucket = get_new_bucket()
+ client = get_client()
+
+ # before enabling versioning, create a plain entry
+ # which should get transitioned/expired similar to
+ # other non-current versioned entries.
+ key = 'test1/a'
+ content = 'fooz'
+ client.put_object(Bucket=bucket, Key=key, Body=content)
+
+ check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+
+ rules = [
+ {
+ 'ID': 'rule1',
+ 'Prefix': 'test1/',
+ 'Status': 'Enabled',
+ 'NoncurrentVersionTransitions': [
+ {
+ 'NoncurrentDays': 2,
+ 'StorageClass': cloud_sc
+ }
+ ],
+ }
+ ]
+ lifecycle = {'Rules': rules}
+ response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
+
+ keys = ['test1/a']
+
+ (version_ids, contents) = create_multiple_versions(client, bucket, "test1/a", 2)
+
+ contents.append(content)
+
+ init_keys = list_bucket_storage_class(client, bucket)
+ print(init_keys)
+ assert len(init_keys['STANDARD']) == 3
+
+ version_ids = []
+ response = client.list_object_versions(Bucket=bucket)
+ for version in response['Versions']:
+ version_ids.append(version['VersionId'])
+
+ lc_interval = get_lc_debug_interval()
+
+ time.sleep(7*lc_interval)
+ expire1_keys = list_bucket_storage_class(client, bucket)
+ assert len(expire1_keys['STANDARD']) == 1
+ assert len(expire1_keys[cloud_sc]) == 2
+
+ restore_interval = get_restore_debug_interval()
+
+ for num in range(1, 2):
+ verify_transition(client, bucket, key, cloud_sc, version_ids[num])
+ # Restore object temporarily
+ client.restore_object(Bucket=bucket, Key=key, VersionId=version_ids[num], RestoreRequest={'Days': 2})
+ time.sleep(2)
+
+ # Verify object is restored temporarily
+ response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
+ assert response['ContentLength'] == len(contents[num])
+ response = client.list_object_versions(Bucket=bucket)
+ versions = response['Versions']
+ assert versions[1]['IsLatest'] == False
+
+ time.sleep(2 * (restore_interval + lc_interval))
+
+ #verify object expired
+ for num in range(1, 2):
+ response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
+ assert response['ContentLength'] == 0
+
@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encrypted_transfer_1b():