# necessary. We later determine in setup what needs to be used.
def _update_headers(headers):
+ """ update a set of headers with additions/removals
+ """
global _custom_headers, _remove_headers
headers.update(_custom_headers)
# headers modified or created in the authentication step.
class HeaderS3Connection(S3Connection):
+ """ establish an authenticated connection w/customized headers
+ """
def fill_in_auth(self, http_request, **kwargs):
_update_headers(http_request.headers)
S3Connection.fill_in_auth(self, http_request, **kwargs)
def _our_authorize(self, connection, **kwargs):
+ """ perform an authentication w/customized headers
+ """
_update_headers(self.headers)
_orig_authorize(self, connection, **kwargs)
_update_headers(self.headers)
def _clear_custom_headers():
+ """ Eliminate any header customizations
+ """
global _custom_headers, _remove_headers
_custom_headers = {}
_remove_headers = []
def _add_custom_headers(headers=None, remove=None):
+ """ Define header customizations (additions, replacements, removals)
+ """
global _custom_headers, _remove_headers
if not _custom_headers:
_custom_headers = {}
def _setup_bad_object(headers=None, remove=None):
+ """ Create a new bucket, add an object w/header customizations
+ """
bucket = get_new_bucket()
_add_custom_headers(headers=headers, remove=remove)
return bucket.new_key('foo')
-
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/invalid MD5')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_md5_invalid():
key = _setup_bad_object({'Content-MD5':'AWS HAHAHA'})
eq(e.error_code, 'InvalidDigest')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/incorrect MD5')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_md5_wrong():
key = _setup_bad_object({'Content-MD5':'YWJyYWNhZGFicmE='})
eq(e.error_code, 'InvalidDigest')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty MD5')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_md5_empty():
key = _setup_bad_object({'Content-MD5': ''})
eq(e.error_code, 'InvalidDigest')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphics in MD5')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_md5_unreadable():
key = _setup_bad_object({'Content-MD5': '\x07'})
assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no MD5 header')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_md5_none():
key = _setup_bad_object(remove=('Content-MD5',))
# strangely, amazon doesn't report an error with a non-expect 100 also, our
# error comes back as html, and not xml as I normally expect
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/Expect 200')
+@attr(assertion='garbage, but S3 succeeds!')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
def test_object_create_bad_expect_mismatch():
# this is a really long test, and I don't know if it's valid...
# again, accepts this with no troubles
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty expect')
+@attr(assertion='succeeds ... should it?')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_expect_empty():
key = _setup_bad_object({'Expect': ''})
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no expect')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_expect_none():
key = _setup_bad_object(remove=('Expect',))
# this is a really long test..
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic expect')
+@attr(assertion='garbage, but S3 succeeds!')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
def test_object_create_bad_expect_unreadable():
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
@attr('fails_on_rgw')
eq(e.error_code, None)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/negative content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
def test_object_create_bad_contentlength_negative():
eq(e.error_code, None)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no content length')
+@attr(assertion='fails 411')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contentlength_none():
key = _setup_bad_object(remove=('Content-Length',))
eq(e.reason, 'Length Required')
eq(e.error_code,'MissingContentLength')
-
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
def test_object_create_bad_contentlength_unreadable():
eq(e.error_code, None)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/content length too long')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
def test_object_create_bad_contentlength_mismatch_above():
eq(e.error_code, 'RequestTimeout')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/content length too short')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contentlength_mismatch_below():
content = 'bar'
eq(e.error_code, 'BadDigest')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/content type text/plain')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contenttype_invalid():
key = _setup_bad_object({'Content-Type': 'text/plain'})
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty content type')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contenttype_empty():
key = _setup_bad_object({'Content-Type': ''})
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no content type')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_contenttype_none():
key = _setup_bad_object(remove=('Content-Type',))
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic content type')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
@attr('fails_on_dho')
assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-@nose.with_setup(teardown=_clear_custom_headers)
-def test_object_create_bad_contenttype_none():
- key = _setup_bad_object(remove=('Content-Type',))
- key.set_contents_from_string('bar')
-
-
-@nose.with_setup(teardown=_clear_custom_headers)
-def test_object_create_bad_ua_invalid():
- key = _setup_bad_object({'User-Agent': ''})
- key.set_contents_from_string('bar')
-
-
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_ua_empty():
key = _setup_bad_object({'User-Agent': ''})
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_ua_unreadable():
key = _setup_bad_object({'User-Agent': '\x07'})
key.set_contents_from_string('bar')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_ua_none():
key = _setup_bad_object(remove=('User-Agent',))
@nose.with_setup(teardown=_clear_custom_headers)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/invalid authorization')
+@attr(assertion='fails 400')
def test_object_create_bad_authorization_invalid():
key = _setup_bad_object({'Authorization': 'AWS HAHAHA'})
# the teardown is really messed up here. check it out
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
@attr('fails_on_dho')
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_empty():
key = _setup_bad_object({'Authorization': ''})
# the teardown is really messed up here. check it out
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_none():
key = _setup_bad_object(remove=('Authorization',))
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/incorrect authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_authorization_incorrect():
key = _setup_bad_object({'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUA:FWeDfwojDSdS2Ztmpfeubhd9isU='})
assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/invalid date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_invalid():
key = _setup_bad_object({'Date': 'Bad Date'})
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/empty date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_empty():
key = _setup_bad_object({'Date': ''})
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/non-graphic date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_unreadable():
key = _setup_bad_object({'Date': '\x07'})
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/no date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_none():
key = _setup_bad_object(remove=('Date',))
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date in past')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_before_today():
key = _setup_bad_object({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
eq(e.error_code, 'RequestTimeTooSkewed')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date in future')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_after_today():
key = _setup_bad_object({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
eq(e.error_code, 'RequestTimeTooSkewed')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date before epoch')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_before_epoch():
key = _setup_bad_object({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
eq(e.error_code, 'AccessDenied')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='create w/date after 9999')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_create_bad_date_after_end():
key = _setup_bad_object({'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'})
eq(e.error_code, 'RequestTimeTooSkewed')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/no content length')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
get_new_bucket()
-
+@attr(resource='bucket')
+@attr(method='acls')
+@attr(operation='set w/no content length')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_object_acl_create_contentlength_none():
bucket = get_new_bucket()
_add_custom_headers(remove=('Content-Length',))
key.set_acl('public-read')
+@attr(resource='bucket')
+@attr(method='acls')
+@attr(operation='set w/invalid permission')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_put_bad_canned_acl():
bucket = get_new_bucket()
# strangely, amazon doesn't report an error with a non-expect 100 also, our
# error comes back as html, and not xml as I normally expect
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/expect 200')
+@attr(assertion='garbage, but S3 succeeds!')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
def test_bucket_create_bad_expect_mismatch():
# this is a really long test, and I don't know if it's valid...
# again, accepts this with no troubles
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/expect empty')
+@attr(assertion='garbage, but S3 succeeds!')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_expect_empty():
_add_custom_headers({'Expect': ''})
bucket = get_new_bucket()
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/expect nongraphic')
+@attr(assertion='garbage, but S3 succeeds!')
# this is a really long test..
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
)
return conn
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/empty content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
@attr('fails_on_rgw')
eq(e.error_code, None)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/negative content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
def test_bucket_create_bad_contentlength_negative():
eq(e.error_code, None)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/no content length')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_contentlength_none():
_add_custom_headers(remove=('Content-Length',))
bucket = get_new_bucket()
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/non-graphic content length')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_dho')
def test_bucket_create_bad_contentlength_unreadable():
eq(e.error_code, None)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/empty user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_ua_empty():
_add_custom_headers({'User-Agent': ''})
bucket = get_new_bucket()
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/non-graphic user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_ua_unreadable():
_add_custom_headers({'User-Agent': '\x07'})
bucket = get_new_bucket()
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/no user agent')
+@attr(assertion='succeeds')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_ua_none():
_add_custom_headers(remove=('User-Agent',))
bucket = get_new_bucket()
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/invalid authorization')
+@attr(assertion='fails 400')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_authorization_invalid():
_add_custom_headers({'Authorization': 'AWS HAHAHA'})
# the teardown is really messed up here. check it out
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/non-graphic authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
@attr('fails_on_rgw')
@attr('fails_on_dho')
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/empty authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_authorization_empty():
_add_custom_headers({'Authorization': ''})
# the teardown is really messed up here. check it out
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/no authorization')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_authorization_none():
_add_custom_headers(remove=('Authorization',))
eq(e.reason, 'Forbidden')
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/invalid date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_invalid():
_add_custom_headers({'Date': 'Bad Date'})
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/empty date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_empty():
_add_custom_headers({'Date': ''})
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/non-graphic date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_unreadable():
_add_custom_headers({'Date': '\x07'})
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/no date')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_none():
- _add_custom_headers({'Date': '\x07'})
+ _add_custom_headers(remove=('Date',))
e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
eq(e.status, 403)
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/date in past')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_before_today():
_add_custom_headers({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
eq(e.error_code, 'RequestTimeTooSkewed')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/date in future')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_after_today():
_add_custom_headers({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
eq(e.error_code, 'RequestTimeTooSkewed')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/date before epoch')
+@attr(assertion='fails 403')
@nose.with_setup(teardown=_clear_custom_headers)
def test_bucket_create_bad_date_before_epoch():
_add_custom_headers({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
eq(g.type, w.pop('type'))
eq(w, {})
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='empty buckets return no contents')
def test_bucket_list_empty():
bucket = get_new_bucket()
l = bucket.list()
l = list(l)
eq(l, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='distinct buckets have different contents')
def test_bucket_list_distinct():
bucket1 = get_new_bucket()
bucket2 = get_new_bucket()
eq(l, [])
def _create_keys(bucket=None, keys=[]):
+ """
+ Populate a (specified or new) bucket with objects with
+ specified names (and contents identical to their names).
+ """
if bucket is None:
bucket = get_new_bucket()
def _get_keys_prefixes(li):
+ """
+ figure out which of the strings in a list are actually keys
+ return lists of strings that are (keys) and are not (prefixes)
+ """
keys = [x for x in li if isinstance(x, boto.s3.key.Key)]
prefixes = [x for x in li if not isinstance(x, boto.s3.key.Key)]
return (keys, prefixes)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='pagination w/max_keys=2, no marker')
def test_bucket_list_many():
bucket = _create_keys(keys=['foo', 'bar', 'baz'])
- # bucket.list() is high-level and will not set us set max-keys,
+ # bucket.list() is high-level and will not let us set max-keys,
# using it would require using >1000 keys to test, and that would
# be too slow; use the lower-level call bucket.get_all_keys()
# instead
eq(names, ['foo'])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='prefixes in multi-component object names')
def test_bucket_list_delimiter_basic():
bucket = _create_keys(keys=['foo/bar', 'foo/baz/xyzzy', 'quux/thud', 'asdf'])
+ # listings should treat / delimiter in a directory-like fashion
li = bucket.list(delimiter='/')
eq(li.delimiter, '/')
+ # asdf is the only terminal object that should appear in the listing
(keys,prefixes) = _get_keys_prefixes(li)
names = [e.name for e in keys]
eq(names, ['asdf'])
# Unfortunately, boto considers a CommonPrefixes element as a prefix, and
# will store the last Prefix element within a CommonPrefixes element,
# effectively overwriting any other prefixes.
+
+ # the other returned values should be the pure prefixes foo/ and quux/
prefix_names = [e.name for e in prefixes]
eq(len(prefixes), 2)
eq(prefix_names, ['foo/', 'quux/'])
-# just testing that we can do the delimeter and prefix logic on non-slashes
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='non-slash delimiter characters')
def test_bucket_list_delimiter_alt():
bucket = _create_keys(keys=['bar', 'baz', 'cab', 'foo'])
li = bucket.list(delimiter='a')
eq(li.delimiter, 'a')
+ # foo contains no 'a' and so is a complete key
(keys,prefixes) = _get_keys_prefixes(li)
names = [e.name for e in keys]
eq(names, ['foo'])
+ # bar, baz, and cab should be broken up by the 'a' delimiters
prefix_names = [e.name for e in prefixes]
eq(len(prefixes), 2)
eq(prefix_names, ['ba', 'ca'])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='non-printable delimiter can be specified')
def test_bucket_list_delimiter_unreadable():
key_names = ['bar', 'baz', 'cab', 'foo']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='empty delimiter can be specified')
def test_bucket_list_delimiter_empty():
key_names = ['bar', 'baz', 'cab', 'foo']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='unspecified delimiter defaults to none')
def test_bucket_list_delimiter_none():
key_names = ['bar', 'baz', 'cab', 'foo']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list')
+@attr(assertion='unused delimiter is not found')
def test_bucket_list_delimiter_not_exist():
key_names = ['bar', 'baz', 'cab', 'foo']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='returns only objects under prefix')
def test_bucket_list_prefix_basic():
bucket = _create_keys(keys=['foo/bar', 'foo/baz', 'quux'])
# just testing that we can do the delimeter and prefix logic on non-slashes
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='prefixes w/o delimiters')
def test_bucket_list_prefix_alt():
bucket = _create_keys(keys=['bar', 'baz', 'foo'])
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='empty prefix returns everything')
def test_bucket_list_prefix_empty():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='unspecified prefix returns everything')
def test_bucket_list_prefix_none():
key_names = ['foo/bar', 'foo/baz', 'quux']
bucket = _create_keys(keys=key_names)
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='nonexistent prefix returns nothing')
def test_bucket_list_prefix_not_exist():
bucket = _create_keys(keys=['foo/bar', 'foo/baz', 'quux'])
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix')
+@attr(assertion='non-printable prefix can be specified')
def test_bucket_list_prefix_unreadable():
+ # FIX: shouldn't this test include strings that start with the tested prefix
bucket = _create_keys(keys=['foo/bar', 'foo/baz', 'quux'])
li = bucket.list(prefix='\x0a')
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix w/delimiter')
+@attr(assertion='returns only objects directly under prefix')
def test_bucket_list_prefix_delimiter_basic():
bucket = _create_keys(keys=['foo/bar', 'foo/baz/xyzzy', 'quux/thud', 'asdf'])
eq(prefix_names, ['foo/baz/'])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix w/delimiter')
+@attr(assertion='non-slash delimiters')
def test_bucket_list_prefix_delimiter_alt():
bucket = _create_keys(keys=['bar', 'bazar', 'cab', 'foo'])
eq(prefix_names, ['baza'])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix w/delimiter')
+@attr(assertion='finds nothing w/unmatched prefix')
def test_bucket_list_prefix_delimiter_prefix_not_exist():
bucket = _create_keys(keys=['b/a/r', 'b/a/c', 'b/a/g', 'g'])
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix w/delimiter')
+@attr(assertion='over-ridden slash ceases to be a delimiter')
def test_bucket_list_prefix_delimiter_delimiter_not_exist():
bucket = _create_keys(keys=['b/a/c', 'b/a/g', 'b/a/r', 'g'])
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list under prefix w/delimiter')
+@attr(assertion='finds nothing w/unmatched prefix and delimiter')
def test_bucket_list_prefix_delimiter_prefix_delimiter_not_exist():
bucket = _create_keys(keys=['b/a/c', 'b/a/g', 'b/a/r', 'g'])
eq(prefixes, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='pagination w/max_keys=1, marker')
def test_bucket_list_maxkeys_one():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
eq(names, key_names[1:])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='pagination w/max_keys=0')
def test_bucket_list_maxkeys_zero():
bucket = _create_keys(keys=['bar', 'baz', 'foo', 'quxx'])
eq(li, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='pagination w/o max_keys')
def test_bucket_list_maxkeys_none():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
eq(li.MaxKeys, '1000')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='invalid max_keys')
def test_bucket_list_maxkeys_invalid():
bucket = _create_keys(keys=['bar', 'baz', 'foo', 'quxx'])
@attr('fails_on_rgw')
@attr('fails_on_dho')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='non-printing max_keys')
def test_bucket_list_maxkeys_unreadable():
bucket = _create_keys(keys=['bar', 'baz', 'foo', 'quxx'])
eq(e.error_code, 'InvalidArgument')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='no pagination, no marker')
def test_bucket_list_marker_none():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
eq(li.marker, '')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='no pagination, empty marker')
def test_bucket_list_marker_empty():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
eq(names, key_names)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='non-printing marker')
def test_bucket_list_marker_unreadable():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
eq(names, key_names)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='marker not-in-list')
def test_bucket_list_marker_not_in_list():
bucket = _create_keys(keys=['bar', 'baz', 'foo', 'quxx'])
eq(names, ['foo', 'quxx'])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='marker after list')
def test_bucket_list_marker_after_list():
bucket = _create_keys(keys=['bar', 'baz', 'foo', 'quxx'])
eq(li, [])
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all keys')
+@attr(assertion='marker before list')
def test_bucket_list_marker_before_list():
key_names = ['bar', 'baz', 'foo', 'quxx']
bucket = _create_keys(keys=key_names)
def _compare_dates(iso_datetime, http_datetime):
+ """
+ compare an iso date and an http date, within an epsilon
+ """
date = isodate.parse_datetime(iso_datetime)
pd = email.utils.parsedate_tz(http_datetime)
date2=http_datetime,
)
-
+@attr(resource='object')
+@attr(method='head')
+@attr(operation='compare w/bucket list')
+@attr(assertion='return same metadata')
def test_bucket_list_return_data():
key_names = ['bar', 'baz', 'foo']
bucket = _create_keys(keys=key_names)
_compare_dates(key.last_modified, key_data['last_modified'])
+@attr(resource='object.metadata')
+@attr(method='head')
+@attr(operation='modification-times')
+@attr(assertion='http and ISO-8601 times agree')
def test_bucket_list_object_time():
bucket = _create_keys(keys=['foo'])
_compare_dates(iso_datetime, http_datetime)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='non-existent bucket')
+@attr(assertion='fails 404')
def test_bucket_notexist():
+ # generate a (hopefully) unique, not-yet existent bucket name
name = '{prefix}foo'.format(prefix=get_prefix())
print 'Trying bucket {name!r}'.format(name=name)
+
e = assert_raises(boto.exception.S3ResponseError, s3.main.get_bucket, name)
eq(e.status, 404)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchBucket')
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='non-existent bucket')
+@attr(assertion='fails 404')
def test_bucket_delete_notexist():
name = '{prefix}foo'.format(prefix=get_prefix())
print 'Trying bucket {name!r}'.format(name=name)
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchBucket')
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='non-empty bucket')
+@attr(assertion='fails 409')
def test_bucket_delete_nonempty():
bucket = get_new_bucket()
eq(e.reason, 'Conflict')
eq(e.error_code, 'BucketNotEmpty')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='non-existent bucket')
+@attr(assertion='fails 404')
def test_object_write_to_nonexist_bucket():
name = '{prefix}foo'.format(prefix=get_prefix())
print 'Trying bucket {name!r}'.format(name=name)
eq(e.error_code, 'NoSuchBucket')
+@attr(resource='bucket')
+@attr(method='del')
+@attr(operation='deleted bucket')
+@attr(assertion='fails 404')
def test_bucket_create_delete():
name = '{prefix}foo'.format(prefix=get_prefix())
print 'Trying bucket {name!r}'.format(name=name)
eq(e.error_code, 'NoSuchBucket')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='read contents that were never written')
+@attr(assertion='fails 404')
def test_object_read_notexist():
bucket = get_new_bucket()
key = bucket.new_key('foobar')
# While the test itself passes, there's a SAX parser error during teardown. It
# seems to be a boto bug. It happens with both amazon and dho.
# http://code.google.com/p/boto/issues/detail?id=501
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write to non-printing key')
+@attr(assertion='fails 404')
def test_object_create_unreadable():
bucket = get_new_bucket()
key = bucket.new_key('\x0a')
key.set_contents_from_string('bar')
-# This should test the basic lifecycle of the key
+@attr(resource='object')
+@attr(method='all')
+@attr(operation='complete object life cycle')
+@attr(assertion='read back what we wrote and rewrote')
def test_object_write_read_update_read_delete():
bucket = get_new_bucket()
# Write
def _set_get_metadata(metadata, bucket=None):
+ """
+ create a new key in a (new or specified) bucket,
+ set the meta1 property to a specified value,
+ and then re-read and return that property
+ """
if bucket is None:
bucket = get_new_bucket()
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('bar')
key2 = bucket.get_key('foo')
return key2.get_metadata('meta1')
-
+
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-read')
+@attr(assertion='reread what we wrote')
def test_object_set_get_metadata_none_to_good():
got = _set_get_metadata('mymeta')
eq(got, 'mymeta')
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-read')
+@attr(assertion='write empty value, returns empty value')
def test_object_set_get_metadata_none_to_empty():
got = _set_get_metadata('')
eq(got, '')
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-write')
+@attr(assertion='new value replaces old')
def test_object_set_get_metadata_overwrite_to_good():
bucket = get_new_bucket()
got = _set_get_metadata('oldmeta', bucket)
eq(got, 'newmeta')
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-write')
+@attr(assertion='empty value replaces old')
def test_object_set_get_metadata_overwrite_to_empty():
bucket = get_new_bucket()
got = _set_get_metadata('oldmeta', bucket)
eq(got, '')
-# UTF-8 encoded data should pass straight through
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-write')
+@attr(assertion='UTF-8 values passed through')
def test_object_set_get_unicode_metadata():
bucket = get_new_bucket()
key = boto.s3.key.Key(bucket)
eq(got, u"Hello World\xe9")
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write/re-write')
+@attr(assertion='non-UTF-8 values detected, but preserved')
def test_object_set_get_non_utf8_metadata():
bucket = get_new_bucket()
key = boto.s3.key.Key(bucket)
def _set_get_metadata_unreadable(metadata, bucket=None):
+ """
+ set and then read back a meta-data value (which presumably
+ includes some interesting characters), and return a list
+ containing the stored value AND the encoding with which it
+ was returned.
+ """
got = _set_get_metadata(metadata, bucket)
got = decode_header(got)
return got
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write')
+@attr(assertion='non-printing prefixes noted and preserved')
def test_object_set_get_metadata_empty_to_unreadable_prefix():
metadata = '\x04w'
got = _set_get_metadata_unreadable(metadata)
eq(got, [(metadata, 'utf-8')])
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write')
+@attr(assertion='non-printing suffixes noted and preserved')
def test_object_set_get_metadata_empty_to_unreadable_suffix():
metadata = 'h\x04'
got = _set_get_metadata_unreadable(metadata)
eq(got, [(metadata, 'utf-8')])
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata write')
+@attr(assertion='non-printing in-fixes noted and preserved')
def test_object_set_get_metadata_empty_to_unreadable_infix():
metadata = 'h\x04w'
got = _set_get_metadata_unreadable(metadata)
eq(got, [(metadata, 'utf-8')])
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata re-write')
+@attr(assertion='non-printing prefixes noted and preserved')
def test_object_set_get_metadata_overwrite_to_unreadable_prefix():
metadata = '\x04w'
got = _set_get_metadata_unreadable(metadata)
eq(got2, [(metadata2, 'utf-8')])
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata re-write')
+@attr(assertion='non-printing suffixes noted and preserved')
def test_object_set_get_metadata_overwrite_to_unreadable_suffix():
metadata = 'h\x04'
got = _set_get_metadata_unreadable(metadata)
eq(got2, [(metadata2, 'utf-8')])
+@attr(resource='object.metadata')
+@attr(method='put')
+@attr(operation='metadata re-write')
+@attr(assertion='non-printing in-fixes noted and preserved')
def test_object_set_get_metadata_overwrite_to_unreadable_infix():
metadata = 'h\x04w'
got = _set_get_metadata_unreadable(metadata)
eq(got2, [(metadata2, 'utf-8')])
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='data re-write')
+@attr(assertion='replaces previous metadata')
def test_object_metadata_replaced_on_put():
bucket = get_new_bucket()
assert got is None, "did not expect to see metadata: %r" % got
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='data write from file (w/100-Continue)')
+@attr(assertion='returns written data')
def test_object_write_file():
# boto Key.set_contents_from_file / .send_file uses Expect:
# 100-Continue, so this test exercises that (though a bit too
def _setup_request(bucket_acl=None, object_acl=None):
+ """
+ add a foo key, and specified key and bucket acls to
+ a (new or existing) bucket.
+ """
bucket = _create_keys(keys=['foo'])
key = bucket.get_key('foo')
def _make_request(method, bucket, key, body=None, authenticated=False):
+ """
+ issue a request for a specified method, on a specified <bucket,key>,
+ with a specified (optional) body (encrypted per the connection), and
+ return the response (status, reason)
+ """
if authenticated:
url = key.generate_url(100000, method=method)
o = urlparse(url)
return res
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='publicly readable bucket')
+@attr(assertion='bucket is readable')
def test_object_raw_get():
(bucket, key) = _setup_request('public-read', 'public-read')
res = _make_request('GET', bucket, key)
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='deleted object and bucket')
+@attr(assertion='fails 404')
def test_object_raw_get_bucket_gone():
(bucket, key) = _setup_request('public-read', 'public-read')
key.delete()
eq(res.reason, 'Not Found')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='deleted object')
+@attr(assertion='fails 404')
def test_object_raw_get_object_gone():
(bucket, key) = _setup_request('public-read', 'public-read')
key.delete()
eq(res.reason, 'Not Found')
-# a private bucket should not affect reading or writing to a bucket
+@attr(resource='bucket.acl')
+@attr(method='get')
+@attr(operation='unauthenticated on private bucket')
+@attr(assertion='succeeds')
def test_object_raw_get_bucket_acl():
(bucket, key) = _setup_request('private', 'public-read')
eq(res.reason, 'OK')
+@attr(resource='object.acl')
+@attr(method='get')
+@attr(operation='unauthenticated on private object')
+@attr(assertion='fails 403')
def test_object_raw_get_object_acl():
(bucket, key) = _setup_request('public-read', 'private')
eq(res.reason, 'Forbidden')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='authenticated on public bucket/object')
+@attr(assertion='succeeds')
def test_object_raw_authenticated():
(bucket, key) = _setup_request('public-read', 'public-read')
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='authenticated on private bucket/public object')
+@attr(assertion='succeeds')
def test_object_raw_authenticated_bucket_acl():
(bucket, key) = _setup_request('private', 'public-read')
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='authenticated on public bucket/private object')
+@attr(assertion='succeeds')
def test_object_raw_authenticated_object_acl():
(bucket, key) = _setup_request('public-read', 'private')
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='authenticated on deleted object and bucket')
+@attr(assertion='fails 404')
def test_object_raw_authenticated_bucket_gone():
(bucket, key) = _setup_request('public-read', 'public-read')
key.delete()
eq(res.reason, 'Not Found')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='authenticated on deleted object')
+@attr(assertion='fails 404')
def test_object_raw_authenticated_object_gone():
(bucket, key) = _setup_request('public-read', 'public-read')
key.delete()
eq(res.reason, 'Not Found')
-# test for unsigned PUT
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='unauthenticated, no object acls')
+@attr(assertion='fails 403')
def test_object_raw_put():
bucket = get_new_bucket()
key = bucket.new_key('foo')
eq(res.reason, 'Forbidden')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='unauthenticated, publicly writable object')
+@attr(assertion='succeeds')
def test_object_raw_put_write_access():
bucket = get_new_bucket()
bucket.set_acl('public-read-write')
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='authenticated, no object acls')
+@attr(assertion='succeeds')
def test_object_raw_put_authenticated():
bucket = get_new_bucket()
key = bucket.new_key('foo')
def check_bad_bucket_name(name):
+ """
+ Attempt to create a bucket with a specified name, and confirm
+ that the request fails because of an invalid bucket name.
+ """
e = assert_raises(boto.exception.S3ResponseError, s3.main.create_bucket, name)
eq(e.status, 400)
eq(e.reason, 'Bad Request')
@attr('fails_on_aws')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='name begins with underscore')
+@attr(assertion='fails with subdomain: 400')
def test_bucket_create_naming_bad_starts_nonalpha():
check_bad_bucket_name('_alphasoup')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='empty name')
+@attr(assertion='fails 405')
def test_bucket_create_naming_bad_short_empty():
# bucket creates where name is empty look like PUTs to the parent
# resource (with slash), hence their error response is different
eq(e.error_code, 'MethodNotAllowed')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='short (one character) name')
+@attr(assertion='fails 400')
def test_bucket_create_naming_bad_short_one():
check_bad_bucket_name('a')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='short (two character) name')
+@attr(assertion='fails 400')
def test_bucket_create_naming_bad_short_two():
check_bad_bucket_name('aa')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='excessively long names')
+@attr(assertion='fails with subdomain: 400')
def test_bucket_create_naming_bad_long():
check_bad_bucket_name(256*'a')
check_bad_bucket_name(280*'a')
def check_good_bucket_name(name, _prefix=None):
- # prefixing to make then unique
-
+ """
+ Attempt to create a bucket with a specified name
+ and (specified or default) prefix, returning the
+ results of that effort.
+ """
# tests using this with the default prefix must *not* rely on
# being able to set the initial character, or exceed the max len
def _test_bucket_create_naming_good_long(length):
+ """
+ Attempt to create a bucket whose name (including the
+ prefix) is of a specified length.
+ """
prefix = get_prefix()
assert len(prefix) < 255
num = length - len(prefix)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/250 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_250():
_test_bucket_create_naming_good_long(250)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/251 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_251():
_test_bucket_create_naming_good_long(251)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/252 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_252():
_test_bucket_create_naming_good_long(252)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/253 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_253():
_test_bucket_create_naming_good_long(253)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/254 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_254():
_test_bucket_create_naming_good_long(254)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/255 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_good_long_255():
_test_bucket_create_naming_good_long(255)
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list w/251 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_list_long_name():
prefix = get_prefix()
length = 251
# AWS does not enforce all documented bucket restrictions.
# http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/index.html?BucketRestrictions.html
@attr('fails_on_aws')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/ip address for name')
+@attr(assertion='fails on aws')
def test_bucket_create_naming_bad_ip():
check_bad_bucket_name('192.168.5.123')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/! in name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_bad_punctuation():
# characters other than [a-zA-Z0-9._-]
check_bad_bucket_name('alpha!soup')
# test_bucket_create_naming_dns_* are valid but not recommended
-
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/underscore in name')
+@attr(assertion='succeeds')
def test_bucket_create_naming_dns_underscore():
check_good_bucket_name('foo_bar')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/100 byte name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_dns_long():
prefix = get_prefix()
assert len(prefix) < 50
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/dash at end of name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_dns_dash_at_end():
check_good_bucket_name('foo-')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/.. in name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_dns_dot_dot():
check_good_bucket_name('foo..bar')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/.- in name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_dns_dot_dash():
check_good_bucket_name('foo.-bar')
# Breaks DNS with SubdomainCallingFormat
@attr('fails_with_subdomain')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create w/-. in name')
+@attr(assertion='fails with subdomain')
def test_bucket_create_naming_dns_dash_dot():
check_good_bucket_name('foo-.bar')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='re-create')
+@attr(assertion='idempotent success')
def test_bucket_create_exists():
bucket = get_new_bucket()
# REST idempotency means this should be a nop
s3.main.create_bucket(bucket.name)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='re-create by non-owner')
+@attr(assertion='fails 409')
def test_bucket_create_exists_nonowner():
# Names are shared across a global namespace. As such, no two
# users can create a bucket with that same name.
eq(e.error_code, 'BucketAlreadyExists')
+@attr(resource='bucket')
+@attr(method='delete')
+@attr(operation='delete by non-owner')
+@attr(assertion='fails')
def test_bucket_delete_nonowner():
bucket = get_new_bucket()
check_access_denied(s3.alt.delete_bucket, bucket.name)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='default acl')
+@attr(assertion='read back expected defaults')
def test_bucket_acl_default():
bucket = get_new_bucket()
policy = bucket.get_acl()
)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='acl: public-read,private')
+@attr(assertion='read back expected values')
def test_bucket_acl_canned():
bucket = get_new_bucket()
# Since it defaults to private, set it public-read first
)
+@attr(resource='bucket.acls')
+@attr(method='put')
+@attr(operation='acl: public-read-write')
+@attr(assertion='read back expected values')
def test_bucket_acl_canned_publicreadwrite():
bucket = get_new_bucket()
bucket.set_acl('public-read-write')
)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='acl: authenticated-read')
+@attr(assertion='read back expected values')
def test_bucket_acl_canned_authenticatedread():
bucket = get_new_bucket()
bucket.set_acl('authenticated-read')
)
+@attr(resource='object.acls')
+@attr(method='get')
+@attr(operation='default acl')
+@attr(assertion='read back expected defaults')
def test_object_acl_default():
bucket = get_new_bucket()
key = bucket.new_key('foo')
)
+@attr(resource='object.acls')
+@attr(method='put')
+@attr(operation='acl public-read,private')
+@attr(assertion='read back expected values')
def test_object_acl_canned():
bucket = get_new_bucket()
key = bucket.new_key('foo')
)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='acl public-read-write')
+@attr(assertion='read back expected values')
def test_object_acl_canned_publicreadwrite():
bucket = get_new_bucket()
key = bucket.new_key('foo')
)
+@attr(resource='object.acls')
+@attr(method='put')
+@attr(operation='acl authenticated-read')
+@attr(assertion='read back expected values')
def test_object_acl_canned_authenticatedread():
bucket = get_new_bucket()
key = bucket.new_key('foo')
)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl private')
+@attr(assertion='a private object can be set to private')
def test_bucket_acl_canned_private_to_private():
bucket = get_new_bucket()
bucket.set_acl('private')
def _make_acl_xml(acl):
+ """
+ Return the xml form of an ACL entry
+ """
return '<?xml version="1.0" encoding="UTF-8"?><AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>' + config.main.user_id + '</ID></Owner>' + acl.to_xml() + '</AccessControlPolicy>'
def _build_bucket_acl_xml(permission, bucket=None):
+ """
+ add the specified permission for the current user to
+ a (new or specified) bucket, in XML form, set it, and
+ then read it back to confirm it was correctly set
+ """
acl = boto.s3.acl.ACL()
acl.add_user_grant(permission=permission, user_id=config.main.user_id)
XML = _make_acl_xml(acl)
)
+@attr(resource='bucket.acls')
+@attr(method='ACLs')
+@attr(operation='set acl FULL_CONTROL (xml)')
+@attr(assertion='reads back correctly')
def test_bucket_acl_xml_fullcontrol():
_build_bucket_acl_xml('FULL_CONTROL')
+@attr(resource='bucket.acls')
+@attr(method='ACLs')
+@attr(operation='set acl WRITE (xml)')
+@attr(assertion='reads back correctly')
def test_bucket_acl_xml_write():
_build_bucket_acl_xml('WRITE')
+@attr(resource='bucket.acls')
+@attr(method='ACLs')
+@attr(operation='set acl WRITE_ACP (xml)')
+@attr(assertion='reads back correctly')
def test_bucket_acl_xml_writeacp():
_build_bucket_acl_xml('WRITE_ACP')
+@attr(resource='bucket.acls')
+@attr(method='ACLs')
+@attr(operation='set acl READ (xml)')
+@attr(assertion='reads back correctly')
def test_bucket_acl_xml_read():
_build_bucket_acl_xml('READ')
+@attr(resource='bucket.acls')
+@attr(method='ACLs')
+@attr(operation='set acl READ_ACP (xml)')
+@attr(assertion='reads back correctly')
def test_bucket_acl_xml_readacp():
_build_bucket_acl_xml('READ_ACP')
def _build_object_acl_xml(permission):
+ """
+ add the specified permission for the current user to
+ a new object in a new bucket, in XML form, set it, and
+ then read it back to confirm it was correctly set
+ """
acl = boto.s3.acl.ACL()
acl.add_user_grant(permission=permission, user_id=config.main.user_id)
XML = _make_acl_xml(acl)
)
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set acl FULL_CONTROL (xml)')
+@attr(assertion='reads back correctly')
def test_object_acl_xml():
_build_object_acl_xml('FULL_CONTROL')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set acl WRITE (xml)')
+@attr(assertion='reads back correctly')
def test_object_acl_xml_write():
_build_object_acl_xml('WRITE')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set acl WRITE_ACP (xml)')
+@attr(assertion='reads back correctly')
def test_object_acl_xml_writeacp():
_build_object_acl_xml('WRITE_ACP')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set acl READ (xml)')
+@attr(assertion='reads back correctly')
def test_object_acl_xml_read():
_build_object_acl_xml('READ')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set acl READ_ACP (xml)')
+@attr(assertion='reads back correctly')
def test_object_acl_xml_readacp():
_build_object_acl_xml('READ_ACP')
def _bucket_acl_grant_userid(permission):
+ """
+ create a new bucket, grant a specific user the specified
+ permission, read back the acl and verify correct setting
+ """
bucket = get_new_bucket()
# add alt user
policy = bucket.get_acl()
def _check_bucket_acl_grant_can_read(bucket):
+ """
+ verify ability to read the specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name)
def _check_bucket_acl_grant_cant_read(bucket):
+ """
+ verify inability to read the specified bucket
+ """
check_access_denied(s3.alt.get_bucket, bucket.name)
def _check_bucket_acl_grant_can_readacp(bucket):
+ """
+ verify ability to read acls on specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
bucket2.get_acl()
def _check_bucket_acl_grant_cant_readacp(bucket):
+ """
+ verify inability to read acls on specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
check_access_denied(bucket2.get_acl)
def _check_bucket_acl_grant_can_write(bucket):
+ """
+ verify ability to write the specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
key = bucket2.new_key('foo-write')
key.set_contents_from_string('bar')
def _check_bucket_acl_grant_cant_write(bucket):
+ """
+ verify inability to write the specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
key = bucket2.new_key('foo-write')
check_access_denied(key.set_contents_from_string, 'bar')
def _check_bucket_acl_grant_can_writeacp(bucket):
+ """
+ verify ability to set acls on the specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
bucket2.set_acl('public-read')
def _check_bucket_acl_grant_cant_writeacp(bucket):
+ """
+ verify inability to set acls on the specified bucket
+ """
bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
check_access_denied(bucket2.set_acl, 'public-read')
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/userid FULL_CONTROL')
+@attr(assertion='can read/write data/acls')
def test_bucket_acl_grant_userid_fullcontrol():
bucket = _bucket_acl_grant_userid('FULL_CONTROL')
_check_bucket_acl_grant_can_writeacp(bucket)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/userid READ')
+@attr(assertion='can read data, no other r/w')
def test_bucket_acl_grant_userid_read():
bucket = _bucket_acl_grant_userid('READ')
_check_bucket_acl_grant_cant_writeacp(bucket)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/userid READ_ACP')
+@attr(assertion='can read acl, no other r/w')
def test_bucket_acl_grant_userid_readacp():
bucket = _bucket_acl_grant_userid('READ_ACP')
#_check_bucket_acl_grant_cant_writeacp_can_readacp(bucket)
_check_bucket_acl_grant_cant_writeacp(bucket)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/userid WRITE')
+@attr(assertion='can write data, no other r/w')
def test_bucket_acl_grant_userid_write():
bucket = _bucket_acl_grant_userid('WRITE')
_check_bucket_acl_grant_cant_writeacp(bucket)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/userid WRITE_ACP')
+@attr(assertion='can write acls, no other r/w')
def test_bucket_acl_grant_userid_writeacp():
bucket = _bucket_acl_grant_userid('WRITE_ACP')
_check_bucket_acl_grant_can_writeacp(bucket)
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='set acl w/invalid userid')
+@attr(assertion='fails 400')
def test_bucket_acl_grant_nonexist_user():
bucket = get_new_bucket()
# add alt user
eq(e.error_code, 'InvalidArgument')
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='revoke all ACLs')
+@attr(assertion='can: read obj, get/set bucket acl, cannot write objs')
def test_bucket_acl_no_grants():
bucket = get_new_bucket()
# This test will fail on DH Objects. DHO allows multiple users with one account, which
# would violate the uniqueness requirement of a user's email. As such, DHO users are
# created without an email.
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='add second FULL_CONTROL user')
+@attr(assertion='works for S3, fails for DHO')
@attr('fails_on_dho')
def test_bucket_acl_grant_email():
bucket = get_new_bucket()
key.set_contents_from_string('bar')
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='add acl for nonexistent user')
+@attr(assertion='fail 400')
def test_bucket_acl_grant_email_notexist():
# behavior not documented by amazon
bucket = get_new_bucket()
eq(e.error_code, 'UnresolvableGrantByEmailAddress')
+@attr(resource='bucket')
+@attr(method='ACLs')
+@attr(operation='revoke all ACLs')
+@attr(assertion='acls read back as empty')
def test_bucket_acl_revoke_all():
# revoke all access, including the owner's access
bucket = get_new_bucket()
# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
# http://tracker.newdream.net/issues/984
+@attr(resource='bucket.log')
+@attr(method='put')
+@attr(operation='set/enable/disable logging target')
+@attr(assertion='operations succeed')
@attr('fails_on_rgw')
@attr('fails_on_dho')
def test_logging_toggle():
log_bucket.set_as_logging_target()
bucket.enable_logging(target_bucket=log_bucket, target_prefix=bucket.name)
bucket.disable_logging()
+ # NOTE: this does not actually test whether or not logging works
def _setup_access(bucket_acl, object_acl):
"""
Simple test fixture: create a bucket with given ACL, with objects:
-
- - a: given ACL
- - b: default ACL
+ - a: owning user, given ACL
+ - a2: same object accessed by some other user
+ - b: owning user, default ACL in bucket w/given ACL
+ - b2: same object accessed by some other user
"""
obj = bunch.Bunch()
bucket = get_new_bucket()
obj.b = bucket.new_key('bar')
obj.b.set_contents_from_string('barcontent')
+ # bucket2 is being accessed by a different user
obj.bucket2 = s3.alt.get_bucket(bucket.name, validate=False)
obj.a2 = obj.bucket2.new_key(obj.a.name)
obj.b2 = obj.bucket2.new_key(obj.b.name)
return frozenset(k.name for k in bucket.list())
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: private/private')
+@attr(assertion='public has no access to bucket or objects')
def test_access_bucket_private_object_private():
# all the test_access_* tests follow this template
obj = _setup_access(bucket_acl='private', object_acl='private')
+ # a should be public-read, b gets default (private)
# acled object read fail
check_access_denied(obj.a2.get_contents_as_string)
# acled object write fail
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: private/public-read')
+@attr(assertion='public can only read readable object')
def test_access_bucket_private_object_publicread():
obj = _setup_access(bucket_acl='private', object_acl='public-read')
+ # a should be public-read, b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
check_access_denied(obj.b2.get_contents_as_string)
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: private/public-read/write')
+@attr(assertion='public can only read the readable object')
def test_access_bucket_private_object_publicreadwrite():
obj = _setup_access(bucket_acl='private', object_acl='public-read-write')
+ # a should be public-read-only ... because it is in a private bucket
+ # b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
- ### TODO: it seems AWS denies this write, even when we expected it
- ### to complete; as it is unclear what the actual desired behavior
- ### is (the docs are somewhat unclear), we'll just codify current
- ### AWS behavior, at least for now.
- # obj.a2.set_contents_from_string('foooverwrite')
check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
check_access_denied(obj.b2.get_contents_as_string)
check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read/private')
+@attr(assertion='public can only list the bucket')
def test_access_bucket_publicread_object_private():
obj = _setup_access(bucket_acl='public-read', object_acl='private')
+ # a should be private, b gets default (private)
check_access_denied(obj.a2.get_contents_as_string)
check_access_denied(obj.a2.set_contents_from_string, 'barcontent')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read/public-read')
+@attr(assertion='public can read readable objects and list bucket')
def test_access_bucket_publicread_object_publicread():
obj = _setup_access(bucket_acl='public-read', object_acl='public-read')
+ # a should be public-read, b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read/public-read-write')
+@attr(assertion='public can read readable objects and list bucket')
def test_access_bucket_publicread_object_publicreadwrite():
obj = _setup_access(bucket_acl='public-read', object_acl='public-read-write')
+ # a should be public-read-only ... because it is in a r/o bucket
+ # b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
- ### TODO: it seems AWS denies this write, even when we expected it
- ### to complete; as it is unclear what the actual desired behavior
- ### is (the docs are somewhat unclear), we'll just codify current
- ### AWS behavior, at least for now.
- # obj.a2.set_contents_from_string('foooverwrite')
check_access_denied(obj.a2.set_contents_from_string, 'foooverwrite')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
check_access_denied(obj.b2.set_contents_from_string, 'baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
check_access_denied(obj.new.set_contents_from_string, 'newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read-write/private')
+@attr(assertion='private objects cannot be read, but can be overwritten')
def test_access_bucket_publicreadwrite_object_private():
obj = _setup_access(bucket_acl='public-read-write', object_acl='private')
+ # a should be private, b gets default (private)
check_access_denied(obj.a2.get_contents_as_string)
obj.a2.set_contents_from_string('barcontent')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
obj.b2.set_contents_from_string('baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
obj.new.set_contents_from_string('newcontent')
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read-write/public-read')
+@attr(assertion='private objects cannot be read, but can be overwritten')
def test_access_bucket_publicreadwrite_object_publicread():
obj = _setup_access(bucket_acl='public-read-write', object_acl='public-read')
+ # a should be public-read, b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
obj.a2.set_contents_from_string('barcontent')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
obj.b2.set_contents_from_string('baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
obj.new.set_contents_from_string('newcontent')
-
+@attr(resource='object')
+@attr(method='ACLs')
+@attr(operation='set bucket/object acls: public-read-write/public-read-write')
+@attr(assertion='private objects cannot be read, but can be overwritten')
def test_access_bucket_publicreadwrite_object_publicreadwrite():
obj = _setup_access(bucket_acl='public-read-write', object_acl='public-read-write')
+ # a should be public-read-write, b gets default (private)
eq(obj.a2.get_contents_as_string(), 'foocontent')
obj.a2.set_contents_from_string('foooverwrite')
- ### TODO: i don't understand why this gets denied, but codifying what
- ### AWS does
- # eq(obj.b2.get_contents_as_string(), 'barcontent')
check_access_denied(obj.b2.get_contents_as_string)
obj.b2.set_contents_from_string('baroverwrite')
eq(get_bucket_key_names(obj.bucket2), frozenset(['foo', 'bar']))
obj.new.set_contents_from_string('newcontent')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='set object acls')
+@attr(assertion='valid XML ACL sets properly')
def test_object_set_valid_acl():
XML_1 = '<?xml version="1.0" encoding="UTF-8"?><AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>' + config.main.user_id + '</ID></Owner><AccessControlList><Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"><ID>' + config.main.user_id + '</ID></Grantee><Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>'
bucket = get_new_bucket()
key.set_contents_from_string('bar')
key.set_xml_acl(XML_1)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='set object acls')
+@attr(assertion='invalid XML ACL fails 403')
def test_object_giveaway():
CORRECT_ACL = '<?xml version="1.0" encoding="UTF-8"?><AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>' + config.main.user_id + '</ID></Owner><AccessControlList><Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"><ID>' + config.main.user_id + '</ID></Grantee><Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>'
WRONG_ACL = '<?xml version="1.0" encoding="UTF-8"?><AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><ID>' + config.alt.user_id + '</ID></Owner><AccessControlList><Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"><ID>' + config.alt.user_id + '</ID></Grantee><Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>'
eq(e.reason, 'Forbidden')
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all buckets')
+@attr(assertion='returns all expected buckets')
def test_buckets_create_then_list():
create_buckets = [get_new_bucket() for i in xrange(5)]
list_buckets = s3.main.get_all_buckets()
)
return conn
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all buckets (anonymous)')
+@attr(assertion='succeeds')
def test_list_buckets_anonymous():
# Get a connection with bad authorization, then change it to be our new Anonymous auth mechanism,
# emulating standard HTTP access.
buckets = conn.get_all_buckets()
eq(len(buckets), 0)
+@attr(resource='bucket')
+@attr(method='get')
+@attr(operation='list all buckets (bad auth)')
+@attr(assertion='fails 403')
def test_list_buckets_bad_auth():
conn = _create_connection_bad_auth()
e = assert_raises(boto.exception.S3ResponseError, conn.get_all_buckets)
eq(e.reason, 'Forbidden')
eq(e.error_code, 'AccessDenied')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create bucket')
+@attr(assertion='name starts with alphabetic works')
# this test goes outside the user-configure prefix because it needs to
# control the initial character of the bucket name
@nose.with_setup(
def test_bucket_create_naming_good_starts_alpha():
check_good_bucket_name('foo', _prefix='a'+get_prefix())
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create bucket')
+@attr(assertion='name starts with numeric works')
# this test goes outside the user-configure prefix because it needs to
# control the initial character of the bucket name
@nose.with_setup(
def test_bucket_create_naming_good_starts_digit():
check_good_bucket_name('foo', _prefix='0'+get_prefix())
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create bucket')
+@attr(assertion='name containing dot works')
def test_bucket_create_naming_good_contains_period():
check_good_bucket_name('aaa.111')
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='create bucket')
+@attr(assertion='name containing hyphen works')
def test_bucket_create_naming_good_contains_hyphen():
check_good_bucket_name('aaa-111')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='copy object in same bucket')
+@attr(assertion='works')
def test_object_copy_same_bucket():
bucket = get_new_bucket()
key = bucket.new_key('foo123bar')
key2 = bucket.get_key('bar321foo')
eq(key2.get_contents_as_string(), 'foo')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='copy object from different bucket')
+@attr(assertion='works')
def test_object_copy_diff_bucket():
buckets = [get_new_bucket(), get_new_bucket()]
key = buckets[0].new_key('foo123bar')
# is this a necessary check? a NoneType object is being touched here
# it doesn't get to the S3 level
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='copy from an inaccessible bucket')
+@attr(assertion='fails w/AttributeError')
def test_object_copy_not_owned_bucket():
buckets = [get_new_bucket(), get_new_bucket(s3.alt)]
print repr(buckets[1])
mp.upload_part_from_file(part_out, i+1)
def generate_random(mb_size):
+ """
+ Generate the specified number of megabytes of random data.
+ (actually each MB is a repetition of the first KB)
+ """
mb = 1024 * 1024
chunk = 1024
part_size_mb = 5
allowed = string.ascii_letters
for x in range(0, mb_size, part_size_mb):
- strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for x in xrange(chunk)])
+ strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in xrange(chunk)])
s = ''
left = mb_size - x
this_part_size = min(left, part_size_mb)
return
def _multipart_upload(bucket, s3_key_name, mb_size, do_list=None):
+ """
+ generate a multi-part upload for a random file of specified size,
+ if requested, generate a list of the parts
+ return the upload descriptor
+ """
upload = bucket.initiate_multipart_upload(s3_key_name)
for i, part in enumerate(generate_random(mb_size)):
transfer_part(bucket, upload.id, upload.key_name, i, part)
return upload
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='complete multi-part upload')
+@attr(assertion='successful')
def test_multipart_upload():
bucket = get_new_bucket()
key="mymultipart"
upload = _multipart_upload(bucket, key, 30)
upload.complete_upload()
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='abort multi-part upload')
+@attr(assertion='successful')
def test_abort_multipart_upload():
bucket = get_new_bucket()
key="mymultipart"
upload.cancel_upload()
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent multi-part uploads')
+@attr(assertion='successful')
def test_list_multipart_upload():
bucket = get_new_bucket()
key="mymultipart"
upload3.cancel_upload()
def _simple_http_req_100_cont(host, port, is_secure, method, resource):
+ """
+ Send the specified request w/expect 100-continue
+ and await confirmation.
+ """
req = '{method} {resource} HTTP/1.1\r\nHost: {host}\r\nAccept-Encoding: identity\r\nContent-Length: 123\r\nExpect: 100-continue\r\n\r\n'.format(
method=method,
resource=resource,
return l[1]
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='w/expect continue')
+@attr(assertion='succeeds if object is public-read-write')
def test_100_continue():
bucket = get_new_bucket()
objname = 'testobj'
eq(status, '100')
def _test_bucket_acls_changes_persistent(bucket):
+ """
+ set and verify readback of each possible permission
+ """
perms = ('FULL_CONTROL', 'WRITE', 'WRITE_ACP', 'READ', 'READ_ACP')
for p in perms:
_build_bucket_acl_xml(p, bucket)
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='acl set')
+@attr(assertion='all permissions are persistent')
def test_bucket_acls_changes_persistent():
bucket = get_new_bucket()
_test_bucket_acls_changes_persistent(bucket);
+@attr(resource='bucket')
+@attr(method='put')
+@attr(operation='repeated acl set')
+@attr(assertion='all permissions are persistent')
def test_stress_bucket_acls_changes():
bucket = get_new_bucket()
for i in xrange(10):
_test_bucket_acls_changes_persistent(bucket);
class FakeFile(object):
+ """
+ file that simulates seek, tell, and current character
+ """
def __init__(self, char='A', interrupt=None):
self.offset = 0
self.char = char
return self.offset
class FakeWriteFile(FakeFile):
+ """
+ file that simulates interruptible reads of constant data
+ """
def __init__(self, size, char='A', interrupt=None):
FakeFile.__init__(self, char, interrupt)
self.size = size
return self.char*count
class FakeReadFile(FakeFile):
+ """
+ file that simulates writes, interrupting after the second
+ """
def __init__(self, size, char='A', interrupt=None):
FakeFile.__init__(self, char, interrupt)
self.interrupted = False
eq(self.size, self.expected_size)
class FakeFileVerifier(object):
+ """
+ file that verifies expected data has been written
+ """
def __init__(self, char=None):
self.char = char
self.size = 0
eq(data, self.char*size)
def _verify_atomic_key_data(key, size=-1, char=None):
+ """
+ Make sure file is of the expected size and (simulated) content
+ """
fp_verify = FakeFileVerifier(char)
key.get_contents_to_file(fp_verify)
if size >= 0:
eq(fp_verify.size, size)
def _test_atomic_read(file_size):
+ """
+ Create a file of A's, use it to set_contents_from_file.
+ Create a file of B's, use it to re-set_contents_from_file.
+ Re-read the contents, and confirm we get B's
+ """
bucket = get_new_bucket()
key = bucket.new_key('testobj')
_verify_atomic_key_data(key, file_size, 'B')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='read atomicity')
+@attr(assertion='1MB successful')
def test_atomic_read_1mb():
_test_atomic_read(1024*1024)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='read atomicity')
+@attr(assertion='4MB successful')
def test_atomic_read_4mb():
_test_atomic_read(1024*1024*4)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='read atomicity')
+@attr(assertion='8MB successful')
def test_atomic_read_8mb():
_test_atomic_read(1024*1024*8)
def _test_atomic_write(file_size):
+ """
+ Create a file of A's, use it to set_contents_from_file.
+ Verify the contents are all A's.
+ Create a file of B's, use it to re-set_contents_from_file.
+ Before the re-set completes, verify the contents are still A's
+ Re-read the contents, and confirm we get B's
+ """
bucket = get_new_bucket()
objname = 'testobj'
key = bucket.new_key(objname)
# verify B's
_verify_atomic_key_data(key, file_size, 'B')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write atomicity')
+@attr(assertion='1MB successful')
def test_atomic_write_1mb():
_test_atomic_write(1024*1024)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write atomicity')
+@attr(assertion='4MB successful')
def test_atomic_write_4mb():
_test_atomic_write(1024*1024*4)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write atomicity')
+@attr(assertion='8MB successful')
def test_atomic_write_8mb():
_test_atomic_write(1024*1024*8)
def _test_atomic_dual_write(file_size):
+ """
+ create an object, two sessions writing different contents
+ confirm that it is all one or the other
+ """
bucket = get_new_bucket()
objname = 'testobj'
key = bucket.new_key(objname)
# verify the file
_verify_atomic_key_data(key, file_size)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write one or the other')
+@attr(assertion='1MB successful')
def test_atomic_dual_write_1mb():
_test_atomic_dual_write(1024*1024)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write one or the other')
+@attr(assertion='4MB successful')
def test_atomic_dual_write_4mb():
_test_atomic_dual_write(1024*1024*4)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write one or the other')
+@attr(assertion='8MB successful')
def test_atomic_dual_write_8mb():
_test_atomic_dual_write(1024*1024*8)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write file in deleted bucket')
+@attr(assertion='fail 404')
@attr('fails_on_aws')
@attr('fails_on_dho')
def test_atomic_write_bucket_gone():
eq(e.reason, 'Not Found')
eq(e.error_code, 'NoSuchBucket')
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='range')
+@attr(assertion='returns correct data, 206')
def test_ranged_request_response_code():
content = 'testcontent'