From: Andrew Gaul Date: Mon, 11 Nov 2024 04:33:07 +0000 (-0800) Subject: Tag copy tests X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F600%2Fhead;p=s3-tests.git Tag copy tests --- diff --git a/pytest.ini b/pytest.ini index 12918693..5ff5da8f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -13,6 +13,7 @@ markers = checksum cloud_transition cloud_restore + copy encryption fails_on_aws fails_on_dbstore diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index ceeae050..6723d71d 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -5437,6 +5437,7 @@ def test_bucket_list_special_prefix(): objs_list = get_objects_list(bucket_name, prefix='_bla/') assert len(objs_list) == 4 +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_zero_size(): key = 'foo123bar' @@ -5451,6 +5452,7 @@ def test_object_copy_zero_size(): response = client.get_object(Bucket=bucket_name, Key='bar321foo') assert response['ContentLength'] == 0 +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_16m(): bucket_name = get_new_bucket() @@ -5464,6 +5466,7 @@ def test_object_copy_16m(): response = client.get_object(Bucket=bucket_name, Key=key2) assert response['ContentLength'] == 16*1024*1024 +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_same_bucket(): bucket_name = get_new_bucket() @@ -5478,6 +5481,7 @@ def test_object_copy_same_bucket(): body = _get_body(response) assert 'foo' == body +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_verify_contenttype(): bucket_name = get_new_bucket() @@ -5496,6 +5500,7 @@ def test_object_copy_verify_contenttype(): response_content_type = response['ContentType'] assert response_content_type == content_type +@pytest.mark.copy def test_object_copy_to_itself(): bucket_name = get_new_bucket() client = get_client() @@ -5508,6 +5513,7 @@ def test_object_copy_to_itself(): assert status == 400 assert error_code == 'InvalidRequest' +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_to_itself_with_metadata(): bucket_name = get_new_bucket() @@ -5520,6 +5526,7 @@ def test_object_copy_to_itself_with_metadata(): response = client.get_object(Bucket=bucket_name, Key='foo123bar') assert response['Metadata'] == metadata +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_diff_bucket(): bucket_name1 = get_new_bucket() @@ -5536,6 +5543,7 @@ def test_object_copy_diff_bucket(): body = _get_body(response) assert 'foo' == body +@pytest.mark.copy def test_object_copy_not_owned_bucket(): client = get_client() alt_client = get_alt_client() @@ -5552,6 +5560,7 @@ def test_object_copy_not_owned_bucket(): status, error_code = _get_status_and_error_code(e.response) assert status == 403 +@pytest.mark.copy def test_object_copy_not_owned_object_bucket(): client = get_client() alt_client = get_alt_client() @@ -5573,6 +5582,7 @@ def test_object_copy_not_owned_object_bucket(): copy_source = {'Bucket': bucket_name, 'Key': 'foo123bar'} alt_client.copy(copy_source, bucket_name, 'bar321foo') +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_canned_acl(): bucket_name = get_new_bucket() @@ -5593,6 +5603,7 @@ def test_object_copy_canned_acl(): # check ACL is applied by doing GET from another user alt_client.get_object(Bucket=bucket_name, Key='foo123bar') +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_retaining_metadata(): for size in [3, 1024 * 1024]: @@ -5612,6 +5623,7 @@ def test_object_copy_retaining_metadata(): body = _get_body(response) assert size == response['ContentLength'] +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_replacing_metadata(): for size in [3, 1024 * 1024]: @@ -5633,6 +5645,7 @@ def test_object_copy_replacing_metadata(): assert metadata == response['Metadata'] assert size == response['ContentLength'] +@pytest.mark.copy def test_object_copy_bucket_not_found(): bucket_name = get_new_bucket() client = get_client() @@ -5642,6 +5655,7 @@ def test_object_copy_bucket_not_found(): status = _get_status(e.response) assert status == 404 +@pytest.mark.copy def test_object_copy_key_not_found(): bucket_name = get_new_bucket() client = get_client() @@ -5651,6 +5665,7 @@ def test_object_copy_key_not_found(): status = _get_status(e.response) assert status == 404 +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_versioned_bucket(): bucket_name = get_new_bucket() @@ -5715,6 +5730,7 @@ def test_object_copy_versioned_bucket(): assert data_str == body assert size == response['ContentLength'] +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_versioned_url_encoding(): bucket = get_new_bucket_resource() @@ -5819,6 +5835,7 @@ def _multipart_upload_checksum(bucket_name, key, size, part_size=5*1024*1024, cl return (upload_id, s, parts, part_checksums) +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_object_copy_versioning_multipart_upload(): bucket_name = get_new_bucket() @@ -5983,6 +6000,7 @@ def _check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name, ver src_data = _get_body(response) assert src_data == dest_data +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_multipart_copy_small(): src_key = 'foo' @@ -6000,6 +6018,7 @@ def test_multipart_copy_small(): assert size == response['ContentLength'] _check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name) +@pytest.mark.copy def test_multipart_copy_invalid_range(): client = get_client() src_key = 'source' @@ -6019,6 +6038,7 @@ def test_multipart_copy_invalid_range(): assert error_code == 'InvalidRange' +@pytest.mark.copy # TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40795 is resolved @pytest.mark.fails_on_rgw def test_multipart_copy_improper_range(): @@ -6050,6 +6070,7 @@ def test_multipart_copy_improper_range(): assert error_code == 'InvalidArgument' +@pytest.mark.copy def test_multipart_copy_without_range(): client = get_client() src_key = 'source' @@ -6075,6 +6096,7 @@ def test_multipart_copy_without_range(): assert response['ContentLength'] == 10 _check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name) +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_multipart_copy_special_names(): src_bucket_name = get_new_bucket() @@ -6167,6 +6189,7 @@ def check_configure_versioning_retry(bucket_name, status, expected_string): assert expected_string == read_status +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_multipart_copy_versioned(): src_bucket_name = get_new_bucket() @@ -6254,6 +6277,7 @@ def test_multipart_upload_multiple_sizes(): (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen) client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}) +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_multipart_copy_multiple_sizes(): src_key = 'foo' @@ -7818,6 +7842,7 @@ def test_versioning_obj_suspend_versions(): assert len(version_ids) == 0 assert len(version_ids) == len(contents) +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_versioning_obj_suspended_copy(): bucket_name = get_new_bucket() @@ -7970,6 +7995,7 @@ def test_versioning_obj_list_marker(): check_obj_content(client, bucket_name, key, version['VersionId'], contents[j]) i += 1 +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_versioning_copy_obj_version(): bucket_name = get_new_bucket() @@ -12092,6 +12118,7 @@ def test_bucket_policy_put_obj_tagging_existing_tag(): assert status == 403 +@pytest.mark.copy @pytest.mark.bucket_policy @pytest.mark.fails_on_dbstore def test_bucket_policy_upload_part_copy(): @@ -12149,6 +12176,7 @@ def test_bucket_policy_upload_part_copy(): alt_client.abort_multipart_upload(Bucket=bucket_name2, Key='new_foo2', UploadId=upload_id) +@pytest.mark.copy @pytest.mark.tagging @pytest.mark.bucket_policy @pytest.mark.fails_on_dbstore @@ -12196,6 +12224,7 @@ def test_bucket_policy_put_obj_copy_source(): copy_source = {'Bucket': bucket_name, 'Key': 'private/foo'} check_access_denied(alt_client.copy_object, Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2') +@pytest.mark.copy @pytest.mark.tagging @pytest.mark.bucket_policy @pytest.mark.fails_on_dbstore @@ -13477,6 +13506,7 @@ def test_object_lock_changing_mode_from_compliance(): assert status == 403 assert error_code == 'AccessDenied' +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_copy_object_ifmatch_good(): bucket_name = get_new_bucket() @@ -13488,6 +13518,7 @@ def test_copy_object_ifmatch_good(): body = _get_body(response) assert body == 'bar' +@pytest.mark.copy # TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved @pytest.mark.fails_on_rgw def test_copy_object_ifmatch_failed(): @@ -13500,6 +13531,7 @@ def test_copy_object_ifmatch_failed(): assert status == 412 assert error_code == 'PreconditionFailed' +@pytest.mark.copy # TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved @pytest.mark.fails_on_rgw def test_copy_object_ifnonematch_good(): @@ -13512,6 +13544,7 @@ def test_copy_object_ifnonematch_good(): assert status == 412 assert error_code == 'PreconditionFailed' +@pytest.mark.copy @pytest.mark.fails_on_dbstore def test_copy_object_ifnonematch_failed(): bucket_name = get_new_bucket()