response = client.put_object(Bucket=bucket, Key=key, IfMatch=etag)
assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+def prepare_multipart_upload(client, bucket, key, body):
+ upload_id = client.create_multipart_upload(Bucket=bucket, Key=key)['UploadId']
+ response = client.upload_part(UploadId=upload_id, Bucket=bucket, Key=key, PartNumber=1, Body=body)
+ parts = [{'ETag': response['ETag'].strip('"'), 'PartNumber': 1}]
+ return upload_id, parts
+
+def successful_conditional_multipart_upload(client, bucket, key, body='abc', IfMatch=None, IfNoneMatch=None):
+ upload_id, parts = prepare_multipart_upload(client, bucket, key, body)
+ if IfMatch:
+ response = client.complete_multipart_upload(Bucket=bucket, Key=key, IfMatch=IfMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ elif IfNoneMatch :
+ response = client.complete_multipart_upload(Bucket=bucket, Key=key, IfNoneMatch=IfNoneMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ else :
+ response = client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+ return response
+
+def failing_conditional_multipart_upload(expected_failure, client, bucket, key, body='abc', IfMatch=None, IfNoneMatch=None):
+ upload_id, parts = prepare_multipart_upload(client, bucket, key, body)
+ if IfMatch:
+ e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, IfMatch=IfMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ elif IfNoneMatch:
+ e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, IfNoneMatch=IfNoneMatch, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ else :
+ e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+ assert expected_failure == _get_status_and_error_code(e.response)
+
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_object_if_match():
+ client = get_client()
+ bucket = get_new_bucket(client)
+ key = 'obj'
+
+ response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='*')
+ etag = response['ETag']
+
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch='*')
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch=etag)
+
+ response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='badetag')
+ etag = response['ETag']
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+ response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag)
+ etag = response['ETag']
+
+ client.delete_object(Bucket=bucket, Key=key)
+
+ failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='*')
+ failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='badetag')
+
+ response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch=etag)
+ etag = response['ETag']
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+ response = successful_conditional_multipart_upload(client, bucket, key, IfMatch='*')
+ etag = response['ETag']
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch='badetag')
+ response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag)
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
@pytest.mark.conditional_write
@pytest.mark.fails_on_dbstore
def test_put_current_object_if_none_match():
client.delete_object(Bucket=bucket, Key=key)
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_current_object_if_none_match():
+ client = get_client()
+ bucket = get_new_bucket(client)
+ check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+ key = 'obj'
+ data1 = 'data1'
+ data2 = 'data2'
+
+ response = successful_conditional_multipart_upload(client, bucket, key, data1, IfNoneMatch='*')
+ etag = response['ETag']
+
+ response = successful_conditional_multipart_upload(client, bucket, key, data2)
+ etag2 = response['ETag']
+
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch='*')
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfNoneMatch=etag2)
+
+ # we can't specify a version, so we only check against current object
+ response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch=etag)
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+ response = successful_conditional_multipart_upload(client, bucket, key, IfNoneMatch='badetag')
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
+ client.delete_object(Bucket=bucket, Key=key)
+
@pytest.mark.conditional_write
@pytest.mark.fails_on_dbstore
def test_put_current_object_if_match():
response = client.put_object(Bucket=bucket, Key=key, IfMatch=etag2)
assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+@pytest.mark.conditional_write
+@pytest.mark.fails_on_dbstore
+def test_multipart_put_current_object_if_match():
+ client = get_client()
+ bucket = get_new_bucket(client)
+ check_configure_versioning_retry(bucket, "Enabled", "Enabled")
+ key = 'obj'
+ data1 = 'data1'
+ data2 = 'data2'
+ etag = 'deadbeef'
+
+ failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='*')
+ failing_conditional_multipart_upload((404, 'NoSuchKey'), client, bucket, key, IfMatch='badetag')
+
+ response = successful_conditional_multipart_upload(client, bucket, key, data1, IfNoneMatch=etag)
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+ etag = response['ETag']
+ versionId = response['VersionId']
+ response = successful_conditional_multipart_upload(client, bucket, key, data2, IfMatch='*')
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+ etag2 = response['ETag']
+
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch='badetag')
+ failing_conditional_multipart_upload((412, 'PreconditionFailed'), client, bucket, key, IfMatch=etag)
+ response = successful_conditional_multipart_upload(client, bucket, key, IfMatch=etag2)
+ assert 200 == response['ResponseMetadata']['HTTPStatusCode']
+
@pytest.mark.conditional_write
@pytest.mark.fails_on_dbstore
def test_put_object_current_if_match():