s3 = bunch.Bunch()
config = bunch.Bunch()
+regions = bunch.Bunch()
+targets = bunch.Bunch()
# this will be assigned by setup()
prefix = None
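+# map the config file's 'calling_format' option onto boto's calling
+# format objects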
+calling_formats = dict(
+    ordinary=boto.s3.connection.OrdinaryCallingFormat(),
+    subdomain=boto.s3.connection.SubdomainCallingFormat(),
+    vhost=boto.s3.connection.VHostCallingFormat(),
+    )
+
def get_prefix():
    assert prefix is not None
    return prefix
    print 'Done with cleanup of test buckets.'
+class TargetConfig:
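+    """Connection settings for one endpoint, read from a single config section."""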
+    def __init__(self, cfg, section):
+        self.port = None
+        self.api_name = ''
+        self.is_master = False
+        self.is_secure = False
+        try:
+            self.api_name = cfg.get(section, 'api_name')
+        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+            pass
+        try:
+            self.port = cfg.getint(section, 'port')
+        except ConfigParser.NoOptionError:
+            pass
+        try:
+            self.host = cfg.get(section, 'host')
+        except ConfigParser.NoOptionError:
+            raise RuntimeError(
+                'host not specified for section {s}'.format(s=section)
+                )
+        try:
+            self.is_secure = cfg.getboolean(section, 'is_secure')
+        except ConfigParser.NoOptionError:
+            pass
+
+        try:
+            raw_calling_format = cfg.get(section, 'calling_format')
+        except ConfigParser.NoOptionError:
+            raw_calling_format = 'ordinary'
+
+        try:
+            self.calling_format = calling_formats[raw_calling_format]
+        except KeyError:
+            raise RuntimeError(
+                'calling_format unknown: %r' % raw_calling_format
+                )
+
+class TargetConnection:
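+    """A TargetConfig paired with a live boto connection to that endpoint."""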
+    def __init__(self, conf, conn):
+        self.conf = conf
+        self.connection = conn
+
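+# Example (sketch): after setup() runs, the '[s3 main]' section is exposed
+# as targets.main; targets.main.connection is the boto connection and
+# targets.main.conf is the parsed TargetConfig.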
# nosetests --processes=N with N>1 is safe
_multiprocess_can_split_ = True
    cfg.readfp(f)
    global prefix
-    global location
+    global targets
+
    try:
        template = cfg.get('fixtures', 'bucket prefix')
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        template = 'test-{random}-'
    prefix = choose_bucket_prefix(template=template)
-    try:
-        location = cfg.get('region main', 'name')
-    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
-        location = ''
-
    s3.clear()
    config.clear()
-    calling_formats = dict(
-        ordinary=boto.s3.connection.OrdinaryCallingFormat(),
-        subdomain=boto.s3.connection.SubdomainCallingFormat(),
-        vhost=boto.s3.connection.VHostCallingFormat(),
-        )
+    regions.clear()
+
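+    # Expected config layout (sketch; section and option names as parsed
+    # below, values illustrative):
+    #
+    #   [region east]
+    #   host = s3-east.example.com
+    #   port = 8080
+    #   api_name = east-1
+    #   calling_format = ordinary
+    #
+    #   [s3 main]
+    #   region = east
+    #   access_key = ...
+    #   secret_key = ...
+
+    # first pass: collect the shared per-region connection settings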
    for section in cfg.sections():
        try:
            (type_, name) = section.split(None, 1)
        except ValueError:
            continue
-        if type_ != 's3':
+        if type_ != 'region':
            continue
-        try:
-            port = cfg.getint(section, 'port')
-        except ConfigParser.NoOptionError:
-            port = None
+        region_conf = TargetConfig(cfg, section)
+        regions[name] = region_conf
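+
+    # second pass: bind each 's3' section to its region's settings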
+    for section in cfg.sections():
        try:
-            raw_calling_format = cfg.get(section, 'calling_format')
-        except ConfigParser.NoOptionError:
-            raw_calling_format = 'ordinary'
+            (type_, name) = section.split(None, 1)
+        except ValueError:
+            continue
+        if type_ != 's3':
+            continue
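+        # use the named region's shared settings when given; otherwise
+        # read the connection settings directly from this section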
        try:
-            calling_format = calling_formats[raw_calling_format]
-        except KeyError:
-            raise RuntimeError(
-                'calling_format unknown: %r' % raw_calling_format
-                )
+            region_name = cfg.get(section, 'region')
+            region_config = regions[region_name]
+        except ConfigParser.NoOptionError:
+            region_config = TargetConfig(cfg, section)
        config[name] = bunch.Bunch()
        for var in [
        conn = boto.s3.connection.S3Connection(
            aws_access_key_id=cfg.get(section, 'access_key'),
            aws_secret_access_key=cfg.get(section, 'secret_key'),
-            is_secure=cfg.getboolean(section, 'is_secure'),
-            port=port,
-            host=cfg.get(section, 'host'),
+            is_secure=region_config.is_secure,
+            port=region_config.port,
+            host=region_config.host,
            # TODO test vhost calling format
-            calling_format=calling_format,
+            calling_format=region_config.calling_format,
            )
        s3[name] = conn
+        targets[name] = TargetConnection(region_config, conn)
# WARNING! we actively delete all buckets we see with the prefix
# we've chosen! Choose your prefix with care, and don't reuse
    return name
-def get_new_bucket(connection=None, name=None, headers=None):
+def get_new_bucket(target=None, name=None, headers=None):
"""
Get a bucket that exists and is empty.
Always recreates a bucket from scratch. This is useful to also
reset ACLs and such.
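+    When target is None, targets.main is used.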
"""
- if connection is None:
- connection = s3.main
+ if target is None:
+ target = targets.main
+ connection = target.connection
if name is None:
name = get_new_bucket_name()
# the only way for this to fail with a pre-existing bucket is if
# someone raced us between setup nuke_prefixed_buckets and here;
# ignore that as astronomically unlikely
- bucket = connection.create_bucket(name, location=location, headers=headers)
+ bucket = connection.create_bucket(name, location=target.conf.api_name, headers=headers)
return bucket
    get_new_bucket,
    get_new_bucket_name,
    s3,
+    targets,
    config,
    get_prefix,
    )
def test_bucket_create_delete():
    name = '{prefix}foo'.format(prefix=get_prefix())
    print 'Trying bucket {name!r}'.format(name=name)
-    bucket = get_new_bucket(s3.main, name)
+    bucket = get_new_bucket(targets.main, name)
    # make sure it's actually there
    s3.main.get_bucket(bucket.name)
    bucket.delete()
    Attempt to create a bucket with a specified name, and confirm
    that the request fails because of an invalid bucket name.
    """
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, s3.main, name)
+    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main, name)
    eq(e.status, 400)
    eq(e.reason, 'Bad Request')
    eq(e.error_code, 'InvalidBucketName')
def test_bucket_create_naming_bad_short_empty():
    # bucket creates where name is empty look like PUTs to the parent
    # resource (with slash), hence their error response is different
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, s3.main, '')
+    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main, '')
    eq(e.status, 405)
    eq(e.reason, 'Method Not Allowed')
    eq(e.error_code, 'MethodNotAllowed')
    # should be very rare
    if _prefix is None:
        _prefix = get_prefix()
-    get_new_bucket(s3.main, '{prefix}{name}'.format(
+    get_new_bucket(targets.main, '{prefix}{name}'.format(
        prefix=_prefix,
        name=name,
        ))
    prefix = get_prefix()
    assert len(prefix) < 255
    num = length - len(prefix)
-    get_new_bucket(s3.main, '{prefix}{name}'.format(
+    get_new_bucket(targets.main, '{prefix}{name}'.format(
        prefix=prefix,
        name=num*'a',
        ))
    prefix = get_prefix()
    length = 251
    num = length - len(prefix)
-    bucket = get_new_bucket(s3.main, '{prefix}{name}'.format(
+    bucket = get_new_bucket(targets.main, '{prefix}{name}'.format(
        prefix=prefix,
        name=num*'a',
        ))
@attr(operation='re-create')
@attr(assertion='idempotent success')
def test_bucket_create_exists():
-    bucket = get_new_bucket(s3.main)
+    bucket = get_new_bucket(targets.main)
    # REST idempotency means this should be a nop
-    get_new_bucket(s3.main, bucket.name)
+    get_new_bucket(targets.main, bucket.name)
@attr(resource='bucket')
    # Names are shared across a global namespace. As such, no two
    # users can create a bucket with the same name.
    bucket = get_new_bucket()
-    e = assert_raises(boto.exception.S3CreateError, get_new_bucket, s3.alt, bucket.name)
+    e = assert_raises(boto.exception.S3CreateError, get_new_bucket, targets.alt, bucket.name)
    eq(e.status, 409)
    eq(e.reason, 'Conflict')
    eq(e.error_code, 'BucketAlreadyExists')
@attr(operation='acl bucket-owner-read')
@attr(assertion='read back expected values')
def test_object_acl_canned_bucketownerread():
-    bucket = get_new_bucket(s3.main)
+    bucket = get_new_bucket(targets.main)
    bucket.set_acl('public-read-write')
    key = s3.alt.get_bucket(bucket.name).new_key('foo')
@attr(operation='acl bucket-owner-read')
@attr(assertion='read back expected values')
def test_object_acl_canned_bucketownerfullcontrol():
-    bucket = get_new_bucket(s3.main)
+    bucket = get_new_bucket(targets.main)
    bucket.set_acl('public-read-write')
    key = s3.alt.get_bucket(bucket.name).new_key('foo')
@attr('fails_on_dho')
def test_bucket_header_acl_grants():
    headers = _get_acl_header()
-    bucket = get_new_bucket(s3.main, get_prefix(), headers)
+    bucket = get_new_bucket(targets.main, get_prefix(), headers)
    policy = bucket.get_acl()
    check_grants(
@attr('fails_on_rgw')
def test_logging_toggle():
    bucket = get_new_bucket()
-    log_bucket = get_new_bucket(s3.main, bucket.name + '-log')
+    log_bucket = get_new_bucket(targets.main, bucket.name + '-log')
    log_bucket.set_as_logging_target()
    bucket.enable_logging(target_bucket=log_bucket, target_prefix=bucket.name)
    bucket.disable_logging()
    names = [e.name for e in list(li)]
    eq(names, key_names)
-    bucket2 = get_new_bucket(s3.main, bucket.name)
+    bucket2 = get_new_bucket(targets.main, bucket.name)
    li = bucket.list()
@attr(operation='copy from an inaccessible bucket')
@attr(assertion='fails w/AttributeError')
def test_object_copy_not_owned_bucket():
-    buckets = [get_new_bucket(), get_new_bucket(s3.alt)]
+    buckets = [get_new_bucket(), get_new_bucket(targets.alt)]
    print repr(buckets[1])
    key = buckets[0].new_key('foo123bar')
    key.set_contents_from_string('foo')