@attr(method='get')
@attr(operation='list')
@attr(assertion='prefixes in multi-component object names')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_basic():
bucket_name = _create_objects(keys=['foo/bar', 'foo/bar/xyzzy', 'quux/thud', 'asdf'])
client = get_client()
@attr(operation='list')
@attr(assertion='prefixes in multi-component object names')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_delimiter_basic():
bucket_name = _create_objects(keys=['foo/bar', 'foo/bar/xyzzy', 'quux/thud', 'asdf'])
client = get_client()
@attr(operation='list')
@attr(assertion='test url encoding')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_encoding_basic():
bucket_name = _create_objects(keys=['foo+1/bar', 'foo/bar/xyzzy', 'quux ab/thud', 'asdf+b'])
client = get_client()
@attr(operation='list')
@attr(assertion='test url encoding')
@attr('list-objects')
-@attr('fails_on_dbstore')
def test_bucket_list_encoding_basic():
bucket_name = _create_objects(keys=['foo+1/bar', 'foo/bar/xyzzy', 'quux ab/thud', 'asdf+b'])
client = get_client()
@attr(method='get')
@attr(operation='list')
@attr(assertion='prefix and delimiter handling when object ends with delimiter')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_prefix_ends_with_delimiter():
bucket_name = _create_objects(keys=['asdf/'])
validate_bucket_list(bucket_name, 'asdf/', '/', '', 1000, False, ['asdf/'], [], None)
@attr(method='get')
@attr(operation='list')
@attr(assertion='non-slash delimiter characters')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_alt():
bucket_name = _create_objects(keys=['bar', 'baz', 'cab', 'foo'])
client = get_client()
@attr(method='get')
@attr(assertion='non-slash delimiter characters')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_delimiter_alt():
bucket_name = _create_objects(keys=['bar', 'baz', 'cab', 'foo'])
client = get_client()
@attr(method='get')
@attr(operation='list')
@attr(assertion='percentage delimiter characters')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_percentage():
bucket_name = _create_objects(keys=['b%ar', 'b%az', 'c%ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(assertion='percentage delimiter characters')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_delimiter_percentage():
bucket_name = _create_objects(keys=['b%ar', 'b%az', 'c%ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(operation='list')
@attr(assertion='whitespace delimiter characters')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_whitespace():
bucket_name = _create_objects(keys=['b ar', 'b az', 'c ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(assertion='whitespace delimiter characters')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_delimiter_whitespace():
bucket_name = _create_objects(keys=['b ar', 'b az', 'c ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(operation='list')
@attr(assertion='dot delimiter characters')
-@attr('fails_on_dbstore')
def test_bucket_list_delimiter_dot():
bucket_name = _create_objects(keys=['b.ar', 'b.az', 'c.ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(assertion='dot delimiter characters')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_delimiter_dot():
bucket_name = _create_objects(keys=['b.ar', 'b.az', 'c.ab', 'foo'])
client = get_client()
@attr(method='get')
@attr(operation='list under prefix')
@attr(assertion='returns only objects under prefix')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_basic():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list under prefix with list-objects-v2')
@attr(assertion='returns only objects under prefix')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_basic():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix')
@attr(assertion='prefixes w/o delimiters')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_alt():
key_names = ['bar', 'baz', 'foo']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list under prefix with list-objects-v2')
@attr(assertion='prefixes w/o delimiters')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_alt():
key_names = ['bar', 'baz', 'foo']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix')
@attr(assertion='nonexistent prefix returns nothing')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_not_exist():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list under prefix with list-objects-v2')
@attr(assertion='nonexistent prefix returns nothing')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_not_exist():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix')
@attr(assertion='non-printable prefix can be specified')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_unreadable():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list under prefix with list-objects-v2')
@attr(assertion='non-printable prefix can be specified')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_unreadable():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix w/delimiter')
@attr(assertion='returns only objects directly under prefix')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_delimiter_basic():
key_names = ['foo/bar', 'foo/baz/xyzzy', 'quux/thud', 'asdf']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list-objects-v2 under prefix w/delimiter')
@attr(assertion='returns only objects directly under prefix')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_delimiter_basic():
key_names = ['foo/bar', 'foo/baz/xyzzy', 'quux/thud', 'asdf']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix w/delimiter')
@attr(assertion='non-slash delimiters')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_delimiter_alt():
key_names = ['bar', 'bazar', 'cab', 'foo']
bucket_name = _create_objects(keys=key_names)
eq(prefixes, ['baza'])
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_delimiter_alt():
key_names = ['bar', 'bazar', 'cab', 'foo']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix w/delimiter')
@attr(assertion='finds nothing w/unmatched prefix')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_delimiter_prefix_not_exist():
key_names = ['b/a/r', 'b/a/c', 'b/a/g', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list-objects-v2 under prefix w/delimiter')
@attr(assertion='finds nothing w/unmatched prefix')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_delimiter_prefix_not_exist():
key_names = ['b/a/r', 'b/a/c', 'b/a/g', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix w/delimiter')
@attr(assertion='over-ridden slash ceases to be a delimiter')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_delimiter_delimiter_not_exist():
key_names = ['b/a/c', 'b/a/g', 'b/a/r', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list-objects-v2 under prefix w/delimiter')
@attr(assertion='over-ridden slash ceases to be a delimiter')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_delimiter_delimiter_not_exist():
key_names = ['b/a/c', 'b/a/g', 'b/a/r', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(method='get')
@attr(operation='list under prefix w/delimiter')
@attr(assertion='finds nothing w/unmatched prefix and delimiter')
-@attr('fails_on_dbstore')
def test_bucket_list_prefix_delimiter_prefix_delimiter_not_exist():
key_names = ['b/a/c', 'b/a/g', 'b/a/r', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list-objects-v2 under prefix w/delimiter')
@attr(assertion='finds nothing w/unmatched prefix and delimiter')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_prefix_delimiter_prefix_delimiter_not_exist():
key_names = ['b/a/c', 'b/a/g', 'b/a/r', 'g']
bucket_name = _create_objects(keys=key_names)
@attr(operation='list keys with list-objects-v2')
@attr(assertion='no pagination, non-empty continuationtoken')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_bucket_listv2_continuationtoken():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket_name = _create_objects(keys=key_names)
@attr(method='post')
@attr(operation='delete multiple objects has upper limit of 1000 keys')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_multi_object_delete_key_limit():
key_names = [f"key-{i}" for i in range(1001)]
bucket_name = _create_objects(keys=key_names)
@attr(method='post')
@attr(operation='delete multiple objects has upper limit of 1000 keys with list-objects-v2')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_multi_objectv2_delete_key_limit():
key_names = [f"key-{i}" for i in range(1001)]
bucket_name = _create_objects(keys=key_names)
@attr(method='put')
@attr(operation='acl bucket-owner-read')
@attr(assertion='read back expected values')
-@attr('fails_on_dbstore')
def test_object_acl_canned_bucketownerread():
bucket_name = get_new_bucket_name()
main_client = get_client()
@attr(method='put')
@attr(operation='acl bucket-owner-read')
@attr(assertion='read back expected values')
-@attr('fails_on_dbstore')
def test_object_acl_canned_bucketownerfullcontrol():
bucket_name = get_new_bucket_name()
main_client = get_client()
@attr(operation='set acl w/userid READ')
@attr(assertion='can read data, no other r/w')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_bucket_acl_grant_userid_read():
bucket_name = _bucket_acl_grant_userid('READ')
@attr(operation='set acl w/userid READ_ACP')
@attr(assertion='can read acl, no other r/w')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_bucket_acl_grant_userid_readacp():
bucket_name = _bucket_acl_grant_userid('READ_ACP')
@attr(operation='set acl w/userid WRITE')
@attr(assertion='can write data, no other r/w')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_bucket_acl_grant_userid_write():
bucket_name = _bucket_acl_grant_userid('WRITE')
@attr(operation='set acl w/userid WRITE_ACP')
@attr(assertion='can write acls, no other r/w')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_bucket_acl_grant_userid_writeacp():
bucket_name = _bucket_acl_grant_userid('WRITE_ACP')
@attr(assertion='adds all grants individually to second user')
@attr('fails_on_dho')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_object_header_acl_grants():
bucket_name = get_new_bucket()
client = get_client()
@attr(assertion='adds all grants individually to second user')
@attr('fails_on_dho')
@attr('fails_on_aws') # <Error><Code>InvalidArgument</Code><Message>Invalid id</Message><ArgumentName>CanonicalUser/ID</ArgumentName><ArgumentValue>${ALTUSER}</ArgumentValue>
-@attr('fails_on_dbstore')
def test_bucket_header_acl_grants():
headers = _get_acl_header()
bucket_name = get_new_bucket_name()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: private/private')
@attr(assertion='public has no access to bucket or objects')
-@attr('fails_on_dbstore')
def test_access_bucket_private_object_private():
# all the test_access_* tests follow this template
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='private')
@attr(operation='set bucket/object acls: private/private with list-objects-v2')
@attr(assertion='public has no access to bucket or objects')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_access_bucket_private_objectv2_private():
# all the test_access_* tests follow this template
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='private')
@attr(method='ACLs')
@attr(operation='set bucket/object acls: private/public-read')
@attr(assertion='public can only read readable object')
-@attr('fails_on_dbstore')
def test_access_bucket_private_object_publicread():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='public-read')
@attr(operation='set bucket/object acls: private/public-read with list-objects-v2')
@attr(assertion='public can only read readable object')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_access_bucket_private_objectv2_publicread():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='public-read')
@attr(method='ACLs')
@attr(operation='set bucket/object acls: private/public-read/write')
@attr(assertion='public can only read the readable object')
-@attr('fails_on_dbstore')
def test_access_bucket_private_object_publicreadwrite():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='public-read-write')
alt_client = get_alt_client()
@attr(operation='set bucket/object acls: private/public-read/write with list-objects-v2')
@attr(assertion='public can only read the readable object')
@attr('list-objects-v2')
-@attr('fails_on_dbstore')
def test_access_bucket_private_objectv2_publicreadwrite():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='private', object_acl='public-read-write')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read/private')
@attr(assertion='public can only list the bucket')
-@attr('fails_on_dbstore')
def test_access_bucket_publicread_object_private():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read', object_acl='private')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read/public-read')
@attr(assertion='public can read readable objects and list bucket')
-@attr('fails_on_dbstore')
def test_access_bucket_publicread_object_publicread():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read', object_acl='public-read')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read/public-read-write')
@attr(assertion='public can read readable objects and list bucket')
-@attr('fails_on_dbstore')
def test_access_bucket_publicread_object_publicreadwrite():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read', object_acl='public-read-write')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read-write/private')
@attr(assertion='private objects cannot be read, but can be overwritten')
-@attr('fails_on_dbstore')
def test_access_bucket_publicreadwrite_object_private():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read-write', object_acl='private')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read-write/public-read')
@attr(assertion='private objects cannot be read, but can be overwritten')
-@attr('fails_on_dbstore')
def test_access_bucket_publicreadwrite_object_publicread():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read-write', object_acl='public-read')
alt_client = get_alt_client()
@attr(method='ACLs')
@attr(operation='set bucket/object acls: public-read-write/public-read-write')
@attr(assertion='private objects cannot be read, but can be overwritten')
-@attr('fails_on_dbstore')
def test_access_bucket_publicreadwrite_object_publicreadwrite():
bucket_name, key1, key2, newkey = _setup_access(bucket_acl='public-read-write', object_acl='public-read-write')
alt_client = get_alt_client()
@attr(method='get')
@attr(operation='list all buckets (bad auth)')
@attr(assertion='fails 403')
-@attr('fails_on_dbstore')
def test_list_buckets_invalid_auth():
bad_auth_client = get_bad_auth_client()
e = assert_raises(ClientError, bad_auth_client.list_buckets)
@attr(method='get')
@attr(operation='create and list objects with underscore as prefix, list using prefix')
@attr(assertion='listing works correctly')
-@attr('fails_on_dbstore')
def test_bucket_list_special_prefix():
key_names = ['_bla/1', '_bla/2', '_bla/3', '_bla/4', 'abcd']
bucket_name = _create_objects(keys=key_names)
@attr(method='put')
@attr(operation='copy object to itself')
@attr(assertion='fails')
-@attr('fails_on_dbstore')
def test_object_copy_to_itself():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='copy to an inaccessible bucket')
@attr(assertion='fails w/AttributeError')
-@attr('fails_on_dbstore')
def test_object_copy_not_owned_bucket():
client = get_client()
alt_client = get_alt_client()
@attr(method='put')
@attr(operation='copy a non-owned object in a non-owned bucket, but with perms')
@attr(assertion='works')
-@attr('fails_on_dbstore')
def test_object_copy_not_owned_object_bucket():
client = get_client()
alt_client = get_alt_client()
@attr(resource='object')
@attr(method='put')
@attr(operation='copy from non-existent bucket')
-@attr('fails_on_dbstore')
def test_object_copy_bucket_not_found():
bucket_name = get_new_bucket()
client = get_client()
@attr(resource='object')
@attr(method='put')
@attr(operation='copy from non-existent object')
-@attr('fails_on_dbstore')
def test_object_copy_key_not_found():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='write atomicity')
@attr(assertion='1MB successful')
-@attr('fails_on_dbstore')
def test_atomic_write_1mb():
_test_atomic_write(1024*1024)
@attr(method='create')
@attr(operation='create versioned bucket')
@attr(assertion='can create and suspend bucket versioning')
-@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_bucket_create_suspend():
bucket_name = get_new_bucket()
check_versioning(bucket_name, None)
@attr(operation='create and remove versioned object')
@attr(assertion='can create access and remove appropriate versions')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_create_read_remove():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='create and remove versioned object and head')
@attr(assertion='can create access and remove appropriate versions')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_create_read_remove_head():
bucket_name = get_new_bucket()
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_plain_null_version_removal():
bucket_name = get_new_bucket()
check_versioning(bucket_name, None)
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_plain_null_version_overwrite():
bucket_name = get_new_bucket()
check_versioning(bucket_name, None)
@attr(operation='create object, then switch to versioning')
@attr(assertion='behaves correctly')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_plain_null_version_overwrite_suspended():
bucket_name = get_new_bucket()
check_versioning(bucket_name, None)
@attr(operation='suspend versioned bucket')
@attr(assertion='suspended versioning behaves correctly')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_suspend_versions():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='create and remove versions')
@attr(assertion='everything works')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_create_versions_remove_all():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='remove')
@attr(operation='create and remove versions')
@attr(assertion='everything works')
-@attr('fails_on_dbstore')
@attr('versioning')
def test_versioning_obj_create_versions_remove_special_names():
bucket_name = get_new_bucket()
@attr(operation='list versioned objects')
@attr(assertion='everything works')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_obj_list_marker():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='delete multiple versions')
@attr(assertion='deletes multiple versions of an object with a single call')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_multi_object_delete():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='delete multiple versions')
@attr(assertion='deletes multiple versions of an object and delete marker with a single call')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioning_multi_object_delete_with_marker():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='change acl on an object version changes specific version')
@attr(assertion='works')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioned_object_acl():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='concurrent creation and removal of objects')
@attr(assertion='works')
@attr('versioning')
-@attr('fails_on_dbstore')
def test_versioned_concurrent_object_create_and_remove():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='get')
@attr(operation='get lifecycle config')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_get():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='get')
@attr(operation='get lifecycle config no id')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_get_no_id():
bucket_name = get_new_bucket()
client = get_client()
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('fails_on_aws')
-@attr('fails_on_dbstore')
def test_lifecycle_expiration_versioning_enabled():
bucket_name = get_new_bucket()
client = get_client()
@attr('lifecycle')
@attr('lifecycle_expiration')
@attr('fails_on_aws')
-@attr('fails_on_dbstore')
def test_lifecycle_expiration_tags1():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='id too long in lifecycle rule')
@attr('lifecycle')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_lifecycle_id_too_long():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='same id')
@attr('lifecycle')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_lifecycle_same_id():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='invalid status in lifecycle rule')
@attr('lifecycle')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_lifecycle_invalid_status():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config with expiration date')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_date():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='test lifecycle expiration days 0')
@attr('lifecycle')
@attr('lifecycle_expiration')
-@attr('fails_on_dbstore')
def test_lifecycle_expiration_days0():
bucket_name = _create_objects(keys=['days0/foo', 'days0/bar'])
client = get_client()
@attr(operation='test lifecycle expiration header put')
@attr('lifecycle')
@attr('lifecycle_expiration')
-@attr('fails_on_dbstore')
def test_lifecycle_expiration_header_put():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='test lifecycle expiration header head with tags and And')
@attr('lifecycle')
@attr('lifecycle_expiration')
-@attr('fails_on_dbstore')
def test_lifecycle_expiration_header_and_tags_head():
now = datetime.datetime.now(None)
bucket_name = get_new_bucket()
@attr(method='put')
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_noncurrent():
bucket_name = _create_objects(keys=['past/foo', 'future/bar'])
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config with delete marker expiration')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_deletemarker():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config with Filter')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_filter():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config with empty Filter')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_empty_filter():
bucket_name = get_new_bucket()
client = get_client()
@attr(method='put')
@attr(operation='set lifecycle config with multipart expiration')
@attr('lifecycle')
-@attr('fails_on_dbstore')
def test_lifecycle_set_multipart():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='set lifecycle config transition with not iso8601 date')
@attr('lifecycle')
@attr(assertion='fails 400')
-@attr('fails_on_dbstore')
def test_lifecycle_transition_set_invalid_date():
bucket_name = get_new_bucket()
client = get_client()
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
-@attr('fails_on_dbstore')
def test_lifecycle_transition():
sc = configured_storage_classes()
if len(sc) < 3:
@attr('lifecycle')
@attr('lifecycle_transition')
@attr('fails_on_aws')
-@attr('fails_on_dbstore')
def test_lifecycle_transition_single_rule_multi_trans():
sc = configured_storage_classes()
if len(sc) < 3:
@attr(operation='set lifecycle config with noncurrent version expiration')
@attr('lifecycle')
@attr('lifecycle_transition')
-@attr('fails_on_dbstore')
def test_lifecycle_set_noncurrent_transition():
sc = configured_storage_classes()
if len(sc) < 3:
@attr('lifecycle_expiration')
@attr('lifecycle_transition')
@attr('fails_on_aws')
-@attr('fails_on_dbstore')
def test_lifecycle_noncur_transition():
sc = configured_storage_classes()
if len(sc) < 3:
@attr(operation='write encrypted with SSE-C and read without SSE-C')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_sse_c_present():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='write encrypted with SSE-C but read with other key')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_sse_c_other_key():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='write encrypted with SSE-C, but md5 is bad')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_sse_c_invalid_md5():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='write encrypted with SSE-C, but dont provide MD5')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_sse_c_no_md5():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='declare SSE-C but do not provide key')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_sse_c_no_key():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='Do not declare SSE-C but provide key and MD5')
@attr(assertion='operation successfull, no encryption')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_encryption_key_no_sse_c():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='declare SSE-KMS but do not provide key_id')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_sse_kms_no_key():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='Do not declare SSE-KMS but provide key_id')
@attr(assertion='operation successfull, no encryption')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_sse_kms_not_declared():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='write encrypted with SSE-KMS and read with SSE-KMS')
@attr(assertion='operation fails')
@attr('encryption')
-@attr('fails_on_dbstore')
def test_sse_kms_read_declare():
bucket_name = get_new_bucket()
client = get_client()
@attr(operation='Test put obj with amz-grant back to bucket-owner')
@attr(assertion='success')
@attr('bucket-policy')
-@attr('fails_on_dbstore')
def test_bucket_policy_put_obj_grant():
bucket_name = get_new_bucket()
@attr('encryption')
@attr('bucket-policy')
@attr('sse-s3')
-@attr('fails_on_dbstore')
def test_bucket_policy_put_obj_s3_kms():
kms_keyid = get_main_kms_keyid()
if kms_keyid is None:
@attr(assertion='success')
@attr('encryption')
@attr('bucket-policy')
-@attr('fails_on_dbstore')
def test_bucket_policy_put_obj_kms_s3():
bucket_name = get_new_bucket()
client = get_v2_client()
@attr(operation='Test put object lock with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_put_obj_lock_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test get object lock with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_get_obj_lock_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test put object retention with bucket object lock not enabled')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_put_obj_retention_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test get object retention with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_get_obj_retention_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test put legal hold with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_put_legal_hold_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test get legal hold with invalid bucket')
@attr(assertion='fails')
@attr('object-lock')
-@attr('fails_on_dbstore')
def test_object_lock_get_legal_hold_invalid_bucket():
bucket_name = get_new_bucket_name()
client = get_client()
@attr(operation='Test User Policy')
@attr(assertion='succeeds')
@attr('user-policy')
-@attr('fails_on_dbstore')
def test_user_policy():
client = get_tenant_iam_client()
@attr(operation='ignore public acls on canned acls')
@attr(assertion='succeeds')
@attr('policy_status')
-@attr('fails_on_dbstore')
def test_ignore_public_acls():
bucket_name = get_new_bucket()
client = get_client()