PyYAML
-boto >=2.6.0
boto3 >=1.0.0
# botocore-1.28 broke v2 signatures, see https://tracker.ceph.com/issues/58059
botocore <1.28.0
+++ /dev/null
-import boto.s3.connection
-import munch
-import itertools
-import os
-import random
-import string
-import yaml
-import re
-from lxml import etree
-
-from doctest import Example
-from lxml.doctestcompare import LXMLOutputChecker
-
-s3 = munch.Munch()
-config = munch.Munch()
-prefix = ''
-
-bucket_counter = itertools.count(1)
-key_counter = itertools.count(1)
-
-def choose_bucket_prefix(template, max_len=30):
- """
- Choose a prefix for our test buckets, so they're easy to identify.
-
- Fill the template's {random} field with random filler, trimming the
- filler until the result fits within max_len.
- """
- rand = ''.join(
- random.choice(string.ascii_lowercase + string.digits)
- for c in range(255)
- )
-
- while rand:
- s = template.format(random=rand)
- if len(s) <= max_len:
- return s
- rand = rand[:-1]
-
- raise RuntimeError(
- 'Bucket prefix template is impossible to fulfill: {template!r}'.format(
- template=template,
- ),
- )
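-# Illustrative example (the random filler shown is made up):
-#
-#   choose_bucket_prefix('test-{random}-', max_len=30)
-#   # -> 'test-x7q0pz84fj1m2rno9s4tk3bw-'  (filler trimmed so len() == 30)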
-
-def nuke_bucket(bucket):
- try:
- bucket.set_canned_acl('private')
- # TODO: deleted_cnt and the while loop are a workaround for rgw
- # not listing/deleting every key in a single pass
- deleted_cnt = 1
- while deleted_cnt:
- deleted_cnt = 0
- for key in bucket.list():
- print('Cleaning bucket {bucket} key {key}'.format(
- bucket=bucket,
- key=key,
- ))
- key.set_canned_acl('private')
- key.delete()
- deleted_cnt += 1
- bucket.delete()
- except boto.exception.S3ResponseError as e:
- # TODO: workaround for buggy rgw that fails to send an
- # error_code; remove once fixed
- if (e.status == 403
- and e.error_code is None
- and e.body == ''):
- e.error_code = 'AccessDenied'
- if e.error_code != 'AccessDenied':
- print('GOT UNWANTED ERROR', e.error_code)
- raise
- # seems like we're not the owner of the bucket; ignore
- pass
-
-def nuke_prefixed_buckets():
- for name, conn in list(s3.items()):
- print('Cleaning buckets from connection {name}'.format(name=name))
- for bucket in conn.get_all_buckets():
- if bucket.name.startswith(prefix):
- print('Cleaning bucket {bucket}'.format(bucket=bucket))
- nuke_bucket(bucket)
-
- print('Done with cleanup of test buckets.')
-
-def read_config(fp):
- config = munch.Munch()
- g = yaml.safe_load_all(fp)
- for new in g:
- config.update(munch.munchify(new))
- return config
-
-def connect(conf):
- mapping = dict(
- port='port',
- host='host',
- is_secure='is_secure',
- access_key='aws_access_key_id',
- secret_key='aws_secret_access_key',
- )
- kwargs = dict((mapping[k],v) for (k,v) in conf.items() if k in mapping)
- # process the calling_format argument
- calling_formats = dict(
- ordinary=boto.s3.connection.OrdinaryCallingFormat(),
- subdomain=boto.s3.connection.SubdomainCallingFormat(),
- vhost=boto.s3.connection.VHostCallingFormat(),
- )
- kwargs['calling_format'] = calling_formats['ordinary']
- if 'calling_format' in conf:
- raw_calling_format = conf['calling_format']
- try:
- kwargs['calling_format'] = calling_formats[raw_calling_format]
- except KeyError:
- raise RuntimeError(
- 'calling_format unknown: %r' % raw_calling_format
- )
- # TODO test vhost calling format
- conn = boto.s3.connection.S3Connection(**kwargs)
- return conn
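-# A minimal sketch of the config layout read_config(), connect() and setup()
-# expect; all values below are illustrative placeholders:
-#
-#   fixtures:
-#     bucket prefix: test-{random}-
-#   s3:
-#     defaults:
-#       host: localhost
-#       port: 8000
-#       is_secure: false
-#       calling_format: ordinary
-#     main:
-#       access_key: ACCESSKEYID
-#       secret_key: SECRETKEY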
-
-def setup():
- global s3, config, prefix
- s3.clear()
- config.clear()
-
- try:
- path = os.environ['S3TEST_CONF']
- except KeyError:
- raise RuntimeError(
- 'To run tests, point environment '
- + 'variable S3TEST_CONF to a config file.',
- )
- with open(path) as f:
- config.update(read_config(f))
-
- # These 3 should always be present.
- if 's3' not in config:
- raise RuntimeError('Your config file is missing the s3 section!')
- if 'defaults' not in config.s3:
- raise RuntimeError('Your config file is missing the s3.defaults section!')
- if 'fixtures' not in config:
- raise RuntimeError('Your config file is missing the fixtures section!')
-
- template = config.fixtures.get('bucket prefix', 'test-{random}-')
- prefix = choose_bucket_prefix(template=template)
- if prefix == '':
- raise RuntimeError("Empty Prefix! Aborting!")
-
- defaults = config.s3.defaults
- for section in list(config.s3.keys()):
- if section == 'defaults':
- continue
-
- conf = {}
- conf.update(defaults)
- conf.update(config.s3[section])
- conn = connect(conf)
- s3[section] = conn
-
- # WARNING! we actively delete all buckets we see with the prefix
- # we've chosen! Choose your prefix with care, and don't reuse
- # credentials!
-
- # We also assume nobody else is going to use buckets with that
- # prefix. This is racy but given enough randomness, should not
- # really fail.
- nuke_prefixed_buckets()
-
-def get_new_bucket(connection=None):
- """
- Get a bucket that exists and is empty.
-
- Always recreates a bucket from scratch. This is useful to also
- reset ACLs and such.
- """
- if connection is None:
- connection = s3.main
- name = '{prefix}{num}'.format(
- prefix=prefix,
- num=next(bucket_counter),
- )
- # the only way for this to fail with a pre-existing bucket is if
- # someone raced us between setup's nuke_prefixed_buckets() and here;
- # ignore that as astronomically unlikely
- bucket = connection.create_bucket(name)
- return bucket
-
-def teardown():
- nuke_prefixed_buckets()
-
-def with_setup_kwargs(setup, teardown=None):
- """Decorator to add setup and/or teardown methods to a test function::
-
- @with_setup_kwargs(setup, teardown)
- def test_something():
- " ... "
-
- The setup function should return a dict of kwargs, which will be passed to
- the test function and the teardown function.
-
- Note that `with_setup_kwargs` is useful *only* for test functions, not for test
- methods or inside of TestCase subclasses.
- """
- def decorate(func):
- kwargs = {}
-
- def test_wrapped(*args, **kwargs2):
- k2 = kwargs.copy()
- k2.update(kwargs2)
- k2['testname'] = func.__name__
- func(*args, **k2)
-
- test_wrapped.__name__ = func.__name__
-
- def setup_wrapped():
- k = setup()
- kwargs.update(k)
- if hasattr(func, 'setup'):
- func.setup()
- test_wrapped.setup = setup_wrapped
-
- if teardown:
- def teardown_wrapped():
- if hasattr(func, 'teardown'):
- func.teardown()
- teardown(**kwargs)
-
- test_wrapped.teardown = teardown_wrapped
- else:
- if hasattr(func, 'teardown'):
- test_wrapped.teardown = func.teardown
- return test_wrapped
- return decorate
-
-# Demo case for the above, when you run test_gen():
-# _test_gen will run twice,
-# with the following stderr printing
-# setup_func {'b': 2}
-# testcase ('1',) {'b': 2, 'testname': '_test_gen'}
-# teardown_func {'b': 2}
-# setup_func {'b': 2}
-# testcase () {'b': 2, 'testname': '_test_gen'}
-# teardown_func {'b': 2}
-#
-#def setup_func():
-# kwargs = {'b': 2}
-# print("setup_func", kwargs, file=sys.stderr)
-# return kwargs
-#
-#def teardown_func(**kwargs):
-# print("teardown_func", kwargs, file=sys.stderr)
-#
-#@with_setup_kwargs(setup=setup_func, teardown=teardown_func)
-#def _test_gen(*args, **kwargs):
-# print("testcase", args, kwargs, file=sys.stderr)
-#
-#def test_gen():
-# yield _test_gen, '1'
-# yield _test_gen
-
-def trim_xml(xml_str):
- p = etree.XMLParser(encoding="utf-8", remove_blank_text=True)
- xml_str = bytes(xml_str, "utf-8")
- elem = etree.XML(xml_str, parser=p)
- return etree.tostring(elem, encoding="unicode")
-
-def normalize_xml(xml, pretty_print=True):
- if xml is None:
- return xml
-
- root = etree.fromstring(xml.encode(encoding='ascii'))
-
- for element in root.iter('*'):
- if element.text is not None and not element.text.strip():
- element.text = None
- if element.text is not None:
- element.text = element.text.strip().replace("\n", "").replace("\r", "")
- if element.tail is not None and not element.tail.strip():
- element.tail = None
- if element.tail is not None:
- element.tail = element.tail.strip().replace("\n", "").replace("\r", "")
-
- # Sort the elements
- for parent in root.xpath('//*[./*]'): # Search for parent elements
- parent[:] = sorted(parent,key=lambda x: x.tag)
-
- xmlstr = etree.tostring(root, encoding="unicode", pretty_print=pretty_print)
- # there are two different DTD URIs
- xmlstr = re.sub(r'xmlns="[^"]+"', 'xmlns="s3"', xmlstr)
- xmlstr = re.sub(r'xmlns=\'[^\']+\'', 'xmlns="s3"', xmlstr)
- for uri in ['http://doc.s3.amazonaws.com/doc/2006-03-01/', 'http://s3.amazonaws.com/doc/2006-03-01/']:
- xmlstr = xmlstr.replace(uri, 'URI-DTD')
- #xmlstr = re.sub(r'>\s+', '>', xmlstr, count=0, flags=re.MULTILINE)
- return xmlstr
-
-def assert_xml_equal(got, want):
- assert want is not None, 'Wanted XML cannot be None'
- if got is None:
- raise AssertionError('Got input to validate was None')
- checker = LXMLOutputChecker()
- if not checker.check_output(want, got, 0):
- message = checker.output_difference(Example("", want), got, 0)
- raise AssertionError(message)
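-# Rough usage sketch for the XML helpers above (the XML snippets are made up):
-#
-#   got = normalize_xml('<Error><Code>NoSuchKey</Code></Error>')
-#   want = normalize_xml('<Error>\n  <Code>NoSuchKey</Code>\n</Error>')
-#   assert_xml_equal(got, want)  # passes: whitespace and child order are normalized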
+++ /dev/null
-import sys
-import configparser
-import boto.exception
-import boto.s3.connection
-import munch
-import itertools
-import os
-import random
-import string
-import pytest
-from http.client import HTTPConnection, HTTPSConnection
-from urllib.parse import urlparse
-
-from .utils import region_sync_meta
-
-s3 = munch.Munch()
-config = munch.Munch()
-targets = munch.Munch()
-
-# this will be assigned by setup()
-prefix = None
-
-calling_formats = dict(
- ordinary=boto.s3.connection.OrdinaryCallingFormat(),
- subdomain=boto.s3.connection.SubdomainCallingFormat(),
- vhost=boto.s3.connection.VHostCallingFormat(),
- )
-
-def get_prefix():
- assert prefix is not None
- return prefix
-
-def is_slow_backend():
- return slow_backend
-
-def choose_bucket_prefix(template, max_len=30):
- """
- Choose a prefix for our test buckets, so they're easy to identify.
-
- Fill the template's {random} field with random filler, trimming the
- filler until the result fits within max_len.
- """
- rand = ''.join(
- random.choice(string.ascii_lowercase + string.digits)
- for c in range(255)
- )
-
- while rand:
- s = template.format(random=rand)
- if len(s) <= max_len:
- return s
- rand = rand[:-1]
-
- raise RuntimeError(
- 'Bucket prefix template is impossible to fulfill: {template!r}'.format(
- template=template,
- ),
- )
-
-
-def nuke_prefixed_buckets_on_conn(prefix, name, conn):
- print('Cleaning buckets from connection {name} prefix {prefix!r}.'.format(
- name=name,
- prefix=prefix,
- ))
-
- for bucket in conn.get_all_buckets():
- print('prefix=',prefix)
- if bucket.name.startswith(prefix):
- print('Cleaning bucket {bucket}'.format(bucket=bucket))
- success = False
- for i in range(2):
- try:
- try:
- iterator = iter(bucket.list_versions())
- # peek into iterator to issue list operation
- try:
- keys = itertools.chain([next(iterator)], iterator)
- except StopIteration:
- keys = [] # empty iterator
- except boto.exception.S3ResponseError as e:
- # some S3 implementations do not support object
- # versioning - fall back to listing without versions
- if e.error_code != 'NotImplemented':
- raise e
- keys = bucket.list()
- for key in keys:
- print('Cleaning bucket {bucket} key {key}'.format(
- bucket=bucket,
- key=key,
- ))
- # key.set_canned_acl('private')
- bucket.delete_key(key.name, version_id = key.version_id)
- try:
- bucket.delete()
- except boto.exception.S3ResponseError as e:
- # if DELETE times out, the retry may see NoSuchBucket
- if e.error_code != 'NoSuchBucket':
- raise e
- pass
- success = True
- except boto.exception.S3ResponseError as e:
- if e.error_code != 'AccessDenied':
- print('GOT UNWANTED ERROR', e.error_code)
- raise
- # seems like we don't have permissions set appropriately, we'll
- # modify permissions and retry
- pass
-
- if success:
- break
-
- bucket.set_canned_acl('private')
-
-
-def nuke_prefixed_buckets(prefix):
- # If no regions are specified, use the simple method
- if targets.main.master is None:
- for name, conn in list(s3.items()):
- print('Deleting buckets on {name}'.format(name=name))
- nuke_prefixed_buckets_on_conn(prefix, name, conn)
- else:
- # First, delete all buckets on the master connection
- for name, conn in list(s3.items()):
- if conn == targets.main.master.connection:
- print('Deleting buckets on {name} (master)'.format(name=name))
- nuke_prefixed_buckets_on_conn(prefix, name, conn)
-
- # Then sync to propagate deletes to secondaries
- region_sync_meta(targets.main, targets.main.master.connection)
- print('region-sync in nuke_prefixed_buckets')
-
- # Now delete remaining buckets on any other connection
- for name, conn in list(s3.items()):
- if conn != targets.main.master.connection:
- print('Deleting buckets on {name} (non-master)'.format(name=name))
- nuke_prefixed_buckets_on_conn(prefix, name, conn)
-
- print('Done with cleanup of test buckets.')
-
-class TargetConfig:
- def __init__(self, cfg, section):
- self.port = None
- self.api_name = ''
- self.is_master = False
- self.is_secure = False
- self.sync_agent_addr = None
- self.sync_agent_port = 0
- self.sync_meta_wait = 0
- try:
- self.api_name = cfg.get(section, 'api_name')
- except (configparser.NoSectionError, configparser.NoOptionError):
- pass
- try:
- self.port = cfg.getint(section, 'port')
- except configparser.NoOptionError:
- pass
- try:
- self.host=cfg.get(section, 'host')
- except configparser.NoOptionError:
- raise RuntimeError(
- 'host not specified for section {s}'.format(s=section)
- )
- try:
- self.is_master=cfg.getboolean(section, 'is_master')
- except configparser.NoOptionError:
- pass
-
- try:
- self.is_secure=cfg.getboolean(section, 'is_secure')
- except configparser.NoOptionError:
- pass
-
- try:
- raw_calling_format = cfg.get(section, 'calling_format')
- except configparser.NoOptionError:
- raw_calling_format = 'ordinary'
-
- try:
- self.sync_agent_addr = cfg.get(section, 'sync_agent_addr')
- except (configparser.NoSectionError, configparser.NoOptionError):
- pass
-
- try:
- self.sync_agent_port = cfg.getint(section, 'sync_agent_port')
- except (configparser.NoSectionError, configparser.NoOptionError):
- pass
-
- try:
- self.sync_meta_wait = cfg.getint(section, 'sync_meta_wait')
- except (configparser.NoSectionError, configparser.NoOptionError):
- pass
-
-
- try:
- self.calling_format = calling_formats[raw_calling_format]
- except KeyError:
- raise RuntimeError(
- 'calling_format unknown: %r' % raw_calling_format
- )
-
-class TargetConnection:
- def __init__(self, conf, conn):
- self.conf = conf
- self.connection = conn
-
-
-
-class RegionsInfo:
- def __init__(self):
- self.m = munch.Munch()
- self.master = None
- self.secondaries = []
-
- def add(self, name, region_config):
- self.m[name] = region_config
- if (region_config.is_master):
- if not self.master is None:
- raise RuntimeError(
- 'multiple regions defined as master'
- )
- self.master = region_config
- else:
- self.secondaries.append(region_config)
- def get(self, name=None):
- # return a single region's config by name, or the whole mapping when name is None
- return self.m[name] if name is not None else self.m
- def items(self):
- return self.m.items()
-
-regions = RegionsInfo()
-
-
-class RegionsConn:
- def __init__(self):
- self.m = munch.Munch()
- self.default = None
- self.master = None
- self.secondaries = []
-
- def items(self):
- return self.m.items()
-
- def set_default(self, conn):
- self.default = conn
-
- def add(self, name, conn):
- self.m[name] = conn
- if not self.default:
- self.default = conn
- if (conn.conf.is_master):
- self.master = conn
- else:
- self.secondaries.append(conn)
-
-
-# nosetests --processes=N with N>1 is safe
-_multiprocess_can_split_ = True
-
-def setup():
-
- cfg = configparser.RawConfigParser()
- try:
- path = os.environ['S3TEST_CONF']
- except KeyError:
- raise RuntimeError(
- 'To run tests, point environment '
- + 'variable S3TEST_CONF to a config file.',
- )
- cfg.read(path)
-
- global prefix
- global targets
- global slow_backend
-
- try:
- template = cfg.get('fixtures', 'bucket prefix')
- except (configparser.NoSectionError, configparser.NoOptionError):
- template = 'test-{random}-'
- prefix = choose_bucket_prefix(template=template)
-
- try:
- slow_backend = cfg.getboolean('fixtures', 'slow backend')
- except (configparser.NoSectionError, configparser.NoOptionError):
- slow_backend = False
-
- # pull the default_region out, if it exists
- try:
- default_region = cfg.get('fixtures', 'default_region')
- except (configparser.NoSectionError, configparser.NoOptionError):
- default_region = None
-
- s3.clear()
- config.clear()
-
- for section in cfg.sections():
- try:
- (type_, name) = section.split(None, 1)
- except ValueError:
- continue
- if type_ != 'region':
- continue
- regions.add(name, TargetConfig(cfg, section))
-
- for section in cfg.sections():
- try:
- (type_, name) = section.split(None, 1)
- except ValueError:
- continue
- if type_ != 's3':
- continue
-
- if len(regions.get()) == 0:
- regions.add("default", TargetConfig(cfg, section))
-
- config[name] = munch.Munch()
- for var in [
- 'user_id',
- 'display_name',
- 'email',
- 's3website_domain',
- 'host',
- 'port',
- 'is_secure',
- 'kms_keyid',
- 'storage_classes',
- ]:
- try:
- config[name][var] = cfg.get(section, var)
- except configparser.NoOptionError:
- pass
-
- targets[name] = RegionsConn()
-
- for (k, conf) in regions.items():
- conn = boto.s3.connection.S3Connection(
- aws_access_key_id=cfg.get(section, 'access_key'),
- aws_secret_access_key=cfg.get(section, 'secret_key'),
- is_secure=conf.is_secure,
- port=conf.port,
- host=conf.host,
- # TODO test vhost calling format
- calling_format=conf.calling_format,
- )
-
- temp_targetConn = TargetConnection(conf, conn)
- targets[name].add(k, temp_targetConn)
-
- # Explicitly test for and set the default region, if specified.
- # If it was not specified, use the 'is_master' flag to set it.
- if default_region:
- if default_region == name:
- targets[name].set_default(temp_targetConn)
- elif conf.is_master:
- targets[name].set_default(temp_targetConn)
-
- s3[name] = targets[name].default.connection
-
- # WARNING! we actively delete all buckets we see with the prefix
- # we've chosen! Choose your prefix with care, and don't reuse
- # credentials!
-
- # We also assume nobody else is going to use buckets with that
- # prefix. This is racy but given enough randomness, should not
- # really fail.
- nuke_prefixed_buckets(prefix=prefix)
-
-
-def teardown():
- # remove our buckets here also, to avoid littering
- nuke_prefixed_buckets(prefix=prefix)
-
-@pytest.fixture(scope="package")
-def configfile():
- setup()
- yield config
-
-@pytest.fixture(autouse=True)
-def setup_teardown(configfile):
- yield
- teardown()
-
-bucket_counter = itertools.count(1)
-
-
-def get_new_bucket_name():
- """
- Get a bucket name that probably does not exist.
-
- We make every attempt to use a unique random prefix, so if a
- bucket by this name happens to exist, it's ok if tests give
- false negatives.
- """
- name = '{prefix}{num}'.format(
- prefix=prefix,
- num=next(bucket_counter),
- )
- return name
-
-
-def get_new_bucket(target=None, name=None, headers=None):
- """
- Get a bucket that exists and is empty.
-
- Always recreates a bucket from scratch. This is useful to also
- reset ACLs and such.
- """
- if target is None:
- target = targets.main.default
- connection = target.connection
- if name is None:
- name = get_new_bucket_name()
- # the only way for this to fail with a pre-existing bucket is if
- # someone raced us between setup's nuke_prefixed_buckets() and here;
- # ignore that as astronomically unlikely
- bucket = connection.create_bucket(name, location=target.conf.api_name, headers=headers)
- return bucket
-
-def _make_request(method, bucket, key, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
- """
- issue a request for a specified method, on a specified <bucket,key>,
- with a specified (optional) body (encrypted per the connection), and
- return the response (status, reason).
-
- If key is None, then this will be treated as a bucket-level request.
-
- If the request or response headers are None, then default values will be
- provided by later methods.
- """
- if not path_style:
- # request_headers may still be None here; initialize it before setting Host
- if request_headers is None:
- request_headers = {}
- conn = bucket.connection
- request_headers['Host'] = conn.calling_format.build_host(conn.server_name(), bucket.name)
-
- if authenticated:
- urlobj = None
- if key is not None:
- urlobj = key
- elif bucket is not None:
- urlobj = bucket
- else:
- raise RuntimeError('Unable to find bucket name')
- url = urlobj.generate_url(expires_in, method=method, response_headers=response_headers, headers=request_headers)
- o = urlparse(url)
- path = o.path + '?' + o.query
- else:
- bucketobj = None
- if key is not None:
- path = '/{obj}'.format(obj=key.name)
- bucketobj = key.bucket
- elif bucket is not None:
- path = '/'
- bucketobj = bucket
- else:
- raise RuntimeError('Unable to find bucket name')
- if path_style:
- path = '/{bucket}'.format(bucket=bucketobj.name) + path
-
- return _make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path, body=body, request_headers=request_headers, secure=s3.main.is_secure, timeout=timeout)
-
-def _make_bucket_request(method, bucket, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
- """
- issue a request for a specified method, on a specified <bucket>,
- with a specified (optional) body (encrypted per the connection), and
- return the response (status, reason)
- """
- return _make_request(method=method, bucket=bucket, key=None, body=body, authenticated=authenticated, response_headers=response_headers, request_headers=request_headers, expires_in=expires_in, path_style=path_style, timeout=timeout)
-
-def _make_raw_request(host, port, method, path, body=None, request_headers=None, secure=False, timeout=None):
- """
- issue a request to a specific host & port, for a specified method, on a
- specified path with a specified (optional) body (encrypted per the
- connection), and return the response (status, reason).
-
- This allows construction of special cases not covered by the bucket/key to
- URL mapping of _make_request/_make_bucket_request.
- """
- if secure:
- class_ = HTTPSConnection
- else:
- class_ = HTTPConnection
-
- if request_headers is None:
- request_headers = {}
-
- c = class_(host, port=port, timeout=timeout)
-
- # TODO: We might have to modify this in future if we need to interact with
- # how httplib.request handles Accept-Encoding and Host.
- c.request(method, path, body=body, headers=request_headers)
-
- res = c.getresponse()
- #c.close()
-
- print(res.status, res.reason)
- return res
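-# Minimal usage sketch (the method and path are illustrative; host/port/scheme
-# come from the 'main' connection configured in setup()):
-#
-#   res = _make_raw_request(host=s3.main.host, port=s3.main.port,
-#                           method='GET', path='/', secure=s3.main.is_secure)
-#   print(res.status, res.reason)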
-
-
+++ /dev/null
-import json
-
-class Statement(object):
- def __init__(self, action, resource, principal={"AWS": "*"}, effect="Allow", condition=None):
- self.principal = principal
- self.action = action
- self.resource = resource
- self.condition = condition
- self.effect = effect
-
- def to_dict(self):
- d = { "Action" : self.action,
- "Principal" : self.principal,
- "Effect" : self.effect,
- "Resource" : self.resource
- }
-
- if self.condition is not None:
- d["Condition"] = self.condition
-
- return d
-
-class Policy(object):
- def __init__(self):
- self.statements = []
-
- def add_statement(self, s):
- self.statements.append(s)
- return self
-
- def to_json(self):
- policy_dict = {
- "Version" : "2012-10-17",
- "Statement":
- [s.to_dict() for s in self.statements]
- }
-
- return json.dumps(policy_dict)
-
-def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
- """
- Helper function to make single statement policies
- """
- s = Statement(action, resource, principal, condition=conditions)
- p = Policy()
- return p.add_statement(s).to_json()
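-# Example of the single-statement policy this helper builds (the bucket ARN is
-# a placeholder):
-#
-#   make_json_policy("s3:GetObject", "arn:aws:s3:::example-bucket/*")
-#   # -> '{"Version": "2012-10-17", "Statement": [{"Action": "s3:GetObject",
-#   #      "Principal": {"AWS": "*"}, "Effect": "Allow",
-#   #      "Resource": "arn:aws:s3:::example-bucket/*"}]}'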
+++ /dev/null
-from io import StringIO
-import boto.connection
-import boto.exception
-import boto.s3.connection
-import boto.s3.acl
-import boto.utils
-import pytest
-import operator
-import random
-import string
-import socket
-import ssl
-import os
-import re
-from email.utils import formatdate
-
-from urllib.parse import urlparse
-
-from boto.s3.connection import S3Connection
-
-from .utils import assert_raises
-
-from email.header import decode_header
-
-from . import (
- configfile,
- setup_teardown,
- _make_raw_request,
- nuke_prefixed_buckets,
- get_new_bucket,
- s3,
- config,
- get_prefix,
- TargetConnection,
- targets,
- )
-
-
-_orig_authorize = None
-_custom_headers = {}
-_remove_headers = []
-
-
-# HeaderS3Connection and _our_authorize make it possible to arbitrarily
-# overwrite request headers. Depending on the version of boto, one or the
-# other is necessary; the hook_headers fixture below determines which to use.
-
-def _update_headers(headers):
- """ update a set of headers with additions/removals
- """
- global _custom_headers, _remove_headers
-
- headers.update(_custom_headers)
-
- for header in _remove_headers:
- try:
- del headers[header]
- except KeyError:
- pass
-
-
-# Note: We need to update the headers twice. The first time so the
-# authentication signing is done correctly. The second time to overwrite any
-# headers modified or created in the authentication step.
-
-class HeaderS3Connection(S3Connection):
- """ establish an authenticated connection w/customized headers
- """
- def fill_in_auth(self, http_request, **kwargs):
- _update_headers(http_request.headers)
- S3Connection.fill_in_auth(self, http_request, **kwargs)
- _update_headers(http_request.headers)
-
- return http_request
-
-
-def _our_authorize(self, connection, **kwargs):
- """ perform an authentication w/customized headers
- """
- _update_headers(self.headers)
- _orig_authorize(self, connection, **kwargs)
- _update_headers(self.headers)
-
-
-@pytest.fixture
-def hook_headers(setup_teardown):
- boto_type = None
- _orig_conn = {}
-
- # we determine what we need to replace by the existence of particular
- # attributes: boto 2.0rc1 has fill_in_auth on S3Connection, while boto 2.0
- # has authorize on HTTPRequest.
- if hasattr(S3Connection, 'fill_in_auth'):
- boto_type = 'S3Connection'
- for conn in s3:
- _orig_conn[conn] = s3[conn]
- header_conn = HeaderS3Connection(
- aws_access_key_id=s3[conn].aws_access_key_id,
- aws_secret_access_key=s3[conn].aws_secret_access_key,
- is_secure=s3[conn].is_secure,
- port=s3[conn].port,
- host=s3[conn].host,
- calling_format=s3[conn].calling_format
- )
-
- s3[conn] = header_conn
- elif hasattr(boto.connection.HTTPRequest, 'authorize'):
- global _orig_authorize
-
- boto_type = 'HTTPRequest'
-
- _orig_authorize = boto.connection.HTTPRequest.authorize
- boto.connection.HTTPRequest.authorize = _our_authorize
- else:
- raise RuntimeError
-
- yield
-
- # replace original functionality depending on the boto version
- if boto_type == 'S3Connection':
- for conn in s3:
- s3[conn] = _orig_conn[conn]
- _orig_conn = {}
- elif boto_type == 'HTTPRequest':
- boto.connection.HTTPRequest.authorize = _orig_authorize
- _orig_authorize = None
- else:
- raise RuntimeError
-
-
-def _clear_custom_headers():
- """ Eliminate any header customizations
- """
- global _custom_headers, _remove_headers
- _custom_headers = {}
- _remove_headers = []
-
-@pytest.fixture(autouse=True)
-def clear_custom_headers(setup_teardown, hook_headers):
- yield
- _clear_custom_headers() # clear headers before teardown()
-
-def _add_custom_headers(headers=None, remove=None):
- """ Define header customizations (additions, replacements, removals)
- """
- global _custom_headers, _remove_headers
- if not _custom_headers:
- _custom_headers = {}
-
- if headers is not None:
- _custom_headers.update(headers)
- if remove is not None:
- _remove_headers.extend(remove)
-
-
-def _setup_bad_object(headers=None, remove=None):
- """ Create a new bucket, add an object w/header customizations
- """
- bucket = get_new_bucket()
-
- _add_custom_headers(headers=headers, remove=remove)
- return bucket.new_key('foo')
-
-#
-# common tests
-#
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_contentlength_none():
- key = _setup_bad_object(remove=('Content-Length',))
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 411
- assert e.reason == 'Length Required'
- assert e.error_code == 'MissingContentLength'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_rgw
-def test_object_create_bad_contentlength_mismatch_above():
- content = 'bar'
- length = len(content) + 1
-
- key = _setup_bad_object({'Content-Length': length})
-
- # Disable retries since key.should_retry will discard the response with
- # PleaseRetryException.
- def no_retry(response, chunked_transfer): return False
- key.should_retry = no_retry
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'RequestTimeout'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_empty():
- key = _setup_bad_object({'Authorization': ''})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_date_and_amz_date():
- date = formatdate(usegmt=True)
- key = _setup_bad_object({'Date': date, 'X-Amz-Date': date})
- key.set_contents_from_string('bar')
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_amz_date_and_no_date():
- date = formatdate(usegmt=True)
- key = _setup_bad_object({'X-Amz-Date': date}, ('Date',))
- key.set_contents_from_string('bar')
-
-
-# the teardown is really messed up here. check it out
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_none():
- key = _setup_bad_object(remove=('Authorization',))
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_contentlength_none():
- _add_custom_headers(remove=('Content-Length',))
- get_new_bucket()
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_acl_create_contentlength_none():
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- key.set_contents_from_string('blah')
-
- _add_custom_headers(remove=('Content-Length',))
- key.set_acl('public-read')
-
-def _create_new_connection():
- # We're going to need to manually build a connection using bad authorization info.
- # But to save the day, let's just hijack the settings from s3.main. :)
- main = s3.main
- conn = HeaderS3Connection(
- aws_access_key_id=main.aws_access_key_id,
- aws_secret_access_key=main.aws_secret_access_key,
- is_secure=main.is_secure,
- port=main.port,
- host=main.host,
- calling_format=main.calling_format,
- )
- return TargetConnection(targets.main.default.conf, conn)
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_rgw
-def test_bucket_create_bad_contentlength_empty():
- conn = _create_new_connection()
- _add_custom_headers({'Content-Length': ''})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, conn)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_contentlength_none():
- _add_custom_headers(remove=('Content-Length',))
- bucket = get_new_bucket()
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_authorization_empty():
- _add_custom_headers({'Authorization': ''})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-
-# the teardown is really messed up here. check it out
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_authorization_none():
- _add_custom_headers(remove=('Authorization',))
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-#
-# AWS2 specific tests
-#
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_contentlength_mismatch_below_aws2():
- check_aws2_support()
- content = 'bar'
- length = len(content) - 1
- key = _setup_bad_object({'Content-Length': length})
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'BadDigest'
-
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_incorrect_aws2():
- check_aws2_support()
- key = _setup_bad_object({'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUB:FWeDfwojDSdS2Ztmpfeubhd9isU='})
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch', 'InvalidAccessKeyId')
-
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_invalid_aws2():
- check_aws2_support()
- key = _setup_bad_object({'Authorization': 'AWS HAHAHA'})
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'InvalidArgument'
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_date_none_aws2():
- check_aws2_support()
- key = _setup_bad_object(remove=('Date',))
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-
-@pytest.mark.auth_aws2
-def test_bucket_create_bad_authorization_invalid_aws2():
- check_aws2_support()
- _add_custom_headers({'Authorization': 'AWS HAHAHA'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'InvalidArgument'
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_date_none_aws2():
- check_aws2_support()
- _add_custom_headers(remove=('Date',))
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-#
-# AWS4 specific tests
-#
-
-def check_aws4_support():
- if 'S3_USE_SIGV4' not in os.environ:
- pytest.skip('sigv4 tests not enabled by S3_USE_SIGV4')
-
-def check_aws2_support():
- if 'S3_USE_SIGV4' in os.environ:
- pytest.skip('sigv2 tests disabled by S3_USE_SIGV4')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_md5_invalid_garbage_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Content-MD5':'AWS4 HAHAHA'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'InvalidDigest'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_contentlength_mismatch_below_aws4():
- check_aws4_support()
- content = 'bar'
- length = len(content) - 1
- key = _setup_bad_object({'Content-Length': length})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'XAmzContentSHA256Mismatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_authorization_incorrect_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=AKIAIGR7ZNNBHC5BKSUB/20150930/us-east-1/s3/aws4_request,SignedHeaders=host;user-agent,Signature=FWeDfwojDSdS2Ztmpfeubhd9isU='})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch', 'InvalidAccessKeyId')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_authorization_invalid_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=HAHAHA'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code in ('AuthorizationHeaderMalformed', 'InvalidArgument')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_ua_empty_aws4():
- check_aws4_support()
- key = _setup_bad_object({'User-Agent': ''})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_ua_none_aws4():
- check_aws4_support()
- key = _setup_bad_object(remove=('User-Agent',))
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_invalid_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': 'Bad Date'})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_invalid_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': 'Bad Date'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_empty_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': ''})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_empty_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': ''})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_none_aws4():
- check_aws4_support()
- key = _setup_bad_object(remove=('Date',))
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_none_aws4():
- check_aws4_support()
- key = _setup_bad_object(remove=('X-Amz-Date',))
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_before_today_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_before_today_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': '20100707T215304Z'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_after_today_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_after_today_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': '20300707T215304Z'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_before_epoch_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_before_epoch_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': '19500707T215304Z'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_after_end_aws4():
- check_aws4_support()
- key = _setup_bad_object({'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'})
- key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_after_end_aws4():
- check_aws4_support()
- key = _setup_bad_object({'X-Amz-Date': '99990707T215304Z'})
-
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_missing_signed_custom_header_aws4():
- check_aws4_support()
- method='PUT'
- expires_in='100000'
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- body='zoo'
-
- # compute the signature with 'x-amz-foo=bar' in the headers...
- request_headers = {'x-amz-foo':'bar'}
- url = key.generate_url(expires_in, method=method, headers=request_headers)
-
- o = urlparse(url)
- path = o.path + '?' + o.query
-
- # avoid sending 'x-amz-foo=bar' in the headers
- request_headers.pop('x-amz-foo')
-
- res =_make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path,
- body=body, request_headers=request_headers, secure=s3.main.is_secure)
-
- assert res.status == 403
- assert res.reason == 'Forbidden'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_missing_signed_header_aws4():
- check_aws4_support()
- method='PUT'
- expires_in='100000'
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- body='zoo'
-
- # compute the signature...
- request_headers = {}
- url = key.generate_url(expires_in, method=method, headers=request_headers)
-
- o = urlparse(url)
- path = o.path + '?' + o.query
-
- # drop 'X-Amz-Expires' so a signed query parameter is missing
- target = r'&X-Amz-Expires=' + expires_in
- path = re.sub(target, '', path)
-
- res =_make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path,
- body=body, request_headers=request_headers, secure=s3.main.is_secure)
-
- assert res.status == 403
- assert res.reason == 'Forbidden'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_authorization_invalid_aws4():
- check_aws4_support()
- _add_custom_headers({'Authorization': 'AWS4 HAHAHA'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'InvalidArgument'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_ua_empty_aws4():
- check_aws4_support()
- _add_custom_headers({'User-Agent': ''})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'SignatureDoesNotMatch'
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_ua_none_aws4():
- check_aws4_support()
- _add_custom_headers(remove=('User-Agent',))
-
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_invalid_aws4():
- check_aws4_support()
- _add_custom_headers({'Date': 'Bad Date'})
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_invalid_aws4():
- check_aws4_support()
- _add_custom_headers({'X-Amz-Date': 'Bad Date'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_empty_aws4():
- check_aws4_support()
- _add_custom_headers({'Date': ''})
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_empty_aws4():
- check_aws4_support()
- _add_custom_headers({'X-Amz-Date': ''})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_none_aws4():
- check_aws4_support()
- _add_custom_headers(remove=('Date',))
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_none_aws4():
- check_aws4_support()
- _add_custom_headers(remove=('X-Amz-Date',))
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_before_today_aws4():
- check_aws4_support()
- _add_custom_headers({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_before_today_aws4():
- check_aws4_support()
- _add_custom_headers({'X-Amz-Date': '20100707T215304Z'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_after_today_aws4():
- check_aws4_support()
- _add_custom_headers({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_after_today_aws4():
- check_aws4_support()
- _add_custom_headers({'X-Amz-Date': '20300707T215304Z'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_before_epoch_aws4():
- check_aws4_support()
- _add_custom_headers({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
- get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_before_epoch_aws4():
- check_aws4_support()
- _add_custom_headers({'X-Amz-Date': '19500707T215304Z'})
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
+++ /dev/null
-from io import StringIO
-import boto.exception
-import boto.s3.connection
-import boto.s3.acl
-import boto.s3.lifecycle
-import datetime
-import time
-import email.utils
-import isodate
-import pytest
-import operator
-import socket
-import ssl
-import os
-import requests
-import base64
-import hmac
-import pytz
-import json
-import httplib2
-import threading
-import itertools
-import string
-import random
-import re
-
-from collections import defaultdict
-from urllib.parse import urlparse
-
-from . import utils
-from .utils import assert_raises
-
-from .policy import Policy, Statement, make_json_policy
-
-from . import (
- configfile,
- setup_teardown,
- nuke_prefixed_buckets,
- get_new_bucket,
- get_new_bucket_name,
- s3,
- targets,
- config,
- get_prefix,
- is_slow_backend,
- _make_request,
- _make_bucket_request,
- _make_raw_request,
- )
-
-
-def check_access_denied(fn, *args, **kwargs):
- e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
- assert e.status == 403
- assert e.reason == 'Forbidden'
- assert e.error_code == 'AccessDenied'
-
-def check_bad_bucket_name(name):
- """
- Attempt to create a bucket with a specified name, and confirm
- that the request fails because of an invalid bucket name.
- """
- e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main.default, name)
- assert e.status == 400
- assert e.reason.lower() == 'bad request' # some proxies vary the case
- assert e.error_code == 'InvalidBucketName'
-
-def _create_keys(bucket=None, keys=[]):
- """
- Populate a (specified or new) bucket with objects with
- specified names (and contents identical to their names).
- """
- if bucket is None:
- bucket = get_new_bucket()
-
- for s in keys:
- key = bucket.new_key(s)
- key.set_contents_from_string(s)
-
- return bucket
-
-
-def _get_alt_connection():
- return boto.s3.connection.S3Connection(
- aws_access_key_id=s3['alt'].aws_access_key_id,
- aws_secret_access_key=s3['alt'].aws_secret_access_key,
- is_secure=s3['alt'].is_secure,
- port=s3['alt'].port,
- host=s3['alt'].host,
- calling_format=s3['alt'].calling_format,
- )
-
-
-# Breaks DNS with SubdomainCallingFormat
-@pytest.mark.fails_with_subdomain
-def test_bucket_create_naming_bad_punctuation():
- # characters other than [a-zA-Z0-9._-]
- check_bad_bucket_name('alpha!soup')
-
-def check_versioning(bucket, status):
- try:
- assert bucket.get_versioning_status()['Versioning'] == status
- except KeyError:
- assert status is None
-
-# Amazon is eventually consistent; retry a few times on failure
-def check_configure_versioning_retry(bucket, status, expected_string):
- bucket.configure_versioning(status)
-
- read_status = None
-
- for i in range(5):
- try:
- read_status = bucket.get_versioning_status()['Versioning']
- except KeyError:
- read_status = None
-
- if (expected_string == read_status):
- break
-
- time.sleep(1)
-
- assert expected_string == read_status
-
-@pytest.mark.versioning
-@pytest.mark.fails_on_dbstore
-def test_versioning_obj_read_not_exist_null():
- bucket = get_new_bucket()
- check_versioning(bucket, None)
-
- check_configure_versioning_retry(bucket, True, "Enabled")
-
- content = 'fooz'
- objname = 'testobj'
-
- key = bucket.new_key(objname)
- key.set_contents_from_string(content)
-
- key = bucket.get_key(objname, version_id='null')
- assert key == None
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_object():
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- expires_in = 100000
- url = key.generate_url(expires_in, method='PUT')
- o = urlparse(url)
- path = o.path + '?' + o.query
- path1 = path + '&append&position=0'
- res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
- path2 = path + '&append&position=3'
- res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
- assert res.status == 200
- assert res.reason == 'OK'
-
- key = bucket.get_key('foo')
- assert key.size == 6
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_normal_object():
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- key.set_contents_from_string('abc')
- expires_in = 100000
- url = key.generate_url(expires_in, method='PUT')
- o = urlparse(url)
- path = o.path + '?' + o.query
- path = path + '&append&position=3'
- res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path, body='abc', secure=s3.main.is_secure)
- assert res.status == 409
-
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_object_position_wrong():
- bucket = get_new_bucket()
- key = bucket.new_key('foo')
- expires_in = 100000
- url = key.generate_url(expires_in, method='PUT')
- o = urlparse(url)
- path = o.path + '?' + o.query
- path1 = path + '&append&position=0'
- res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
- path2 = path + '&append&position=9'
- res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
- assert res.status == 409
- assert int(res.getheader('x-rgw-next-append-position')) == 3
-
-
-# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
-# http://tracker.newdream.net/issues/984
-@pytest.mark.fails_on_rgw
-def test_logging_toggle():
- bucket = get_new_bucket()
- log_bucket = get_new_bucket(targets.main.default, bucket.name + '-log')
- log_bucket.set_as_logging_target()
- bucket.enable_logging(target_bucket=log_bucket, target_prefix=bucket.name)
- bucket.disable_logging()
- # NOTE: this does not actually test whether or not logging works
-
-def list_bucket_storage_class(bucket):
- result = defaultdict(list)
- for k in bucket.get_all_versions():
- result[k.storage_class].append(k)
-
- return result
-
-def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None):
- """Transfer a part of a multipart upload. Designed to be run in parallel.
- """
- mp = boto.s3.multipart.MultiPartUpload(bucket)
- mp.key_name = mp_keyname
- mp.id = mp_id
- part_out = StringIO(part)
- mp.upload_part_from_file(part_out, i+1, headers=headers)
-
-def generate_random(size, part_size=5*1024*1024):
- """
- Generate the specified number of bytes of random data.
- (actually each MB is a repetition of the first KB)
- """
- chunk = 1024
- allowed = string.ascii_letters
- for x in range(0, size, part_size):
- strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
- s = ''
- left = size - x
- this_part_size = min(left, part_size)
- for y in range(this_part_size // chunk):
- s = s + strpart
- if this_part_size > len(s):
- s = s + strpart[0:this_part_size - len(s)]
- yield s
- if (x == size):
- return
-
-def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, storage_class=None, resend_parts=[]):
- """
- Generate a multipart upload of a random file of the specified size.
- If requested, also issue a list-multipart-uploads request.
- Return the upload descriptor and the generated data.
- """
-
- if storage_class is not None:
- if not headers:
- headers = {}
- headers['X-Amz-Storage-Class'] = storage_class
-
- upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata)
- s = ''
- for i, part in enumerate(generate_random(size, part_size)):
- s += part
- transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
- if i in resend_parts:
- transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
-
- if do_list is not None:
- l = bucket.list_multipart_uploads()
- l = list(l)
-
- return (upload, s)
-
-def _populate_key(bucket, keyname, size=7*1024*1024, storage_class=None):
- if bucket is None:
- bucket = get_new_bucket()
- key = bucket.new_key(keyname)
- if storage_class:
- key.storage_class = storage_class
- data_str = str(next(generate_random(size, size)))
- data = StringIO(data_str)
- key.set_contents_from_file(fp=data)
- return (key, data_str)
-
-def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
- return ''.join(random.choice(chars) for _ in range(size))
-
-def verify_object(bucket, k, data=None, storage_class=None):
- if storage_class:
- assert k.storage_class == storage_class
-
- if data:
- read_data = k.get_contents_as_string()
-
- equal = data == read_data.decode() # avoid spamming log if data not equal
- assert equal
-
-def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
- query_args=None
-
- if dest_key.version_id:
- query_args='versionId={v}'.format(v=dest_key.version_id)
-
- headers = {}
- headers['X-Amz-Copy-Source'] = '/{bucket}/{object}'.format(bucket=src_bucket.name, object=src_key.name)
- if src_key.version_id:
- headers['X-Amz-Copy-Source-Version-Id'] = src_key.version_id
- headers['X-Amz-Storage-Class'] = storage_class
-
- res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
- query_args=query_args, headers=headers)
- assert res.status == 200
-
-def _populate_multipart_key(bucket, kname, size, storage_class=None):
- (upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
- upload.complete_upload()
-
- k = bucket.get_key(kname)
-
- return (k, data)
-
-# Create a lifecycle config. Either days (int) and prefix (string) are given, or rules.
-# Rules is a list of dictionaries; each dict has 'days', 'prefix' and 'status' keys,
-# and may optionally carry 'id' and 'transition'.
-def create_lifecycle(days = None, prefix = 'test/', rules = None):
- lifecycle = boto.s3.lifecycle.Lifecycle()
- if rules == None:
- expiration = boto.s3.lifecycle.Expiration(days=days)
- rule = boto.s3.lifecycle.Rule(id=prefix, prefix=prefix, status='Enabled',
- expiration=expiration)
- lifecycle.append(rule)
- else:
- for rule in rules:
- expiration = None
- transition = None
- try:
- expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
- except KeyError:
- pass
-
- try:
- transition = rule['transition']
- except KeyError:
- pass
-
- _id = rule.get('id',None)
- rule = boto.s3.lifecycle.Rule(id=_id, prefix=rule['prefix'],
- status=rule['status'], expiration=expiration, transition=transition)
- lifecycle.append(rule)
- return lifecycle
-
-def set_lifecycle(rules = None):
- bucket = get_new_bucket()
- lifecycle = create_lifecycle(rules=rules)
- bucket.configure_lifecycle(lifecycle)
- return bucket
-
-def configured_storage_classes():
- sc = [ 'STANDARD' ]
-
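- # 'storage_classes' in the main config section is expected to be a delimiter-separated
- # list of extra class names, e.g. "LUKEWARM, FROZEN" (example values, not required names).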
- if 'storage_classes' in config['main']:
- extra_sc = re.split(r'\W+', config['main']['storage_classes'])
-
- for item in extra_sc:
- if item != 'STANDARD':
- sc.append(item)
-
- sc = [i for i in sc if i]
- print("storage classes configured: " + str(sc))
-
- return sc
-
-def lc_transition(days=None, date=None, storage_class=None):
- return boto.s3.lifecycle.Transition(days=days, date=date, storage_class=storage_class)
-
-def lc_transitions(transitions=None):
- result = boto.s3.lifecycle.Transitions()
- for t in transitions:
- result.add_transition(days=t.days, date=t.date, storage_class=t.storage_class)
-
- return result
-
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class():
- sc = configured_storage_classes()
- if len(sc) < 2:
- pytest.skip('requires multiple storage classes')
-
- bucket = get_new_bucket()
-
- for storage_class in sc:
- kname = 'foo-' + storage_class
- k, data = _populate_key(bucket, kname, size=9*1024*1024, storage_class=storage_class)
-
- verify_object(bucket, k, data, storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_multipart():
- sc = configured_storage_classes()
- if len(sc) < 2:
- pytest.skip('requires multiple storage classes')
-
- bucket = get_new_bucket()
- size = 11 * 1024 * 1024
-
- for storage_class in sc:
- key = "mymultipart-" + storage_class
- (upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
- upload.complete_upload()
- key2 = bucket.get_key(key)
- assert key2.size == size
- assert key2.storage_class == storage_class
-
-def _do_test_object_modify_storage_class(obj_write_func, size):
- sc = configured_storage_classes()
- if len(sc) < 2:
- pytest.skip('requires multiple storage classes')
-
- bucket = get_new_bucket()
-
- for storage_class in sc:
- kname = 'foo-' + storage_class
- k, data = obj_write_func(bucket, kname, size, storage_class=storage_class)
-
- verify_object(bucket, k, data, storage_class)
-
- for new_storage_class in sc:
- if new_storage_class == storage_class:
- continue
-
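- # Copy the object onto itself with a new storage class, then re-verify the data.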
- copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
- verify_object(bucket, k, data, storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_modify_storage_class():
- _do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
-
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_modify_storage_class_multipart():
- _do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
-
-def _do_test_object_storage_class_copy(obj_write_func, size):
- sc = configured_storage_classes()
- if len(sc) < 2:
- pytest.skip('requires multiple storage classes')
-
- src_bucket = get_new_bucket()
- dest_bucket = get_new_bucket()
- kname = 'foo'
-
- src_key, data = obj_write_func(src_bucket, kname, size)
- verify_object(src_bucket, src_key, data)
-
- for new_storage_class in sc:
- if new_storage_class == src_key.storage_class:
- continue
-
- dest_key = dest_bucket.get_key('foo-' + new_storage_class, validate=False)
-
- copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
- verify_object(dest_bucket, dest_key, data, new_storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_copy():
- _do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_copy_multipart():
- _do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
-
-class FakeFile(object):
- """
- File-like object that simulates seek() and tell() over a stream of one repeated character.
- """
- def __init__(self, char='A', interrupt=None):
- self.offset = 0
- self.char = bytes(char, 'utf-8')
- self.interrupt = interrupt
-
- def seek(self, offset, whence=os.SEEK_SET):
- if whence == os.SEEK_SET:
- self.offset = offset
- elif whence == os.SEEK_END:
- self.offset = self.size + offset
- elif whence == os.SEEK_CUR:
- self.offset += offset
-
- def tell(self):
- return self.offset
-
-class FakeWriteFile(FakeFile):
- """
- file that simulates interruptible reads of constant data
- """
- def __init__(self, size, char='A', interrupt=None):
- FakeFile.__init__(self, char, interrupt)
- self.size = size
-
- def read(self, size=-1):
- if size < 0:
- size = self.size - self.offset
- count = min(size, self.size - self.offset)
- self.offset += count
-
- # Sneaky! do stuff before we return (the last time)
- if self.interrupt is not None and self.offset == self.size and count > 0:
- self.interrupt()
-
- return self.char*count
-
-class FakeFileVerifier(object):
- """
- file that verifies expected data has been written
- """
- def __init__(self, char=None):
- self.char = char
- self.size = 0
-
- def write(self, data):
- size = len(data)
- if self.char is None:
- self.char = data[0]
- self.size += size
- assert data.decode() == self.char*size
-
-def _verify_atomic_key_data(key, size=-1, char=None):
- """
- Make sure file is of the expected size and (simulated) content
- """
- fp_verify = FakeFileVerifier(char)
- key.get_contents_to_file(fp_verify)
- if size >= 0:
- assert fp_verify.size == size
-
-def _test_atomic_dual_conditional_write(file_size):
- """
- create an object, two sessions writing different contents
- confirm that it is all one or the other
- """
- bucket = get_new_bucket()
- objname = 'testobj'
- key = bucket.new_key(objname)
-
- fp_a = FakeWriteFile(file_size, 'A')
- key.set_contents_from_file(fp_a)
- _verify_atomic_key_data(key, file_size, 'A')
- etag_fp_a = key.etag.replace('"', '').strip()
-
- # get a second key object (for the same key)
- # so both can be writing without interfering
- key2 = bucket.new_key(objname)
-
- # write <file_size> file of C's
- # but before we're done, try to write all B's
- fp_b = FakeWriteFile(file_size, 'B')
- fp_c = FakeWriteFile(file_size, 'C',
- lambda: key2.set_contents_from_file(fp_b, rewind=True, headers={'If-Match': etag_fp_a})
- )
- # key.set_contents_from_file(fp_c, headers={'If-Match': etag_fp_a})
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_c,
- headers={'If-Match': etag_fp_a})
- assert e.status == 412
- assert e.reason == 'Precondition Failed'
- assert e.error_code == 'PreconditionFailed'
-
- # verify the file
- _verify_atomic_key_data(key, file_size, 'B')
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_atomic_dual_conditional_write_1mb():
- _test_atomic_dual_conditional_write(1024*1024)
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_atomic_write_bucket_gone():
- bucket = get_new_bucket()
-
- def remove_bucket():
- bucket.delete()
-
- # create file of A's but delete the bucket it's in before we finish writing
- # all of them
- key = bucket.new_key('foo')
- fp_a = FakeWriteFile(1024*1024, 'A', remove_bucket)
- e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_a)
- assert e.status == 404
- assert e.reason == 'Not Found'
- assert e.error_code == 'NoSuchBucket'
-
-def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
- do_list=None, init_headers=None, part_headers=None,
- metadata=None, resend_parts=[]):
- """
- Generate a multipart upload for a random object of the specified size.
- If requested (do_list), also list the bucket's in-progress multipart uploads.
- Return the upload descriptor and the data that was uploaded.
- """
- upload = bucket.initiate_multipart_upload(s3_key_name, headers=init_headers, metadata=metadata)
- s = ''
- for i, part in enumerate(generate_random(size, part_size)):
- s += part
- transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
- if i in resend_parts:
- transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
-
- if do_list is not None:
- l = bucket.list_multipart_uploads()
- l = list(l)
-
- return (upload, s)
-
-
-
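-# The 'invalid_chunks' tests initiate an SSE-C multipart upload with one customer key
-# and then upload the parts with a different key (or a bogus key MD5); the server is
-# expected to reject the mismatched parts with a 400 error.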
-@pytest.mark.encryption
-@pytest.mark.fails_on_dbstore
-def test_encryption_sse_c_multipart_invalid_chunks_1():
- bucket = get_new_bucket()
- key = "multipart_enc"
- content_type = 'text/bla'
- objlen = 30 * 1024 * 1024
- init_headers = {
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
- 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
- 'Content-Type': content_type
- }
- part_headers = {
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
- 'x-amz-server-side-encryption-customer-key-md5': 'arxBvwY2V4SiOne6yppVPQ=='
- }
- e = assert_raises(boto.exception.S3ResponseError,
- _multipart_upload_enc, bucket, key, objlen,
- init_headers=init_headers, part_headers=part_headers,
- metadata={'foo': 'bar'})
- assert e.status == 400
-
-@pytest.mark.encryption
-@pytest.mark.fails_on_dbstore
-def test_encryption_sse_c_multipart_invalid_chunks_2():
- bucket = get_new_bucket()
- key = "multipart_enc"
- content_type = 'text/plain'
- objlen = 30 * 1024 * 1024
- init_headers = {
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
- 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
- 'Content-Type': content_type
- }
- part_headers = {
- 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
- 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
- 'x-amz-server-side-encryption-customer-key-md5': 'AAAAAAAAAAAAAAAAAAAAAA=='
- }
- e = assert_raises(boto.exception.S3ResponseError,
- _multipart_upload_enc, bucket, key, objlen,
- init_headers=init_headers, part_headers=part_headers,
- metadata={'foo': 'bar'})
- assert e.status == 400
-
-@pytest.mark.fails_with_subdomain
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_different_tenant():
- bucket = get_new_bucket()
- key = bucket.new_key('asdf')
- key.set_contents_from_string('asdf')
- l = bucket.list()
- resource1 = "arn:aws:s3::*:" + bucket.name
- resource2 = "arn:aws:s3::*:" + bucket.name + "/*"
- policy_document = json.dumps(
- {
- "Version": "2012-10-17",
- "Statement": [{
- "Effect": "Allow",
- "Principal": {"AWS": "*"},
- "Action": "s3:ListBucket",
- "Resource": [
- "{}".format(resource1),
- "{}".format(resource2)
- ]
- }]
- })
- bucket.set_policy(policy_document)
-
- new_conn = boto.s3.connection.S3Connection(
- aws_access_key_id=s3['tenant'].aws_access_key_id,
- aws_secret_access_key=s3['tenant'].aws_secret_access_key,
- is_secure=s3['tenant'].is_secure,
- port=s3['tenant'].port,
- host=s3['tenant'].host,
- calling_format=s3['tenant'].calling_format,
- )
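- # The leading ':' presumably addresses the bucket under the default (empty) tenant
- # when connecting as the 'tenant' user.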
- bucket_name = ":" + bucket.name
- b = new_conn.get_bucket(bucket_name)
- b.get_all_keys()
-
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_set_condition_operator_end_with_IfExists():
- bucket = _create_keys(keys=['foo'])
- policy = '''{
- "Version":"2012-10-17",
- "Statement": [{
- "Sid": "Allow Public Access to All Objects",
- "Effect": "Allow",
- "Principal": "*",
- "Action": "s3:GetObject",
- "Condition": {
- "StringLikeIfExists": {
- "aws:Referer": "http://www.example.com/*"
- }
- },
- "Resource": "arn:aws:s3:::%s/*"
- }
- ]
- }''' % bucket.name
- assert bucket.set_policy(policy)
- res = _make_request('GET', bucket.name, bucket.get_key("foo"),
- request_headers={'referer': 'http://www.example.com/'})
- assert res.status == 200
- res = _make_request('GET', bucket.name, bucket.get_key("foo"),
- request_headers={'referer': 'http://www.example.com/index.html'})
- assert res.status == 200
- res = _make_request('GET', bucket.name, bucket.get_key("foo"))
- assert res.status == 200
- res = _make_request('GET', bucket.name, bucket.get_key("foo"),
- request_headers={'referer': 'http://example.com'})
- assert res.status == 403
-
-def _make_arn_resource(path="*"):
- return "arn:aws:s3:::{}".format(path)
-
-@pytest.mark.tagging
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_put_obj_request_obj_tag():
-
- bucket = get_new_bucket()
-
- tag_conditional = {"StringEquals": {
- "s3:RequestObjectTag/security" : "public"
- }}
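- # Only PutObject requests that tag the object with security=public satisfy this policy;
- # untagged puts from the alt user are expected to be denied below.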
-
- p = Policy()
- resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
-
- s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
- policy_document = p.add_statement(s1).to_json()
-
- bucket.set_policy(policy_document)
-
- new_conn = _get_alt_connection()
- bucket1 = new_conn.get_bucket(bucket.name, validate=False)
- key1_str = 'testobj'
- key1 = bucket1.new_key(key1_str)
- check_access_denied(key1.set_contents_from_string, key1_str)
-
- headers = {"x-amz-tagging" : "security=public"}
- key1.set_contents_from_string(key1_str, headers=headers)
-
+++ /dev/null
-import sys
-from collections.abc import Container
-import pytest
-import string
-import random
-from pprint import pprint
-import time
-import boto.exception
-import socket
-
-from urllib.parse import urlparse
-
-from .. import common
-
-from . import (
- configfile,
- setup_teardown,
- get_new_bucket,
- get_new_bucket_name,
- s3,
- config,
- _make_raw_request,
- choose_bucket_prefix,
- )
-
-IGNORE_FIELD = 'IGNORETHIS'
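-# Sentinel for status/reason/code arguments that the helpers below should not check.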
-
-SLEEP_INTERVAL = 0.01
-SLEEP_MAX = 2.0
-
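-# XML fragments for the website configurations under test; the ${...} placeholders are
-# filled in by _test_website_populate_fragment() via string.Template.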
-WEBSITE_CONFIGS_XMLFRAG = {
- 'IndexDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument>${RoutingRules}',
- 'IndexDocErrorDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument><ErrorDocument><Key>${ErrorDocument_Key}</Key></ErrorDocument>${RoutingRules}',
- 'RedirectAll': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName></RedirectAllRequestsTo>${RoutingRules}',
- 'RedirectAll+Protocol': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName><Protocol>${RedirectAllRequestsTo_Protocol}</Protocol></RedirectAllRequestsTo>${RoutingRules}',
- }
-INDEXDOC_TEMPLATE = '<html><h1>IndexDoc</h1><body>{random}</body></html>'
-ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'
-
-CAN_WEBSITE = None
-
-@pytest.fixture(autouse=True, scope="module")
-def check_can_test_website():
- bucket = get_new_bucket()
- try:
- wsconf = bucket.get_website_configuration()
- return True
- except boto.exception.S3ResponseError as e:
- if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
- return True
- elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
- pytest.skip('rgw_enable_static_website is false')
- elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
- # This is older versions that do not support the website code
- pytest.skip('static website is not implemented')
- elif e.status == 501 and e.error_code == 'NotImplemented':
- pytest.skip('static website is not implemented')
- else:
- raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
- finally:
- bucket.delete()
-
-def make_website_config(xml_fragment):
- """
- Take the tedious stuff out of the config
- """
- return '<?xml version="1.0" encoding="UTF-8"?><WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' + xml_fragment + '</WebsiteConfiguration>'
-
-def get_website_url(**kwargs):
- """
- Return the URL to a website page
- """
- proto, bucket, hostname, path = 'http', None, None, '/'
-
- if 'proto' in kwargs:
- proto = kwargs['proto']
- if 'bucket' in kwargs:
- bucket = kwargs['bucket']
- if 'hostname' in kwargs:
- hostname = kwargs['hostname']
- if 'path' in kwargs:
- path = kwargs['path']
-
- if hostname is None and bucket is None:
- return '/' + path.lstrip('/')
-
- domain = config['main']['host']
- if 's3website_domain' in config['main']:
- domain = config['main']['s3website_domain']
- elif 's3website_domain' in config['alt']:
- domain = config['alt']['s3website_domain']
- if hostname is None and bucket is not None:
- hostname = '%s.%s' % (bucket, domain)
- path = path.lstrip('/')
- return "%s://%s/%s" % (proto, hostname, path)
-
-def _test_website_populate_fragment(xml_fragment, fields):
- for k in ['RoutingRules']:
- if k in list(fields.keys()) and len(fields[k]) > 0:
- fields[k] = '<%s>%s</%s>' % (k, fields[k], k)
- f = {
- 'IndexDocument_Suffix': choose_bucket_prefix(template='index-{random}.html', max_len=32),
- 'ErrorDocument_Key': choose_bucket_prefix(template='error-{random}.html', max_len=32),
- 'RedirectAllRequestsTo_HostName': choose_bucket_prefix(template='{random}.{random}.com', max_len=32),
- 'RoutingRules': ''
- }
- f.update(fields)
- xml_fragment = string.Template(xml_fragment).safe_substitute(**f)
- return xml_fragment, f
-
-def _test_website_prep(bucket, xml_template, hardcoded_fields = {}, expect_fail=None):
- xml_fragment, f = _test_website_populate_fragment(xml_template, hardcoded_fields)
- f['WebsiteConfiguration'] = ''
- if not xml_template:
- bucket.delete_website_configuration()
- return f
-
- config_xmlnew = make_website_config(xml_fragment)
-
- config_xmlold = ''
- try:
- config_xmlold = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
- except boto.exception.S3ResponseError as e:
- if e.status == 404 \
- and ('NoSuchWebsiteConfiguration' in e.body or 'NoSuchWebsiteConfiguration' in e.code or
- 'NoSuchKey' in e.body or 'NoSuchKey' in e.code):
- pass
- else:
- raise e
-
- try:
- bucket.set_website_configuration_xml(common.trim_xml(config_xmlnew))
- config_xmlnew = common.normalize_xml(config_xmlnew, pretty_print=True)
- except boto.exception.S3ResponseError as e:
- if expect_fail is not None:
- if isinstance(expect_fail, dict):
- pass
- elif isinstance(expect_fail, str):
- pass
- raise e
-
- # TODO: in some cases, it takes non-zero time for the config to be applied by AmazonS3
- # We should figure out how to poll for changes better
- # WARNING: eu-west-1 as of 2015/06/22 was taking at least 4 seconds to propagate website configs, especially when you cycle between non-null configs
- time.sleep(0.1)
- config_xmlcmp = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
-
- #if config_xmlold is not None:
- # print('old',config_xmlold.replace("\n",''))
- #if config_xmlcmp is not None:
- # print('cmp',config_xmlcmp.replace("\n",''))
- #if config_xmlnew is not None:
- # print('new',config_xmlnew.replace("\n",''))
- # Cleanup for our validation
- common.assert_xml_equal(config_xmlcmp, config_xmlnew)
- #print("config_xmlcmp\n", config_xmlcmp)
- #assert config_xmlnew == config_xmlcmp
- f['WebsiteConfiguration'] = config_xmlcmp
- return f
-
-def __website_expected_reponse_status(res, status, reason):
- if not isinstance(status, Container):
- status = set([status])
- if not isinstance(reason, Container):
- reason = set([reason])
-
- if status is not IGNORE_FIELD:
- assert res.status in status, 'HTTP code was %s, should be %s' % (res.status, status)
- if reason is not IGNORE_FIELD:
- assert res.reason in reason, 'HTTP reason was %s, should be %s' % (res.reason, reason)
-
-def _website_expected_default_html(**kwargs):
- fields = []
- for k in list(kwargs.keys()):
- # AmazonS3 seems to be inconsistent, some HTML errors include BucketName, but others do not.
- if k == 'BucketName':
- continue
-
- v = kwargs[k]
- if isinstance(v, str):
- v = [v]
- elif not isinstance(v, Container):
- v = [v]
- for v2 in v:
- s = '<li>%s: %s</li>' % (k,v2)
- fields.append(s)
- return fields
-
-def _website_expected_error_response(res, bucket_name, status, reason, code, content=None, body=None):
- if body is None:
- body = res.read()
- print(body)
- __website_expected_reponse_status(res, status, reason)
-
- # Argh, AmazonS3 is really inconsistent, so we have a conditional test!
- # This is most visible if you have an ErrorDoc present
- errorcode = res.getheader('x-amz-error-code', None)
- if errorcode is not None:
- if code is not IGNORE_FIELD:
- assert errorcode == code
-
- if not isinstance(content, Container):
- content = set([content])
- for f in content:
- if f is not IGNORE_FIELD and f is not None:
- f = bytes(f, 'utf-8')
- assert f in body, 'HTML should contain "%s"' % (f, )
-
-def _website_expected_redirect_response(res, status, reason, new_url):
- body = res.read()
- print(body)
- __website_expected_reponse_status(res, status, reason)
- loc = res.getheader('Location', None)
- assert loc == new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,)
- assert len(body) == 0, 'Body of a redirect should be empty'
-
-def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
- url = get_website_url(proto='http', bucket=bucket_name, path=path)
- print("url", url)
- o = urlparse(url)
- if connect_hostname is None:
- connect_hostname = o.hostname
- path = o.path + '?' + o.query
- request_headers={}
- request_headers['Host'] = o.hostname
- request_headers['Accept'] = '*/*'
- print('Request: {method} {path}\n{headers}'.format(method=method, path=path, headers=''.join([t[0]+':'+t[1]+"\n" for t in list(request_headers.items())])))
- res = _make_raw_request(connect_hostname, config.main.port, method, path, request_headers=request_headers, secure=False, timeout=timeout)
- for (k,v) in res.getheaders():
- print(k,v)
- return res
-
-# ---------- Non-existent buckets via the website endpoint
-@pytest.mark.s3website
-@pytest.mark.fails_on_rgw
-def test_website_nonexistant_bucket_s3():
- bucket_name = get_new_bucket_name()
- res = _website_request(bucket_name, '')
- _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_s3
-@pytest.mark.fails_on_dbstore
-def test_website_nonexistant_bucket_rgw():
- bucket_name = get_new_bucket_name()
- res = _website_request(bucket_name, '')
- #_website_expected_error_response(res, bucket_name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
- _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
-
-#------------- IndexDocument only, successes
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-@pytest.mark.timeout(10)
-def test_website_public_bucket_list_public_index():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.make_public()
- #time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- body = res.read()
- print(body)
- indexstring = bytes(indexstring, 'utf-8')
- assert body == indexstring # default content should match index.html set content
- __website_expected_reponse_status(res, 200, 'OK')
- indexhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_public_index():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.set_canned_acl('private')
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.make_public()
- #time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
-
- res = _website_request(bucket.name, '')
- __website_expected_reponse_status(res, 200, 'OK')
- body = res.read()
- print(body)
- indexstring = bytes(indexstring, 'utf-8')
- assert body == indexstring, 'default content should match index.html set content'
- indexhtml.delete()
- bucket.delete()
-
-
-# ---------- IndexDocument only, failures
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.set_canned_acl('private')
- # TODO: wait for sync
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
- indexhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.set_canned_acl('private')
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- ##time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- indexhtml.delete()
- bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_missingerrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_missingerrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_missingerrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- indexhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_missingerrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['IndexDocument_Suffix']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- indexhtml.delete()
- bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_blockederrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- body = res.read()
- print(body)
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
- errorstring = bytes(errorstring, 'utf-8')
- assert errorstring not in body, 'error content should NOT match error.html set content'
-
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_pubilc_errordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('public-read')
-
- url = get_website_url(proto='http', bucket=bucket.name, path='')
- o = urlparse(url)
- host = o.hostname
- port = s3.main.port
-
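- # Talk to the endpoint over a raw socket so we can detect any stray bytes the
- # server might send after the error-document body.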
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((host, port))
-
- request = "GET / HTTP/1.1\r\nHost:%s.%s:%s\r\n\r\n" % (bucket.name, host, port)
- sock.send(request.encode())
-
- #receive header
- resp = sock.recv(4096)
- print(resp)
-
- #receive body
- resp = sock.recv(4096)
- print('payload length=%d' % len(resp))
- print(resp)
-
- #check if any additional payload is left
- resp_len = 0
- sock.settimeout(2)
- try:
- resp = sock.recv(4096)
- resp_len = len(resp)
- print('invalid payload length=%d' % resp_len)
- print(resp)
- except socket.timeout:
- print('no invalid payload')
-
- assert resp_len == 0, 'invalid payload'
-
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_blockederrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('private')
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- body = res.read()
- print(body)
- _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
- errorstring = bytes(errorstring, 'utf-8')
- assert errorstring not in body, 'error content should NOT match error.html set content'
-
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_blockederrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- body = res.read()
- print(body)
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
- errorstring = bytes(errorstring, 'utf-8')
- assert errorstring not in body, 'error content should NOT match error.html set content'
-
- indexhtml.delete()
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_blockederrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('private')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- body = res.read()
- print(body)
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
- errorstring = bytes(errorstring, 'utf-8')
- assert errorstring not in body, 'error content should NOT match error.html set content'
-
- indexhtml.delete()
- errorhtml.delete()
- bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures with errordoc available
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_gooderrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring, policy='public-read')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_gooderrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('public-read')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=[errorstring])
-
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_gooderrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.make_public()
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('public-read')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
- indexhtml.delete()
- errorhtml.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_gooderrordoc():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- bucket.set_canned_acl('private')
- indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
- indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
- indexhtml.set_contents_from_string(indexstring)
- indexhtml.set_canned_acl('private')
- errorhtml = bucket.new_key(f['ErrorDocument_Key'])
- errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
- errorhtml.set_contents_from_string(errorstring)
- errorhtml.set_canned_acl('public-read')
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- res = _website_request(bucket.name, '')
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
- indexhtml.delete()
- errorhtml.delete()
- bucket.delete()
-
-# ------ RedirectAll tests
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_base():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
- bucket.set_canned_acl('private')
-
- res = _website_request(bucket.name, '')
- new_url = 'http://%s/' % f['RedirectAllRequestsTo_HostName']
- _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_path():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
- bucket.set_canned_acl('private')
-
- pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
-
- res = _website_request(bucket.name, pathfragment)
- new_url = 'http://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
- _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_path_upgrade():
- bucket = get_new_bucket()
- x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
- f = _test_website_prep(bucket, x)
- bucket.set_canned_acl('private')
-
- pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
-
- res = _website_request(bucket.name, pathfragment)
- new_url = 'https://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
- _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
- bucket.delete()
-
-# ------ x-amz redirect tests
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_nonwebsite():
- bucket = get_new_bucket()
- #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
- #bucket.set_canned_acl('private')
-
- k = bucket.new_key('page')
- content = 'wrong-content'
- redirect_dest = '/relative'
- headers = {'x-amz-website-redirect-location': redirect_dest}
- k.set_contents_from_string(content, headers=headers, policy='public-read')
- redirect = k.get_redirect()
- assert k.get_redirect() == redirect_dest
-
- res = _website_request(bucket.name, '/page')
- body = res.read()
- print(body)
- expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name)
- # TODO: RGW does not have custom error messages for different 404s yet
- #expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name, Message='The specified bucket does not have a website configuration')
- print(expected_content)
- _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchWebsiteConfiguration', content=expected_content, body=body)
-
- k.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_public_relative():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
-
- k = bucket.new_key('page')
- content = 'wrong-content'
- redirect_dest = '/relative'
- headers = {'x-amz-website-redirect-location': redirect_dest}
- k.set_contents_from_string(content, headers=headers, policy='public-read')
- redirect = k.get_redirect()
- assert k.get_redirect() == redirect_dest
-
- res = _website_request(bucket.name, '/page')
- #new_url = get_website_url(bucket_name=bucket.name, path=redirect_dest)
- _website_expected_redirect_response(res, 301, ['Moved Permanently'], redirect_dest)
-
- k.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_public_abs():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
-
- k = bucket.new_key('page')
- content = 'wrong-content'
- redirect_dest = 'http://example.com/foo'
- headers = {'x-amz-website-redirect-location': redirect_dest}
- k.set_contents_from_string(content, headers=headers, policy='public-read')
- redirect = k.get_redirect()
- assert k.get_redirect() == redirect_dest
-
- res = _website_request(bucket.name, '/page')
- new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
- _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
- k.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_private_relative():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
-
- k = bucket.new_key('page')
- content = 'wrong-content'
- redirect_dest = '/relative'
- headers = {'x-amz-website-redirect-location': redirect_dest}
- k.set_contents_from_string(content, headers=headers, policy='private')
- redirect = k.get_redirect()
- assert k.get_redirect() == redirect_dest
-
- res = _website_request(bucket.name, '/page')
- # We get a 403 because the page is private
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- k.delete()
- bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_private_abs():
- bucket = get_new_bucket()
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
- bucket.make_public()
-
- k = bucket.new_key('page')
- content = 'wrong-content'
- redirect_dest = 'http://example.com/foo'
- headers = {'x-amz-website-redirect-location': redirect_dest}
- k.set_contents_from_string(content, headers=headers, policy='private')
- redirect = k.get_redirect()
- assert k.get_redirect() == redirect_dest
-
- res = _website_request(bucket.name, '/page')
- new_url = get_website_url(proto='http', hostname='example.com', path='/foo')
- # We get a 403 because the page is private
- _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
- k.delete()
- bucket.delete()
-# ------ RoutingRules tests
-
-# RoutingRules
-ROUTING_RULES = {
- 'empty': '',
- 'AmazonExample1': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>docs/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample1+Protocol=https': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>docs/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <Protocol>https</Protocol>
- <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample1+Protocol=https+Hostname=xyzzy': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>docs/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <Protocol>https</Protocol>
- <HostName>xyzzy</HostName>
- <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample1+Protocol=http2': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>docs/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <Protocol>http2</Protocol>
- <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample2': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>images/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <ReplaceKeyWith>folderdeleted.html</ReplaceKeyWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample2+HttpRedirectCode=TMPL': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>images/</KeyPrefixEquals>
- </Condition>
- <Redirect>
- <HttpRedirectCode>{HttpRedirectCode}</HttpRedirectCode>
- <ReplaceKeyWith>folderdeleted.html</ReplaceKeyWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample3': \
-"""
- <RoutingRule>
- <Condition>
- <HttpErrorCodeReturnedEquals>404</HttpErrorCodeReturnedEquals>
- </Condition>
- <Redirect>
- <HostName>ec2-11-22-333-44.compute-1.amazonaws.com</HostName>
- <ReplaceKeyPrefixWith>report-404/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
- 'AmazonExample3+KeyPrefixEquals': \
-"""
- <RoutingRule>
- <Condition>
- <KeyPrefixEquals>images/</KeyPrefixEquals>
- <HttpErrorCodeReturnedEquals>404</HttpErrorCodeReturnedEquals>
- </Condition>
- <Redirect>
- <HostName>ec2-11-22-333-44.compute-1.amazonaws.com</HostName>
- <ReplaceKeyPrefixWith>report-404/</ReplaceKeyPrefixWith>
- </Redirect>
- </RoutingRule>
-""",
-}
-
-for k in list(ROUTING_RULES.keys()):
- if len(ROUTING_RULES[k]) > 0:
- ROUTING_RULES[k] = "<!-- %s -->\n%s" % (k, ROUTING_RULES[k])
-
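-# Each routing test case is a dict: 'xml' holds the RoutingRules fragment to install,
-# 'url' the path to request, 'location' the expected redirect target (or None), and
-# 'code' the expected HTTP status.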
-ROUTING_RULES_TESTS = [
- dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='', location=None, code=200),
- dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/', location=None, code=200),
- dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/x', location=None, code=404),
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/', location=None, code=200),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/x', location=None, code=404),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/', location=dict(proto='http',bucket='{bucket_name}',path='/documents/'), code=301),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/x', location=dict(proto='http',bucket='{bucket_name}',path='/documents/x'), code=301),
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/', location=None, code=200),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/x', location=None, code=404),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/', location=dict(proto='https',bucket='{bucket_name}',path='/documents/'), code=301),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/x', location=dict(proto='https',bucket='{bucket_name}',path='/documents/x'), code=301),
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/', location=None, code=200),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/x', location=None, code=404),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/', location=dict(proto='https',hostname='xyzzy',path='/documents/'), code=301),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/x', location=dict(proto='https',hostname='xyzzy',path='/documents/x'), code=301),
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
-
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/images/x'), code=301),
-
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/x', location=None, code=404),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
-]
-
-ROUTING_ERROR_PROTOCOL = dict(code=400, reason='Bad Request', errorcode='InvalidRequest', bodyregex=r'Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.')
-
-ROUTING_RULES_TESTS_ERRORS = [ # TODO: Unused!
- # Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
- dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
-]
-
-VALID_AMZ_REDIRECT = set([301,302,303,304,305,307,308])
-
-# Generate a pair of test cases for every valid HttpRedirectCode value.
-for redirect_code in VALID_AMZ_REDIRECT:
- rules = ROUTING_RULES['AmazonExample2+HttpRedirectCode=TMPL'].format(HttpRedirectCode=redirect_code)
- result = redirect_code
- ROUTING_RULES_TESTS.append(
- dict(xml=dict(RoutingRules=rules), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
- )
- ROUTING_RULES_TESTS.append(
- dict(xml=dict(RoutingRules=rules), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
- )
-
-# TODO:
-# codes other than those in VALID_AMZ_REDIRECT
-# give an error of 'The provided HTTP redirect code (314) is not valid. Valid codes are 3XX except 300.' during setting the website config
-# we should check that we can return that too on ceph
-
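-# Fixture: a public-read bucket pre-populated with an index and an error document;
-# yields the bucket plus the generated field names and deletes everything afterwards.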
-@pytest.fixture
-def routing_setup():
- kwargs = {'obj':[]}
- bucket = get_new_bucket()
- kwargs['bucket'] = bucket
- kwargs['obj'].append(bucket)
- #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
- f = _test_website_prep(bucket, '')
- kwargs.update(f)
- bucket.set_canned_acl('public-read')
-
- k = bucket.new_key('debug-ws.xml')
- kwargs['obj'].append(k)
- k.set_contents_from_string('', policy='public-read')
-
- k = bucket.new_key(f['IndexDocument_Suffix'])
- kwargs['obj'].append(k)
- s = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=64)
- k.set_contents_from_string(s)
- k.set_canned_acl('public-read')
-
- k = bucket.new_key(f['ErrorDocument_Key'])
- kwargs['obj'].append(k)
- s = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=64)
- k.set_contents_from_string(s)
- k.set_canned_acl('public-read')
-
- #time.sleep(1)
- while bucket.get_key(f['ErrorDocument_Key']) is None:
- time.sleep(SLEEP_INTERVAL)
-
- yield kwargs
-
- for o in reversed(kwargs['obj']):
- print('Deleting', str(o))
- o.delete()
-
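-# Install the RoutingRules fragment from one test case, request its URL, and verify
-# the body, the redirect Location, or the error response depending on the expected code.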
-def routing_check(*args, **kwargs):
- bucket = kwargs['bucket']
- args=args[0]
- #print(args)
- pprint(args)
- xml_fields = kwargs.copy()
- xml_fields.update(args['xml'])
-
- k = bucket.get_key('debug-ws.xml')
- k.set_contents_from_string(str(args)+str(kwargs), policy='public-read')
-
- pprint(xml_fields)
- f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'], hardcoded_fields=xml_fields)
- #print(f)
- config_xmlcmp = bucket.get_website_configuration_xml()
- config_xmlcmp = common.normalize_xml(config_xmlcmp, pretty_print=True) # For us to read
- res = _website_request(bucket.name, args['url'])
- print(config_xmlcmp)
- new_url = args['location']
- if new_url is not None:
- new_url = get_website_url(**new_url)
- new_url = new_url.format(bucket_name=bucket.name)
- if args['code'] >= 200 and args['code'] < 300:
- #body = res.read()
- #print(body)
- #assert body == args['content'], 'default content should match index.html set content'
- assert int(res.getheader('Content-Length', -1)) > 0
- elif args['code'] >= 300 and args['code'] < 400:
- _website_expected_redirect_response(res, args['code'], IGNORE_FIELD, new_url)
- elif args['code'] >= 400:
- _website_expected_error_response(res, bucket.name, args['code'], IGNORE_FIELD, IGNORE_FIELD)
- else:
- assert False, 'unexpected expected status code in test case'
-
-@pytest.mark.s3website_routing_rules
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-@pytest.mark.parametrize('t', ROUTING_RULES_TESTS)
-def test_routing_generator(t, routing_setup):
- if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:
- t['xml']['RoutingRules'] = common.trim_xml(t['xml']['RoutingRules'])
- routing_check(t, **routing_setup)
+++ /dev/null
-from . import utils
-
-def test_generate():
- FIVE_MB = 5 * 1024 * 1024
- assert len(''.join(utils.generate_random(0))) == 0
- assert len(''.join(utils.generate_random(1))) == 1
- assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
- assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
- assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1
+++ /dev/null
-import random
-import requests
-import string
-import time
-
-def assert_raises(excClass, callableObj, *args, **kwargs):
- """
- Like unittest.TestCase.assertRaises, but returns the exception.
- """
- try:
- callableObj(*args, **kwargs)
- except excClass as e:
- return e
- else:
- if hasattr(excClass, '__name__'):
- excName = excClass.__name__
- else:
- excName = str(excClass)
- raise AssertionError("%s not raised" % excName)
-
-def generate_random(size, part_size=5*1024*1024):
- """
- Generate the specified number of bytes of random data.
- (actually each MB is a repetition of the first KB)
- """
- chunk = 1024
- allowed = string.ascii_letters
- for x in range(0, size, part_size):
- strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
- s = ''
- left = size - x
- this_part_size = min(left, part_size)
- for y in range(this_part_size // chunk):
- s = s + strpart
- s = s + strpart[:(this_part_size % chunk)]
- yield s
- if (x == size):
- return
-
-# syncs all the regions except for the one passed in
-def region_sync_meta(targets, region):
-
- for (k, r) in targets.items():
- if r == region:
- continue
- conf = r.conf
- if conf.sync_agent_addr:
- ret = requests.post('http://{addr}:{port}/metadata/incremental'.format(addr = conf.sync_agent_addr, port = conf.sync_agent_port))
- assert ret.status_code == 200
- if conf.sync_meta_wait:
- time.sleep(conf.sync_meta_wait)
-
-
-def get_grantee(policy, permission):
- '''
- Given an object/bucket policy, extract the grantee with the required permission
- '''
-
- for g in policy.acl.grants:
- if g.permission == permission:
- return g.id
keywords='s3 web testing',
install_requires=[
- 'boto >=2.0b4',
'boto3 >=1.0.0',
'PyYAML',
'munch >=2.0.0',