]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
s3: test conditional multipart_put_object() 693/head
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Wed, 6 Aug 2025 10:37:23 +0000 (13:37 +0300)
committerAli Masarwa <amasarwa@redhat.com>
Thu, 11 Sep 2025 07:43:49 +0000 (10:43 +0300)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
Signed-off-by: Ali Masarwa <ALI.SAED.MASARWA@gmail.com>
s3tests_boto3/functional/test_s3.py

index aa00da7b14ab59a8a7adcc7cc60a20be548d8c2c..256e87516e05b7532b9d4a27d9ed4fb1062146fa 100644 (file)
@@ -18438,6 +18438,69 @@ def test_put_object_if_match():
     response = client.put_object(Bucket=bucket, Key=key, IfMatch=etag)
     assert 200 == response['ResponseMetadata']['HTTPStatusCode']
 
+def prepare_multipart_upload(client, bucket, key, body):
+    upload_id = client.create_multipart_upload(Bucket=bucket, Key=key)['UploadId']
+    response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body)
+    parts = [{'ETag': response['ETag'].strip('"'), 'PartNumber': 1}]
+    return upload_id, parts
+
+def successful_conditional_multipart_upload(client, bucket, key, body='abc', IfMatch=None, IfNoneMatch=None):
+    upload_id, parts = prepare_multipart_upload(client, bucket, key, body)
+    if IfMatch:
+        response = client.complete_multipart_upload(Bucket=bucket, Key=key, IfMatch=IfMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    elif IfNoneMatch :
+        response = client.complete_multipart_upload(Bucket=bucket, Key=key, IfNoneMatch=IfNoneMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    else :
+        response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    return response
+
+def failing_conditional_multipart_upload(expected_failure, client, bucket, key, body='abc', IfMatch=None, IfNoneMatch=None):
+    upload_id, parts = prepare_multipart_upload(client, bucket, key, body)
+    if IfMatch:
+        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, IfMatch=IfMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    elif IfNoneMatch:
+        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, IfNoneMatch=IfNoneMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    else :
+        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    assert expected_failure == _get_status_and_error_code(e.response)
+
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_object_if_match():
+    client = get_client()
+    bucket = get_new_bucket(client)
+    key = 'obj'
+
+    response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='*')
+    etag = response['ETag']
+
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch='*')
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch=etag)
+
+    response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='badetag')
+    etag = response['ETag']
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+    response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag)
+    etag = response['ETag']
+
+    client.delete_object(Bucket=bucket, Key=key)
+
+    failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='*')
+    failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='badetag')
+
+    response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch=etag)
+    etag = response['ETag']
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+    response = successful_conditional_multipart_upload(client, bucket, key, IfMatch='*')
+    etag = response['ETag']
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch='badetag')
+    response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag)
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
 @pytest.mark.conditional_write
 @pytest.mark.fails_on_dbstore
 def test_put_current_object_if_none_match():
@@ -18467,6 +18530,33 @@ def test_put_current_object_if_none_match():
 
     client.delete_object(Bucket=bucket, Key=key)
 
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_current_object_if_none_match():
+    client = get_client()
+    bucket = get_new_bucket(client)
+    check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+    key = 'obj'
+    data1 = 'data1'
+    data2 = 'data2'
+
+    response = successful_conditional_multipart_upload(client, bucket, key, data1, IfNoneMatch='*')
+    etag = response['ETag']
+
+    response = successful_conditional_multipart_upload(client, bucket, key, data2)
+    etag2 = response['ETag']
+
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch='*')
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch=etag2)
+
+    # we can't specify a version, so we only check against current object
+    response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch=etag)
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+    response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='badetag')
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+    client.delete_object(Bucket=bucket, Key=key)
+
 @pytest.mark.conditional_write
 @pytest.mark.fails_on_dbstore
 def test_put_current_object_if_match():
@@ -18498,6 +18588,33 @@ def test_put_current_object_if_match():
     response = client.put_object(Bucket=bucket, Key=key, IfMatch=etag2)
     assert 200 == response['ResponseMetadata']['HTTPStatusCode']
 
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_current_object_if_match():
+    client = get_client()
+    bucket = get_new_bucket(client)
+    check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+    key = 'obj'
+    data1 = 'data1'
+    data2 = 'data2'
+    etag = 'deadbeef'
+
+    failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='*')
+    failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='badetag')
+
+    response = successful_conditional_multipart_upload(client, bucket, key, data1, IfNoneMatch=etag)
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+    etag = response['ETag']
+    versionId = response['VersionId']
+    response = successful_conditional_multipart_upload(client, bucket, key, data2, IfMatch='*')
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+    etag2 = response['ETag']
+
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch='badetag')
+    failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch=etag)
+    response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag2)
+    assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
 @pytest.mark.conditional_write
 @pytest.mark.fails_on_dbstore
 def test_put_object_current_if_match():