objs_list = get_objects_list(bucket_name, prefix='_bla/')
assert len(objs_list) == 4
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_zero_size():
key = 'foo123bar'
response = client.get_object(Bucket=bucket_name, Key='bar321foo')
assert response['ContentLength'] == 0
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_16m():
bucket_name = get_new_bucket()
response = client.get_object(Bucket=bucket_name, Key=key2)
assert response['ContentLength'] == 16*1024*1024
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_same_bucket():
bucket_name = get_new_bucket()
body = _get_body(response)
assert 'foo' == body
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_verify_contenttype():
bucket_name = get_new_bucket()
response_content_type = response['ContentType']
assert response_content_type == content_type
+@pytest.mark.copy
def test_object_copy_to_itself():
bucket_name = get_new_bucket()
client = get_client()
assert status == 400
assert error_code == 'InvalidRequest'
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_to_itself_with_metadata():
bucket_name = get_new_bucket()
response = client.get_object(Bucket=bucket_name, Key='foo123bar')
assert response['Metadata'] == metadata
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_diff_bucket():
bucket_name1 = get_new_bucket()
body = _get_body(response)
assert 'foo' == body
+@pytest.mark.copy
def test_object_copy_not_owned_bucket():
client = get_client()
alt_client = get_alt_client()
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
+@pytest.mark.copy
def test_object_copy_not_owned_object_bucket():
client = get_client()
alt_client = get_alt_client()
copy_source = {'Bucket': bucket_name, 'Key': 'foo123bar'}
alt_client.copy(copy_source, bucket_name, 'bar321foo')
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_canned_acl():
bucket_name = get_new_bucket()
# check ACL is applied by doing GET from another user
alt_client.get_object(Bucket=bucket_name, Key='foo123bar')
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_retaining_metadata():
for size in [3, 1024 * 1024]:
body = _get_body(response)
assert size == response['ContentLength']
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_replacing_metadata():
for size in [3, 1024 * 1024]:
assert metadata == response['Metadata']
assert size == response['ContentLength']
+@pytest.mark.copy
def test_object_copy_bucket_not_found():
bucket_name = get_new_bucket()
client = get_client()
status = _get_status(e.response)
assert status == 404
+@pytest.mark.copy
def test_object_copy_key_not_found():
bucket_name = get_new_bucket()
client = get_client()
status = _get_status(e.response)
assert status == 404
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioned_bucket():
bucket_name = get_new_bucket()
assert data_str == body
assert size == response['ContentLength']
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioned_url_encoding():
bucket = get_new_bucket_resource()
return (upload_id, s, parts, part_checksums)
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioning_multipart_upload():
bucket_name = get_new_bucket()
src_data = _get_body(response)
assert src_data == dest_data
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_small():
src_key = 'foo'
assert size == response['ContentLength']
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
+@pytest.mark.copy
def test_multipart_copy_invalid_range():
client = get_client()
src_key = 'source'
assert error_code == 'InvalidRange'
+@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40795 is resolved
@pytest.mark.fails_on_rgw
def test_multipart_copy_improper_range():
assert error_code == 'InvalidArgument'
+@pytest.mark.copy
def test_multipart_copy_without_range():
client = get_client()
src_key = 'source'
assert response['ContentLength'] == 10
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_special_names():
src_bucket_name = get_new_bucket()
assert expected_string == read_status
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_versioned():
src_bucket_name = get_new_bucket()
(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_multiple_sizes():
src_key = 'foo'
assert len(version_ids) == 0
assert len(version_ids) == len(contents)
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_versioning_obj_suspended_copy():
bucket_name = get_new_bucket()
check_obj_content(client, bucket_name, key, version['VersionId'], contents[j])
i += 1
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_versioning_copy_obj_version():
bucket_name = get_new_bucket()
assert status == 403
+@pytest.mark.copy
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_upload_part_copy():
alt_client.abort_multipart_upload(Bucket=bucket_name2, Key='new_foo2', UploadId=upload_id)
+@pytest.mark.copy
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
copy_source = {'Bucket': bucket_name, 'Key': 'private/foo'}
check_access_denied(alt_client.copy_object, Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2')
+@pytest.mark.copy
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
assert status == 403
assert error_code == 'AccessDenied'
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_copy_object_ifmatch_good():
bucket_name = get_new_bucket()
body = _get_body(response)
assert body == 'bar'
+@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved
@pytest.mark.fails_on_rgw
def test_copy_object_ifmatch_failed():
assert status == 412
assert error_code == 'PreconditionFailed'
+@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved
@pytest.mark.fails_on_rgw
def test_copy_object_ifnonematch_good():
assert status == 412
assert error_code == 'PreconditionFailed'
+@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_copy_object_ifnonematch_failed():
bucket_name = get_new_bucket()