]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
cloud-restore: testcase for non-current versioned object 627/head
authorSoumya Koduri <skoduri@redhat.com>
Tue, 4 Mar 2025 17:49:24 +0000 (23:19 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Wed, 5 Mar 2025 12:17:36 +0000 (17:47 +0530)
Signed-off-by: Soumya Koduri <skoduri@redhat.com>
s3tests_boto3/functional/test_s3.py

index 2e86d44310c3ee84b1e8963d2816fe5dca46921f..a7da2f94f94c6d06a4f4fa992670edd4bc01ea7e 100644 (file)
@@ -9536,8 +9536,11 @@ def verify_object(client, bucket, key, content=None, sc=None):
         body = _get_body(response)
         assert body == content
 
-def verify_transition(client, bucket, key, sc=None):
-    response = client.head_object(Bucket=bucket, Key=key)
+def verify_transition(client, bucket, key, sc=None, version=None):
+    if (version != None):
+        response = client.head_object(Bucket=bucket, Key=key, VersionId=version)
+    else:
+        response = client.head_object(Bucket=bucket, Key=key)
 
     # Iterate over the contents to find the StorageClass
     if 'StorageClass' in response:
@@ -9966,6 +9969,92 @@ def test_read_through():
             response = client.get_object(Bucket=bucket, Key=key)
         assert e.exception.response['Error']['Code'] == '403'
 
+@pytest.mark.cloud_restore
+@pytest.mark.fails_on_aws
+@pytest.mark.fails_on_dbstore
+def test_restore_noncur_obj():
+    cloud_sc = get_cloud_storage_class()
+    if cloud_sc == None:
+        pytest.skip('[s3 cloud] section missing cloud_storage_class')
+
+    retain_head_object = get_cloud_retain_head_object()
+    target_path = get_cloud_target_path()
+    target_sc = get_cloud_target_storage_class()
+
+    sc = ['STANDARD', cloud_sc]
+
+    bucket = get_new_bucket()
+    client = get_client()
+
+    # before enabling versioning, create a plain entry
+    # which should get transitioned/expired similar to
+    # other non-current versioned entries.
+    key = 'test1/a'
+    content = 'fooz'
+    client.put_object(Bucket=bucket, Key=key, Body=content)
+
+    check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+
+    rules = [
+        {
+            'ID': 'rule1',
+            'Prefix': 'test1/',
+            'Status': 'Enabled',
+            'NoncurrentVersionTransitions': [
+                {
+                    'NoncurrentDays': 2,
+                    'StorageClass': cloud_sc
+                }
+            ],
+        }
+    ]
+    lifecycle = {'Rules': rules}
+    response = client.put_bucket_lifecycle_configuration(Bucket=bucket, LifecycleConfiguration=lifecycle)
+
+    keys = ['test1/a']
+
+    (version_ids, contents) = create_multiple_versions(client, bucket, "test1/a", 2)
+
+    contents.append(content)
+
+    init_keys = list_bucket_storage_class(client, bucket)
+    print(init_keys)
+    assert len(init_keys['STANDARD']) == 3
+
+    version_ids = []
+    response = client.list_object_versions(Bucket=bucket)
+    for version in response['Versions']:
+        version_ids.append(version['VersionId'])
+
+    lc_interval = get_lc_debug_interval()
+
+    time.sleep(7*lc_interval)
+    expire1_keys = list_bucket_storage_class(client, bucket)
+    assert len(expire1_keys['STANDARD']) == 1
+    assert len(expire1_keys[cloud_sc]) == 2
+
+    restore_interval = get_restore_debug_interval()
+
+    for num in range(1, 2):
+        verify_transition(client, bucket, key, cloud_sc, version_ids[num])
+        # Restore object temporarily
+        client.restore_object(Bucket=bucket, Key=key, VersionId=version_ids[num], RestoreRequest={'Days': 2})
+        time.sleep(2)
+
+        # Verify object is restored temporarily
+        response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
+        assert response['ContentLength'] == len(contents[num])
+        response  = client.list_object_versions(Bucket=bucket)
+        versions = response['Versions']
+        assert versions[1]['IsLatest'] == False
+
+    time.sleep(2 * (restore_interval + lc_interval))
+
+    #verify object expired
+    for num in range(1, 2):
+        response = client.head_object(Bucket=bucket, Key=key, VersionId=version_ids[num])
+        assert response['ContentLength'] == 0
+
 @pytest.mark.encryption
 @pytest.mark.fails_on_dbstore
 def test_encrypted_transfer_1b():