]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
remove boto2 tests and dependency
authorCasey Bodley <cbodley@redhat.com>
Fri, 19 Sep 2025 14:10:20 +0000 (10:10 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 15 Oct 2025 14:52:42 +0000 (10:52 -0400)
boto2 import fails on recent python with:
> E   ModuleNotFoundError: No module named 'boto.vendored.six.moves'

remove all tests under s3tests/ that weren't converted to boto3

Fixes: https://tracker.ceph.com/issues/72876
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 8710644ecdcaa12e9699242b68a2c17d2064237a)

requirements.txt
s3tests/__init__.py [deleted file]
s3tests/common.py [deleted file]
s3tests/functional/__init__.py [deleted file]
s3tests/functional/policy.py [deleted file]
s3tests/functional/test_headers.py [deleted file]
s3tests/functional/test_s3.py [deleted file]
s3tests/functional/test_s3_website.py [deleted file]
s3tests/functional/test_utils.py [deleted file]
s3tests/functional/utils.py [deleted file]
setup.py

index 724e9902e1a2a6f2d669eb988f624332789066dc..59f21519900241620d73edcdae5d7e1d2d3a49bc 100644 (file)
@@ -1,5 +1,4 @@
 PyYAML
-boto >=2.6.0
 boto3 >=1.0.0
 # botocore-1.28 broke v2 signatures, see https://tracker.ceph.com/issues/58059
 botocore <1.28.0
diff --git a/s3tests/__init__.py b/s3tests/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/s3tests/common.py b/s3tests/common.py
deleted file mode 100644 (file)
index 53caa53..0000000
+++ /dev/null
@@ -1,302 +0,0 @@
-import boto.s3.connection
-import munch
-import itertools
-import os
-import random
-import string
-import yaml
-import re
-from lxml import etree
-
-from doctest import Example
-from lxml.doctestcompare import LXMLOutputChecker
-
-s3 = munch.Munch()
-config = munch.Munch()
-prefix = ''
-
-bucket_counter = itertools.count(1)
-key_counter = itertools.count(1)
-
-def choose_bucket_prefix(template, max_len=30):
-    """
-    Choose a prefix for our test buckets, so they're easy to identify.
-
-    Use template and feed it more and more random filler, until it's
-    as long as possible but still below max_len.
-    """
-    rand = ''.join(
-        random.choice(string.ascii_lowercase + string.digits)
-        for c in range(255)
-        )
-
-    while rand:
-        s = template.format(random=rand)
-        if len(s) <= max_len:
-            return s
-        rand = rand[:-1]
-
-    raise RuntimeError(
-        'Bucket prefix template is impossible to fulfill: {template!r}'.format(
-            template=template,
-            ),
-        )
-
-def nuke_bucket(bucket):
-    try:
-        bucket.set_canned_acl('private')
-        # TODO: deleted_cnt and the while loop is a work around for rgw
-        # not sending the
-        deleted_cnt = 1
-        while deleted_cnt:
-            deleted_cnt = 0
-            for key in bucket.list():
-                print('Cleaning bucket {bucket} key {key}'.format(
-                    bucket=bucket,
-                    key=key,
-                    ))
-                key.set_canned_acl('private')
-                key.delete()
-                deleted_cnt += 1
-        bucket.delete()
-    except boto.exception.S3ResponseError as e:
-        # TODO workaround for buggy rgw that fails to send
-        # error_code, remove
-        if (e.status == 403
-            and e.error_code is None
-            and e.body == ''):
-            e.error_code = 'AccessDenied'
-        if e.error_code != 'AccessDenied':
-            print('GOT UNWANTED ERROR', e.error_code)
-            raise
-        # seems like we're not the owner of the bucket; ignore
-        pass
-
-def nuke_prefixed_buckets():
-    for name, conn in list(s3.items()):
-        print('Cleaning buckets from connection {name}'.format(name=name))
-        for bucket in conn.get_all_buckets():
-            if bucket.name.startswith(prefix):
-                print('Cleaning bucket {bucket}'.format(bucket=bucket))
-                nuke_bucket(bucket)
-
-    print('Done with cleanup of test buckets.')
-
-def read_config(fp):
-    config = munch.Munch()
-    g = yaml.safe_load_all(fp)
-    for new in g:
-        config.update(munch.Munchify(new))
-    return config
-
-def connect(conf):
-    mapping = dict(
-        port='port',
-        host='host',
-        is_secure='is_secure',
-        access_key='aws_access_key_id',
-        secret_key='aws_secret_access_key',
-        )
-    kwargs = dict((mapping[k],v) for (k,v) in conf.items() if k in mapping)
-    #process calling_format argument
-    calling_formats = dict(
-        ordinary=boto.s3.connection.OrdinaryCallingFormat(),
-        subdomain=boto.s3.connection.SubdomainCallingFormat(),
-        vhost=boto.s3.connection.VHostCallingFormat(),
-        )
-    kwargs['calling_format'] = calling_formats['ordinary']
-    if 'calling_format' in conf:
-        raw_calling_format = conf['calling_format']
-        try:
-            kwargs['calling_format'] = calling_formats[raw_calling_format]
-        except KeyError:
-            raise RuntimeError(
-                'calling_format unknown: %r' % raw_calling_format
-                )
-    # TODO test vhost calling format
-    conn = boto.s3.connection.S3Connection(**kwargs)
-    return conn
-
-def setup():
-    global s3, config, prefix
-    s3.clear()
-    config.clear()
-
-    try:
-        path = os.environ['S3TEST_CONF']
-    except KeyError:
-        raise RuntimeError(
-            'To run tests, point environment '
-            + 'variable S3TEST_CONF to a config file.',
-            )
-    with file(path) as f:
-        config.update(read_config(f))
-
-    # These 3 should always be present.
-    if 's3' not in config:
-        raise RuntimeError('Your config file is missing the s3 section!')
-    if 'defaults' not in config.s3:
-        raise RuntimeError('Your config file is missing the s3.defaults section!')
-    if 'fixtures' not in config:
-        raise RuntimeError('Your config file is missing the fixtures section!')
-
-    template = config.fixtures.get('bucket prefix', 'test-{random}-')
-    prefix = choose_bucket_prefix(template=template)
-    if prefix == '':
-        raise RuntimeError("Empty Prefix! Aborting!")
-
-    defaults = config.s3.defaults
-    for section in list(config.s3.keys()):
-        if section == 'defaults':
-            continue
-
-        conf = {}
-        conf.update(defaults)
-        conf.update(config.s3[section])
-        conn = connect(conf)
-        s3[section] = conn
-
-    # WARNING! we actively delete all buckets we see with the prefix
-    # we've chosen! Choose your prefix with care, and don't reuse
-    # credentials!
-
-    # We also assume nobody else is going to use buckets with that
-    # prefix. This is racy but given enough randomness, should not
-    # really fail.
-    nuke_prefixed_buckets()
-
-def get_new_bucket(connection=None):
-    """
-    Get a bucket that exists and is empty.
-
-    Always recreates a bucket from scratch. This is useful to also
-    reset ACLs and such.
-    """
-    if connection is None:
-        connection = s3.main
-    name = '{prefix}{num}'.format(
-        prefix=prefix,
-        num=next(bucket_counter),
-        )
-    # the only way for this to fail with a pre-existing bucket is if
-    # someone raced us between setup nuke_prefixed_buckets and here;
-    # ignore that as astronomically unlikely
-    bucket = connection.create_bucket(name)
-    return bucket
-
-def teardown():
-    nuke_prefixed_buckets()
-
-def with_setup_kwargs(setup, teardown=None):
-    """Decorator to add setup and/or teardown methods to a test function::
-
-      @with_setup_args(setup, teardown)
-      def test_something():
-          " ... "
-
-    The setup function should return (kwargs) which will be passed to
-    test function, and teardown function.
-
-    Note that `with_setup_kwargs` is useful *only* for test functions, not for test
-    methods or inside of TestCase subclasses.
-    """
-    def decorate(func):
-        kwargs = {}
-
-        def test_wrapped(*args, **kwargs2):
-            k2 = kwargs.copy()
-            k2.update(kwargs2)
-            k2['testname'] = func.__name__
-            func(*args, **k2)
-
-        test_wrapped.__name__ = func.__name__
-
-        def setup_wrapped():
-            k = setup()
-            kwargs.update(k)
-            if hasattr(func, 'setup'):
-                func.setup()
-        test_wrapped.setup = setup_wrapped
-
-        if teardown:
-            def teardown_wrapped():
-                if hasattr(func, 'teardown'):
-                    func.teardown()
-                teardown(**kwargs)
-
-            test_wrapped.teardown = teardown_wrapped
-        else:
-            if hasattr(func, 'teardown'):
-                test_wrapped.teardown = func.teardown()
-        return test_wrapped
-    return decorate
-
-# Demo case for the above, when you run test_gen():
-# _test_gen will run twice,
-# with the following stderr printing
-# setup_func {'b': 2}
-# testcase ('1',) {'b': 2, 'testname': '_test_gen'}
-# teardown_func {'b': 2}
-# setup_func {'b': 2}
-# testcase () {'b': 2, 'testname': '_test_gen'}
-# teardown_func {'b': 2}
-# 
-#def setup_func():
-#    kwargs = {'b': 2}
-#    print("setup_func", kwargs, file=sys.stderr)
-#    return kwargs
-#
-#def teardown_func(**kwargs):
-#    print("teardown_func", kwargs, file=sys.stderr)
-#
-#@with_setup_kwargs(setup=setup_func, teardown=teardown_func)
-#def _test_gen(*args, **kwargs):
-#    print("testcase", args, kwargs, file=sys.stderr)
-#
-#def test_gen():
-#    yield _test_gen, '1'
-#    yield _test_gen
-
-def trim_xml(xml_str):
-    p = etree.XMLParser(encoding="utf-8", remove_blank_text=True)
-    xml_str = bytes(xml_str, "utf-8")
-    elem = etree.XML(xml_str, parser=p)
-    return etree.tostring(elem, encoding="unicode")
-
-def normalize_xml(xml, pretty_print=True):
-    if xml is None:
-        return xml
-
-    root = etree.fromstring(xml.encode(encoding='ascii'))
-
-    for element in root.iter('*'):
-        if element.text is not None and not element.text.strip():
-            element.text = None
-        if element.text is not None:
-            element.text = element.text.strip().replace("\n", "").replace("\r", "")
-        if element.tail is not None and not element.tail.strip():
-            element.tail = None
-        if element.tail is not None:
-            element.tail = element.tail.strip().replace("\n", "").replace("\r", "")
-
-    # Sort the elements
-    for parent in root.xpath('//*[./*]'): # Search for parent elements
-          parent[:] = sorted(parent,key=lambda x: x.tag)
-
-    xmlstr = etree.tostring(root, encoding="unicode", pretty_print=pretty_print)
-    # there are two different DTD URIs
-    xmlstr = re.sub(r'xmlns="[^"]+"', 'xmlns="s3"', xmlstr)
-    xmlstr = re.sub(r'xmlns=\'[^\']+\'', 'xmlns="s3"', xmlstr)
-    for uri in ['http://doc.s3.amazonaws.com/doc/2006-03-01/', 'http://s3.amazonaws.com/doc/2006-03-01/']:
-        xmlstr = xmlstr.replace(uri, 'URI-DTD')
-    #xmlstr = re.sub(r'>\s+', '>', xmlstr, count=0, flags=re.MULTILINE)
-    return xmlstr
-
-def assert_xml_equal(got, want):
-    assert want is not None, 'Wanted XML cannot be None'
-    if got is None:
-        raise AssertionError('Got input to validate was None')
-    checker = LXMLOutputChecker()
-    if not checker.check_output(want, got, 0):
-        message = checker.output_difference(Example("", want), got, 0)
-        raise AssertionError(message)
diff --git a/s3tests/functional/__init__.py b/s3tests/functional/__init__.py
deleted file mode 100644 (file)
index a3890d4..0000000
+++ /dev/null
@@ -1,498 +0,0 @@
-import sys
-import configparser
-import boto.exception
-import boto.s3.connection
-import munch
-import itertools
-import os
-import random
-import string
-import pytest
-from http.client import HTTPConnection, HTTPSConnection
-from urllib.parse import urlparse
-
-from .utils import region_sync_meta
-
-s3 = munch.Munch()
-config = munch.Munch()
-targets = munch.Munch()
-
-# this will be assigned by setup()
-prefix = None
-
-calling_formats = dict(
-    ordinary=boto.s3.connection.OrdinaryCallingFormat(),
-    subdomain=boto.s3.connection.SubdomainCallingFormat(),
-    vhost=boto.s3.connection.VHostCallingFormat(),
-    )
-
-def get_prefix():
-    assert prefix is not None
-    return prefix
-
-def is_slow_backend():
-    return slow_backend
-
-def choose_bucket_prefix(template, max_len=30):
-    """
-    Choose a prefix for our test buckets, so they're easy to identify.
-
-    Use template and feed it more and more random filler, until it's
-    as long as possible but still below max_len.
-    """
-    rand = ''.join(
-        random.choice(string.ascii_lowercase + string.digits)
-        for c in range(255)
-        )
-
-    while rand:
-        s = template.format(random=rand)
-        if len(s) <= max_len:
-            return s
-        rand = rand[:-1]
-
-    raise RuntimeError(
-        'Bucket prefix template is impossible to fulfill: {template!r}'.format(
-            template=template,
-            ),
-        )
-
-
-def nuke_prefixed_buckets_on_conn(prefix, name, conn):
-    print('Cleaning buckets from connection {name} prefix {prefix!r}.'.format(
-        name=name,
-        prefix=prefix,
-        ))
-
-    for bucket in conn.get_all_buckets():
-        print('prefix=',prefix)
-        if bucket.name.startswith(prefix):
-            print('Cleaning bucket {bucket}'.format(bucket=bucket))
-            success = False
-            for i in range(2):
-                try:
-                    try:
-                        iterator = iter(bucket.list_versions())
-                        # peek into iterator to issue list operation
-                        try:
-                            keys = itertools.chain([next(iterator)], iterator)
-                        except StopIteration:
-                            keys = []  # empty iterator
-                    except boto.exception.S3ResponseError as e:
-                        # some S3 implementations do not support object
-                        # versioning - fall back to listing without versions
-                        if e.error_code != 'NotImplemented':
-                            raise e
-                        keys = bucket.list();
-                    for key in keys:
-                        print('Cleaning bucket {bucket} key {key}'.format(
-                            bucket=bucket,
-                            key=key,
-                            ))
-                        # key.set_canned_acl('private')
-                        bucket.delete_key(key.name, version_id = key.version_id)
-                    try:
-                        bucket.delete()
-                    except boto.exception.S3ResponseError as e:
-                        # if DELETE times out, the retry may see NoSuchBucket
-                        if e.error_code != 'NoSuchBucket':
-                            raise e
-                        pass
-                    success = True
-                except boto.exception.S3ResponseError as e:
-                    if e.error_code != 'AccessDenied':
-                        print('GOT UNWANTED ERROR', e.error_code)
-                        raise
-                    # seems like we don't have permissions set appropriately, we'll
-                    # modify permissions and retry
-                    pass
-
-                if success:
-                    break
-
-                bucket.set_canned_acl('private')
-
-
-def nuke_prefixed_buckets(prefix):
-    # If no regions are specified, use the simple method
-    if targets.main.master == None:
-        for name, conn in list(s3.items()):
-            print('Deleting buckets on {name}'.format(name=name))
-            nuke_prefixed_buckets_on_conn(prefix, name, conn)
-    else: 
-                   # First, delete all buckets on the master connection 
-                   for name, conn in list(s3.items()):
-                       if conn == targets.main.master.connection:
-                           print('Deleting buckets on {name} (master)'.format(name=name))
-                           nuke_prefixed_buckets_on_conn(prefix, name, conn)
-               
-                   # Then sync to propagate deletes to secondaries
-                   region_sync_meta(targets.main, targets.main.master.connection)
-                   print('region-sync in nuke_prefixed_buckets')
-               
-                   # Now delete remaining buckets on any other connection 
-                   for name, conn in list(s3.items()):
-                       if conn != targets.main.master.connection:
-                           print('Deleting buckets on {name} (non-master)'.format(name=name))
-                           nuke_prefixed_buckets_on_conn(prefix, name, conn)
-
-    print('Done with cleanup of test buckets.')
-
-class TargetConfig:
-    def __init__(self, cfg, section):
-        self.port = None
-        self.api_name = ''
-        self.is_master = False
-        self.is_secure = False
-        self.sync_agent_addr = None
-        self.sync_agent_port = 0
-        self.sync_meta_wait = 0
-        try:
-            self.api_name = cfg.get(section, 'api_name')
-        except (configparser.NoSectionError, configparser.NoOptionError):
-            pass
-        try:
-            self.port = cfg.getint(section, 'port')
-        except configparser.NoOptionError:
-            pass
-        try:
-            self.host=cfg.get(section, 'host')
-        except configparser.NoOptionError:
-            raise RuntimeError(
-                'host not specified for section {s}'.format(s=section)
-                )
-        try:
-            self.is_master=cfg.getboolean(section, 'is_master')
-        except configparser.NoOptionError:
-            pass
-
-        try:
-            self.is_secure=cfg.getboolean(section, 'is_secure')
-        except configparser.NoOptionError:
-            pass
-
-        try:
-            raw_calling_format = cfg.get(section, 'calling_format')
-        except configparser.NoOptionError:
-            raw_calling_format = 'ordinary'
-
-        try:
-            self.sync_agent_addr = cfg.get(section, 'sync_agent_addr')
-        except (configparser.NoSectionError, configparser.NoOptionError):
-            pass
-
-        try:
-            self.sync_agent_port = cfg.getint(section, 'sync_agent_port')
-        except (configparser.NoSectionError, configparser.NoOptionError):
-            pass
-
-        try:
-            self.sync_meta_wait = cfg.getint(section, 'sync_meta_wait')
-        except (configparser.NoSectionError, configparser.NoOptionError):
-            pass
-
-
-        try:
-            self.calling_format = calling_formats[raw_calling_format]
-        except KeyError:
-            raise RuntimeError(
-                'calling_format unknown: %r' % raw_calling_format
-                )
-
-class TargetConnection:
-    def __init__(self, conf, conn):
-        self.conf = conf
-        self.connection = conn
-
-
-
-class RegionsInfo:
-    def __init__(self):
-        self.m = munch.Munch()
-        self.master = None
-        self.secondaries = []
-
-    def add(self, name, region_config):
-        self.m[name] = region_config
-        if (region_config.is_master):
-            if not self.master is None:
-                raise RuntimeError(
-                    'multiple regions defined as master'
-                    )
-            self.master = region_config
-        else:
-            self.secondaries.append(region_config)
-    def get(self, name):
-        return self.m[name]
-    def get(self):
-        return self.m
-    def items(self):
-        return self.m.items()
-
-regions = RegionsInfo()
-
-
-class RegionsConn:
-    def __init__(self):
-        self.m = munch.Munch()
-        self.default = None
-        self.master = None
-        self.secondaries = []
-
-    def items(self):
-        return self.m.items()
-
-    def set_default(self, conn):
-        self.default = conn
-
-    def add(self, name, conn):
-        self.m[name] = conn
-        if not self.default:
-            self.default = conn
-        if (conn.conf.is_master):
-            self.master = conn
-        else:
-            self.secondaries.append(conn)
-
-
-# nosetests --processes=N with N>1 is safe
-_multiprocess_can_split_ = True
-
-def setup():
-
-    cfg = configparser.RawConfigParser()
-    try:
-        path = os.environ['S3TEST_CONF']
-    except KeyError:
-        raise RuntimeError(
-            'To run tests, point environment '
-            + 'variable S3TEST_CONF to a config file.',
-            )
-    cfg.read(path)
-
-    global prefix
-    global targets
-    global slow_backend
-
-    try:
-        template = cfg.get('fixtures', 'bucket prefix')
-    except (configparser.NoSectionError, configparser.NoOptionError):
-        template = 'test-{random}-'
-    prefix = choose_bucket_prefix(template=template)
-
-    try:
-        slow_backend = cfg.getboolean('fixtures', 'slow backend')
-    except (configparser.NoSectionError, configparser.NoOptionError):
-        slow_backend = False
-
-    # pull the default_region out, if it exists
-    try:
-        default_region = cfg.get('fixtures', 'default_region')
-    except (configparser.NoSectionError, configparser.NoOptionError):
-        default_region = None
-
-    s3.clear()
-    config.clear()
-
-    for section in cfg.sections():
-        try:
-            (type_, name) = section.split(None, 1)
-        except ValueError:
-            continue
-        if type_ != 'region':
-            continue
-        regions.add(name, TargetConfig(cfg, section))
-
-    for section in cfg.sections():
-        try:
-            (type_, name) = section.split(None, 1)
-        except ValueError:
-            continue
-        if type_ != 's3':
-            continue
-
-        if len(regions.get()) == 0:
-            regions.add("default", TargetConfig(cfg, section))
-
-        config[name] = munch.Munch()
-        for var in [
-            'user_id',
-            'display_name',
-            'email',
-            's3website_domain',
-            'host',
-            'port',
-            'is_secure',
-            'kms_keyid',
-            'storage_classes',
-            ]:
-            try:
-                config[name][var] = cfg.get(section, var)
-            except configparser.NoOptionError:
-                pass
-
-        targets[name] = RegionsConn()
-
-        for (k, conf) in regions.items():
-            conn = boto.s3.connection.S3Connection(
-                aws_access_key_id=cfg.get(section, 'access_key'),
-                aws_secret_access_key=cfg.get(section, 'secret_key'),
-                is_secure=conf.is_secure,
-                port=conf.port,
-                host=conf.host,
-                # TODO test vhost calling format
-                calling_format=conf.calling_format,
-                )
-
-            temp_targetConn = TargetConnection(conf, conn)
-            targets[name].add(k, temp_targetConn)
-
-            # Explicitly test for and set the default region, if specified.
-            # If it was not specified, use the 'is_master' flag to set it.
-            if default_region:
-                if default_region == name:
-                    targets[name].set_default(temp_targetConn)
-            elif conf.is_master:
-                targets[name].set_default(temp_targetConn)
-
-        s3[name] = targets[name].default.connection
-
-    # WARNING! we actively delete all buckets we see with the prefix
-    # we've chosen! Choose your prefix with care, and don't reuse
-    # credentials!
-
-    # We also assume nobody else is going to use buckets with that
-    # prefix. This is racy but given enough randomness, should not
-    # really fail.
-    nuke_prefixed_buckets(prefix=prefix)
-
-
-def teardown():
-    # remove our buckets here also, to avoid littering
-    nuke_prefixed_buckets(prefix=prefix)
-
-@pytest.fixture(scope="package")
-def configfile():
-    setup()
-    yield config
-
-@pytest.fixture(autouse=True)
-def setup_teardown(configfile):
-    yield
-    teardown()
-
-bucket_counter = itertools.count(1)
-
-
-def get_new_bucket_name():
-    """
-    Get a bucket name that probably does not exist.
-
-    We make every attempt to use a unique random prefix, so if a
-    bucket by this name happens to exist, it's ok if tests give
-    false negatives.
-    """
-    name = '{prefix}{num}'.format(
-        prefix=prefix,
-        num=next(bucket_counter),
-        )
-    return name
-
-
-def get_new_bucket(target=None, name=None, headers=None):
-    """
-    Get a bucket that exists and is empty.
-
-    Always recreates a bucket from scratch. This is useful to also
-    reset ACLs and such.
-    """
-    if target is None:
-        target = targets.main.default
-    connection = target.connection
-    if name is None:
-        name = get_new_bucket_name()
-    # the only way for this to fail with a pre-existing bucket is if
-    # someone raced us between setup nuke_prefixed_buckets and here;
-    # ignore that as astronomically unlikely
-    bucket = connection.create_bucket(name, location=target.conf.api_name, headers=headers)
-    return bucket
-
-def _make_request(method, bucket, key, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
-    """
-    issue a request for a specified method, on a specified <bucket,key>,
-    with a specified (optional) body (encrypted per the connection), and
-    return the response (status, reason).
-
-    If key is None, then this will be treated as a bucket-level request.
-
-    If the request or response headers are None, then default values will be
-    provided by later methods.
-    """
-    if not path_style:
-        conn = bucket.connection
-        request_headers['Host'] = conn.calling_format.build_host(conn.server_name(), bucket.name)
-
-    if authenticated:
-        urlobj = None
-        if key is not None:
-            urlobj = key
-        elif bucket is not None:
-            urlobj = bucket
-        else:
-            raise RuntimeError('Unable to find bucket name')
-        url = urlobj.generate_url(expires_in, method=method, response_headers=response_headers, headers=request_headers)
-        o = urlparse(url)
-        path = o.path + '?' + o.query
-    else:
-        bucketobj = None
-        if key is not None:
-            path = '/{obj}'.format(obj=key.name)
-            bucketobj = key.bucket
-        elif bucket is not None:
-            path = '/'
-            bucketobj = bucket
-        else:
-            raise RuntimeError('Unable to find bucket name')
-        if path_style:
-            path = '/{bucket}'.format(bucket=bucketobj.name) + path
-
-    return _make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path, body=body, request_headers=request_headers, secure=s3.main.is_secure, timeout=timeout)
-
-def _make_bucket_request(method, bucket, body=None, authenticated=False, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
-    """
-    issue a request for a specified method, on a specified <bucket>,
-    with a specified (optional) body (encrypted per the connection), and
-    return the response (status, reason)
-    """
-    return _make_request(method=method, bucket=bucket, key=None, body=body, authenticated=authenticated, response_headers=response_headers, request_headers=request_headers, expires_in=expires_in, path_style=path_style, timeout=timeout)
-
-def _make_raw_request(host, port, method, path, body=None, request_headers=None, secure=False, timeout=None):
-    """
-    issue a request to a specific host & port, for a specified method, on a
-    specified path with a specified (optional) body (encrypted per the
-    connection), and return the response (status, reason).
-
-    This allows construction of special cases not covered by the bucket/key to
-    URL mapping of _make_request/_make_bucket_request.
-    """
-    if secure:
-        class_ = HTTPSConnection
-    else:
-        class_ = HTTPConnection
-
-    if request_headers is None:
-        request_headers = {}
-
-    c = class_(host, port=port, timeout=timeout)
-
-    # TODO: We might have to modify this in future if we need to interact with
-    # how httplib.request handles Accept-Encoding and Host.
-    c.request(method, path, body=body, headers=request_headers)
-
-    res = c.getresponse()
-    #c.close()
-
-    print(res.status, res.reason)
-    return res
-
-
diff --git a/s3tests/functional/policy.py b/s3tests/functional/policy.py
deleted file mode 100644 (file)
index aae5454..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-import json
-
-class Statement(object):
-    def __init__(self, action, resource, principal = {"AWS" : "*"}, effect= "Allow", condition = None):
-        self.principal = principal
-        self.action = action
-        self.resource = resource
-        self.condition = condition
-        self.effect = effect
-
-    def to_dict(self):
-        d = { "Action" : self.action,
-              "Principal" : self.principal,
-              "Effect" : self.effect,
-              "Resource" : self.resource
-        }
-
-        if self.condition is not None:
-            d["Condition"] = self.condition
-
-        return d
-
-class Policy(object):
-    def __init__(self):
-        self.statements = []
-
-    def add_statement(self, s):
-        self.statements.append(s)
-        return self
-
-    def to_json(self):
-        policy_dict = {
-            "Version" : "2012-10-17",
-            "Statement":
-            [s.to_dict() for s in self.statements]
-        }
-
-        return json.dumps(policy_dict)
-
-def make_json_policy(action, resource, principal={"AWS": "*"}, conditions=None):
-    """
-    Helper function to make single statement policies
-    """
-    s = Statement(action, resource, principal, condition=conditions)
-    p = Policy()
-    return p.add_statement(s).to_json()
diff --git a/s3tests/functional/test_headers.py b/s3tests/functional/test_headers.py
deleted file mode 100644 (file)
index 72c9bc9..0000000
+++ /dev/null
@@ -1,767 +0,0 @@
-from io import StringIO
-import boto.connection
-import boto.exception
-import boto.s3.connection
-import boto.s3.acl
-import boto.utils
-import pytest
-import operator
-import random
-import string
-import socket
-import ssl
-import os
-import re
-from email.utils import formatdate
-
-from urllib.parse import urlparse
-
-from boto.s3.connection import S3Connection
-
-from .utils import assert_raises
-
-from email.header import decode_header
-
-from . import (
-    configfile,
-    setup_teardown,
-    _make_raw_request,
-    nuke_prefixed_buckets,
-    get_new_bucket,
-    s3,
-    config,
-    get_prefix,
-    TargetConnection,
-    targets,
-    )
-
-
-_orig_authorize = None
-_custom_headers = {}
-_remove_headers = []
-
-
-# HeaderS3Connection and _our_authorize are necessary to be able to arbitrarily
-# overwrite headers. Depending on the version of boto, one or the other is
-# necessary. We later determine in setup what needs to be used.
-
-def _update_headers(headers):
-    """ update a set of headers with additions/removals
-    """
-    global _custom_headers, _remove_headers
-
-    headers.update(_custom_headers)
-
-    for header in _remove_headers:
-        try:
-            del headers[header]
-        except KeyError:
-            pass
-
-
-# Note: We need to update the headers twice. The first time so the
-# authentication signing is done correctly. The second time to overwrite any
-# headers modified or created in the authentication step.
-
-class HeaderS3Connection(S3Connection):
-    """ establish an authenticated connection w/customized headers
-    """
-    def fill_in_auth(self, http_request, **kwargs):
-        _update_headers(http_request.headers)
-        S3Connection.fill_in_auth(self, http_request, **kwargs)
-        _update_headers(http_request.headers)
-
-        return http_request
-
-
-def _our_authorize(self, connection, **kwargs):
-    """ perform an authentication w/customized headers
-    """
-    _update_headers(self.headers)
-    _orig_authorize(self, connection, **kwargs)
-    _update_headers(self.headers)
-
-
-@pytest.fixture
-def hook_headers(setup_teardown):
-    boto_type = None
-    _orig_conn = {}
-
-    # we determine what we need to replace by the existence of particular
-    # attributes. boto 2.0rc1 as fill_in_auth for S3Connection, while boto 2.0
-    # has authorize for HTTPRequest.
-    if hasattr(S3Connection, 'fill_in_auth'):
-        boto_type = 'S3Connection'
-        for conn in s3:
-            _orig_conn[conn] = s3[conn]
-            header_conn = HeaderS3Connection(
-                aws_access_key_id=s3[conn].aws_access_key_id,
-                aws_secret_access_key=s3[conn].aws_secret_access_key,
-                is_secure=s3[conn].is_secure,
-                port=s3[conn].port,
-                host=s3[conn].host,
-                calling_format=s3[conn].calling_format
-                )
-
-            s3[conn] = header_conn
-    elif hasattr(boto.connection.HTTPRequest, 'authorize'):
-        global _orig_authorize
-
-        boto_type = 'HTTPRequest'
-
-        _orig_authorize = boto.connection.HTTPRequest.authorize
-        boto.connection.HTTPRequest.authorize = _our_authorize
-    else:
-        raise RuntimeError
-
-    yield
-
-    # replace original functionality depending on the boto version
-    if boto_type is 'S3Connection':
-        for conn in s3:
-            s3[conn] = _orig_conn[conn]
-        _orig_conn = {}
-    elif boto_type is 'HTTPRequest':
-        boto.connection.HTTPRequest.authorize = _orig_authorize
-        _orig_authorize = None
-    else:
-        raise RuntimeError
-
-
-def _clear_custom_headers():
-    """ Eliminate any header customizations
-    """
-    global _custom_headers, _remove_headers
-    _custom_headers = {}
-    _remove_headers = []
-
-@pytest.fixture(autouse=True)
-def clear_custom_headers(setup_teardown, hook_headers):
-    yield
-    _clear_custom_headers() # clear headers before teardown()
-
-def _add_custom_headers(headers=None, remove=None):
-    """ Define header customizations (additions, replacements, removals)
-    """
-    global _custom_headers, _remove_headers
-    if not _custom_headers:
-        _custom_headers = {}
-
-    if headers is not None:
-        _custom_headers.update(headers)
-    if remove is not None:
-        _remove_headers.extend(remove)
-
-
-def _setup_bad_object(headers=None, remove=None):
-    """ Create a new bucket, add an object w/header customizations
-    """
-    bucket = get_new_bucket()
-
-    _add_custom_headers(headers=headers, remove=remove)
-    return bucket.new_key('foo')
-
-#
-# common tests
-#
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_contentlength_none():
-    key = _setup_bad_object(remove=('Content-Length',))
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 411
-    assert e.reason == 'Length Required'
-    assert e.error_code == 'MissingContentLength'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_rgw
-def test_object_create_bad_contentlength_mismatch_above():
-    content = 'bar'
-    length = len(content) + 1
-
-    key = _setup_bad_object({'Content-Length': length})
-
-    # Disable retries since key.should_retry will discard the response with
-    # PleaseRetryException.
-    def no_retry(response, chunked_transfer): return False
-    key.should_retry = no_retry
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'RequestTimeout'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_empty():
-    key = _setup_bad_object({'Authorization': ''})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_date_and_amz_date():
-    date = formatdate(usegmt=True)
-    key = _setup_bad_object({'Date': date, 'X-Amz-Date': date})
-    key.set_contents_from_string('bar')
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_amz_date_and_no_date():
-    date = formatdate(usegmt=True)
-    key = _setup_bad_object({'X-Amz-Date': date}, ('Date',))
-    key.set_contents_from_string('bar')
-
-
-# the teardown is really messed up here. check it out
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_none():
-    key = _setup_bad_object(remove=('Authorization',))
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_contentlength_none():
-    _add_custom_headers(remove=('Content-Length',))
-    get_new_bucket()
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_object_acl_create_contentlength_none():
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    key.set_contents_from_string('blah')
-
-    _add_custom_headers(remove=('Content-Length',))
-    key.set_acl('public-read')
-
-def _create_new_connection():
-    # We're going to need to manually build a connection using bad authorization info.
-    # But to save the day, lets just hijack the settings from s3.main. :)
-    main = s3.main
-    conn = HeaderS3Connection(
-        aws_access_key_id=main.aws_access_key_id,
-        aws_secret_access_key=main.aws_secret_access_key,
-        is_secure=main.is_secure,
-        port=main.port,
-        host=main.host,
-        calling_format=main.calling_format,
-        )
-    return TargetConnection(targets.main.default.conf, conn)
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_rgw
-def test_bucket_create_bad_contentlength_empty():
-    conn = _create_new_connection()
-    _add_custom_headers({'Content-Length': ''})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, conn)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_contentlength_none():
-    _add_custom_headers(remove=('Content-Length',))
-    bucket = get_new_bucket()
-
-
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_authorization_empty():
-    _add_custom_headers({'Authorization': ''})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-
-# the teardown is really messed up here. check it out
-@pytest.mark.auth_common
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_authorization_none():
-    _add_custom_headers(remove=('Authorization',))
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-#
-# AWS2 specific tests
-#
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_contentlength_mismatch_below_aws2():
-    check_aws2_support()
-    content = 'bar'
-    length = len(content) - 1
-    key = _setup_bad_object({'Content-Length': length})
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'BadDigest'
-
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_incorrect_aws2():
-    check_aws2_support()
-    key = _setup_bad_object({'Authorization': 'AWS AKIAIGR7ZNNBHC5BKSUB:FWeDfwojDSdS2Ztmpfeubhd9isU='})
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch', 'InvalidAccessKeyId')
-
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_authorization_invalid_aws2():
-    check_aws2_support()
-    key = _setup_bad_object({'Authorization': 'AWS HAHAHA'})
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'InvalidArgument'
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_object_create_bad_date_none_aws2():
-    check_aws2_support()
-    key = _setup_bad_object(remove=('Date',))
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-
-@pytest.mark.auth_aws2
-def test_bucket_create_bad_authorization_invalid_aws2():
-    check_aws2_support()
-    _add_custom_headers({'Authorization': 'AWS HAHAHA'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'InvalidArgument'
-
-@pytest.mark.auth_aws2
-@pytest.mark.fails_on_dbstore
-def test_bucket_create_bad_date_none_aws2():
-    check_aws2_support()
-    _add_custom_headers(remove=('Date',))
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-#
-# AWS4 specific tests
-#
-
-def check_aws4_support():
-    if 'S3_USE_SIGV4' not in os.environ:
-        pytest.skip('sigv4 tests not enabled by S3_USE_SIGV4')
-
-def check_aws2_support():
-    if 'S3_USE_SIGV4' in os.environ:
-        pytest.skip('sigv2 tests disabled by S3_USE_SIGV4')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_md5_invalid_garbage_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Content-MD5':'AWS4 HAHAHA'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'InvalidDigest'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_contentlength_mismatch_below_aws4():
-    check_aws4_support()
-    content = 'bar'
-    length = len(content) - 1
-    key = _setup_bad_object({'Content-Length': length})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, content)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'XAmzContentSHA256Mismatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_authorization_incorrect_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=AKIAIGR7ZNNBHC5BKSUB/20150930/us-east-1/s3/aws4_request,SignedHeaders=host;user-agent,Signature=FWeDfwojDSdS2Ztmpfeubhd9isU='})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch', 'InvalidAccessKeyId')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_authorization_invalid_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Authorization': 'AWS4-HMAC-SHA256 Credential=HAHAHA'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code in ('AuthorizationHeaderMalformed', 'InvalidArgument')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_ua_empty_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'User-Agent': ''})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_ua_none_aws4():
-    check_aws4_support()
-    key = _setup_bad_object(remove=('User-Agent',))
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_invalid_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': 'Bad Date'})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_invalid_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': 'Bad Date'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_empty_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': ''})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_empty_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': ''})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_none_aws4():
-    check_aws4_support()
-    key = _setup_bad_object(remove=('Date',))
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_none_aws4():
-    check_aws4_support()
-    key = _setup_bad_object(remove=('X-Amz-Date',))
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_before_today_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_before_today_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': '20100707T215304Z'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_after_today_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_after_today_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': '20300707T215304Z'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_before_epoch_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_before_epoch_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': '19500707T215304Z'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_date_after_end_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'})
-    key.set_contents_from_string('bar')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_bad_amz_date_after_end_aws4():
-    check_aws4_support()
-    key = _setup_bad_object({'X-Amz-Date': '99990707T215304Z'})
-
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_string, 'bar')
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_object_create_missing_signed_custom_header_aws4():
-    check_aws4_support()
-    method='PUT'
-    expires_in='100000'
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    body='zoo'
-
-    # compute the signature with 'x-amz-foo=bar' in the headers...
-    request_headers = {'x-amz-foo':'bar'}
-    url = key.generate_url(expires_in, method=method, headers=request_headers)
-
-    o = urlparse(url)
-    path = o.path + '?' + o.query
-
-    # avoid sending 'x-amz-foo=bar' in the headers
-    request_headers.pop('x-amz-foo')
-
-    res =_make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path,
-                           body=body, request_headers=request_headers, secure=s3.main.is_secure)
-
-    assert res.status == 403
-    assert res.reason == 'Forbidden'
-
-
-@pytest.mark.auth_aws4
-def test_object_create_missing_signed_header_aws4():
-    check_aws4_support()
-    method='PUT'
-    expires_in='100000'
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    body='zoo'
-
-    # compute the signature...
-    request_headers = {}
-    url = key.generate_url(expires_in, method=method, headers=request_headers)
-
-    o = urlparse(url)
-    path = o.path + '?' + o.query
-
-    # 'X-Amz-Expires' is missing
-    target = r'&X-Amz-Expires=' + expires_in
-    path = re.sub(target, '', path)
-
-    res =_make_raw_request(host=s3.main.host, port=s3.main.port, method=method, path=path,
-                           body=body, request_headers=request_headers, secure=s3.main.is_secure)
-
-    assert res.status == 403
-    assert res.reason == 'Forbidden'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_authorization_invalid_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Authorization': 'AWS4 HAHAHA'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'InvalidArgument'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_ua_empty_aws4():
-    check_aws4_support()
-    _add_custom_headers({'User-Agent': ''})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'SignatureDoesNotMatch'
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_ua_none_aws4():
-    check_aws4_support()
-    _add_custom_headers(remove=('User-Agent',))
-
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'SignatureDoesNotMatch'
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_invalid_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Date': 'Bad Date'})
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_invalid_aws4():
-    check_aws4_support()
-    _add_custom_headers({'X-Amz-Date': 'Bad Date'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_empty_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Date': ''})
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_empty_aws4():
-    check_aws4_support()
-    _add_custom_headers({'X-Amz-Date': ''})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_none_aws4():
-    check_aws4_support()
-    _add_custom_headers(remove=('Date',))
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_none_aws4():
-    check_aws4_support()
-    _add_custom_headers(remove=('X-Amz-Date',))
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_before_today_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Date': 'Tue, 07 Jul 2010 21:53:04 GMT'})
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_before_today_aws4():
-    check_aws4_support()
-    _add_custom_headers({'X-Amz-Date': '20100707T215304Z'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_after_today_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Date': 'Tue, 07 Jul 2030 21:53:04 GMT'})
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_after_today_aws4():
-    check_aws4_support()
-    _add_custom_headers({'X-Amz-Date': '20300707T215304Z'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('RequestTimeTooSkewed', 'SignatureDoesNotMatch')
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_date_before_epoch_aws4():
-    check_aws4_support()
-    _add_custom_headers({'Date': 'Tue, 07 Jul 1950 21:53:04 GMT'})
-    get_new_bucket()
-
-
-@pytest.mark.auth_aws4
-def test_bucket_create_bad_amz_date_before_epoch_aws4():
-    check_aws4_support()
-    _add_custom_headers({'X-Amz-Date': '19500707T215304Z'})
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket)
-
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code in ('AccessDenied', 'SignatureDoesNotMatch')
diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py
deleted file mode 100644 (file)
index 9072c80..0000000
+++ /dev/null
@@ -1,774 +0,0 @@
-from io import StringIO
-import boto.exception
-import boto.s3.connection
-import boto.s3.acl
-import boto.s3.lifecycle
-import datetime
-import time
-import email.utils
-import isodate
-import pytest
-import operator
-import socket
-import ssl
-import os
-import requests
-import base64
-import hmac
-import pytz
-import json
-import httplib2
-import threading
-import itertools
-import string
-import random
-import re
-
-from collections import defaultdict
-from urllib.parse import urlparse
-
-from . import utils
-from .utils import assert_raises
-
-from .policy import Policy, Statement, make_json_policy
-
-from . import (
-    configfile,
-    setup_teardown,
-    nuke_prefixed_buckets,
-    get_new_bucket,
-    get_new_bucket_name,
-    s3,
-    targets,
-    config,
-    get_prefix,
-    is_slow_backend,
-    _make_request,
-    _make_bucket_request,
-    _make_raw_request,
-    )
-
-
-def check_access_denied(fn, *args, **kwargs):
-    e = assert_raises(boto.exception.S3ResponseError, fn, *args, **kwargs)
-    assert e.status == 403
-    assert e.reason == 'Forbidden'
-    assert e.error_code == 'AccessDenied'
-
-def check_bad_bucket_name(name):
-    """
-    Attempt to create a bucket with a specified name, and confirm
-    that the request fails because of an invalid bucket name.
-    """
-    e = assert_raises(boto.exception.S3ResponseError, get_new_bucket, targets.main.default, name)
-    assert e.status == 400
-    assert e.reason.lower() == 'bad request' # some proxies vary the case
-    assert e.error_code == 'InvalidBucketName'
-
-def _create_keys(bucket=None, keys=[]):
-    """
-    Populate a (specified or new) bucket with objects with
-    specified names (and contents identical to their names).
-    """
-    if bucket is None:
-        bucket = get_new_bucket()
-
-    for s in keys:
-        key = bucket.new_key(s)
-        key.set_contents_from_string(s)
-
-    return bucket
-
-
-def _get_alt_connection():
-    return boto.s3.connection.S3Connection(
-        aws_access_key_id=s3['alt'].aws_access_key_id,
-        aws_secret_access_key=s3['alt'].aws_secret_access_key,
-        is_secure=s3['alt'].is_secure,
-        port=s3['alt'].port,
-        host=s3['alt'].host,
-        calling_format=s3['alt'].calling_format,
-    )
-
-
-# Breaks DNS with SubdomainCallingFormat
-@pytest.mark.fails_with_subdomain
-def test_bucket_create_naming_bad_punctuation():
-    # characters other than [a-zA-Z0-9._-]
-    check_bad_bucket_name('alpha!soup')
-
-def check_versioning(bucket, status):
-    try:
-        assert bucket.get_versioning_status()['Versioning'] == status
-    except KeyError:
-        assert status == None
-
-# amazon is eventual consistent, retry a bit if failed
-def check_configure_versioning_retry(bucket, status, expected_string):
-    bucket.configure_versioning(status)
-
-    read_status = None
-
-    for i in range(5):
-        try:
-            read_status = bucket.get_versioning_status()['Versioning']
-        except KeyError:
-            read_status = None
-
-        if (expected_string == read_status):
-            break
-
-        time.sleep(1)
-
-    assert expected_string == read_status
-
-@pytest.mark.versioning
-@pytest.mark.fails_on_dbstore
-def test_versioning_obj_read_not_exist_null():
-    bucket = get_new_bucket()
-    check_versioning(bucket, None)
-
-    check_configure_versioning_retry(bucket, True, "Enabled")
-
-    content = 'fooz'
-    objname = 'testobj'
-
-    key = bucket.new_key(objname)
-    key.set_contents_from_string(content)
-
-    key = bucket.get_key(objname, version_id='null')
-    assert key == None
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_object():
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    expires_in = 100000
-    url = key.generate_url(expires_in, method='PUT')
-    o = urlparse(url)
-    path = o.path + '?' + o.query
-    path1 = path + '&append&position=0'
-    res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
-    path2 = path + '&append&position=3'
-    res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
-    assert res.status == 200
-    assert res.reason == 'OK'
-
-    key = bucket.get_key('foo')
-    assert key.size == 6 
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_normal_object():
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    key.set_contents_from_string('abc')
-    expires_in = 100000
-    url = key.generate_url(expires_in, method='PUT')
-    o = urlparse(url)
-    path = o.path + '?' + o.query
-    path = path + '&append&position=3'
-    res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path, body='abc', secure=s3.main.is_secure)
-    assert res.status == 409
-
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_with_subdomain
-@pytest.mark.appendobject
-@pytest.mark.fails_on_dbstore
-def test_append_object_position_wrong():
-    bucket = get_new_bucket()
-    key = bucket.new_key('foo')
-    expires_in = 100000
-    url = key.generate_url(expires_in, method='PUT')
-    o = urlparse(url)
-    path = o.path + '?' + o.query
-    path1 = path + '&append&position=0'
-    res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path1, body='abc', secure=s3.main.is_secure)
-    path2 = path + '&append&position=9'
-    res = _make_raw_request(host=s3.main.host, port=s3.main.port, method='PUT', path=path2, body='abc', secure=s3.main.is_secure)
-    assert res.status == 409
-    assert int(res.getheader('x-rgw-next-append-position')) == 3
-
-
-# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
-# http://tracker.newdream.net/issues/984
-@pytest.mark.fails_on_rgw
-def test_logging_toggle():
-    bucket = get_new_bucket()
-    log_bucket = get_new_bucket(targets.main.default, bucket.name + '-log')
-    log_bucket.set_as_logging_target()
-    bucket.enable_logging(target_bucket=log_bucket, target_prefix=bucket.name)
-    bucket.disable_logging()
-    # NOTE: this does not actually test whether or not logging works
-
-def list_bucket_storage_class(bucket):
-    result = defaultdict(list)
-    for k in bucket.get_all_versions():
-        result[k.storage_class].append(k)
-
-    return result
-
-def transfer_part(bucket, mp_id, mp_keyname, i, part, headers=None):
-    """Transfer a part of a multipart upload. Designed to be run in parallel.
-    """
-    mp = boto.s3.multipart.MultiPartUpload(bucket)
-    mp.key_name = mp_keyname
-    mp.id = mp_id
-    part_out = StringIO(part)
-    mp.upload_part_from_file(part_out, i+1, headers=headers)
-
-def generate_random(size, part_size=5*1024*1024):
-    """
-    Generate the specified number random data.
-    (actually each MB is a repetition of the first KB)
-    """
-    chunk = 1024
-    allowed = string.ascii_letters
-    for x in range(0, size, part_size):
-        strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
-        s = ''
-        left = size - x
-        this_part_size = min(left, part_size)
-        for y in range(this_part_size // chunk):
-            s = s + strpart
-        if this_part_size > len(s):
-            s = s + strpart[0:this_part_size - len(s)]
-        yield s
-        if (x == size):
-            return
-
-def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, storage_class=None, resend_parts=[]):
-    """
-    generate a multi-part upload for a random file of specifed size,
-    if requested, generate a list of the parts
-    return the upload descriptor
-    """
-
-    if storage_class is not None:
-        if not headers:
-            headers = {}
-        headers['X-Amz-Storage-Class'] = storage_class
-
-    upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata)
-    s = ''
-    for i, part in enumerate(generate_random(size, part_size)):
-        s += part
-        transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
-        if i in resend_parts:
-            transfer_part(bucket, upload.id, upload.key_name, i, part, headers)
-
-    if do_list is not None:
-        l = bucket.list_multipart_uploads()
-        l = list(l)
-
-    return (upload, s)
-
-def _populate_key(bucket, keyname, size=7*1024*1024, storage_class=None):
-    if bucket is None:
-        bucket = get_new_bucket()
-    key = bucket.new_key(keyname)
-    if storage_class:
-        key.storage_class = storage_class
-    data_str = str(next(generate_random(size, size)))
-    data = StringIO(data_str)
-    key.set_contents_from_file(fp=data)
-    return (key, data_str)
-
-def gen_rand_string(size, chars=string.ascii_uppercase + string.digits):
-    return ''.join(random.choice(chars) for _ in range(size))
-
-def verify_object(bucket, k, data=None, storage_class=None):
-    if storage_class:
-        assert k.storage_class == storage_class
-
-    if data:
-        read_data = k.get_contents_as_string()
-
-        equal = data == read_data.decode() # avoid spamming log if data not equal
-        assert equal == True
-
-def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
-            query_args=None
-
-            if dest_key.version_id:
-                query_arg='versionId={v}'.format(v=dest_key.version_id)
-
-            headers = {}
-            headers['X-Amz-Copy-Source'] = '/{bucket}/{object}'.format(bucket=src_bucket.name, object=src_key.name)
-            if src_key.version_id:
-                headers['X-Amz-Copy-Source-Version-Id'] = src_key.version_id
-            headers['X-Amz-Storage-Class'] = storage_class
-
-            res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
-                    query_args=query_args, headers=headers)
-            assert res.status == 200
-
-def _populate_multipart_key(bucket, kname, size, storage_class=None):
-    (upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
-    upload.complete_upload()
-
-    k = bucket.get_key(kname)
-
-    return (k, data)
-
-# Create a lifecycle config.  Either days (int) and prefix (string) is given, or rules.
-# Rules is an array of dictionaries, each dict has a 'days' and a 'prefix' key
-def create_lifecycle(days = None, prefix = 'test/', rules = None):
-    lifecycle = boto.s3.lifecycle.Lifecycle()
-    if rules == None:
-        expiration = boto.s3.lifecycle.Expiration(days=days)
-        rule = boto.s3.lifecycle.Rule(id=prefix, prefix=prefix, status='Enabled',
-                                      expiration=expiration)
-        lifecycle.append(rule)
-    else:
-        for rule in rules:
-            expiration = None
-            transition = None
-            try:
-                expiration = boto.s3.lifecycle.Expiration(days=rule['days'])
-            except:
-                pass
-
-            try:
-                transition = rule['transition']
-            except:
-                pass
-
-            _id = rule.get('id',None)
-            rule = boto.s3.lifecycle.Rule(id=_id, prefix=rule['prefix'],
-                                          status=rule['status'], expiration=expiration, transition=transition)
-            lifecycle.append(rule)
-    return lifecycle
-
-def set_lifecycle(rules = None):
-    bucket = get_new_bucket()
-    lifecycle = create_lifecycle(rules=rules)
-    bucket.configure_lifecycle(lifecycle)
-    return bucket
-
-def configured_storage_classes():
-    sc = [ 'STANDARD' ]
-
-    if 'storage_classes' in config['main']:
-        extra_sc = re.split('\W+', config['main']['storage_classes'])
-
-        for item in extra_sc:
-            if item != 'STANDARD':
-                sc.append(item)
-
-    sc = [i for i in sc if i]
-    print("storage classes configured: " + str(sc))
-
-    return sc
-
-def lc_transition(days=None, date=None, storage_class=None):
-    return boto.s3.lifecycle.Transition(days=days, date=date, storage_class=storage_class)
-
-def lc_transitions(transitions=None):
-    result = boto.s3.lifecycle.Transitions()
-    for t in transitions:
-        result.add_transition(days=t.days, date=t.date, storage_class=t.storage_class)
-
-    return result
-
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class():
-    sc = configured_storage_classes()
-    if len(sc) < 2:
-        pytest.skip('requires multiple storage classes')
-
-    bucket = get_new_bucket()
-
-    for storage_class in sc:
-        kname = 'foo-' + storage_class
-        k, data = _populate_key(bucket, kname, size=9*1024*1024, storage_class=storage_class)
-
-        verify_object(bucket, k, data, storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_multipart():
-    sc = configured_storage_classes()
-    if len(sc) < 2:
-        pytest.skip('requires multiple storage classes')
-
-    bucket = get_new_bucket()
-    size = 11 * 1024 * 1024
-
-    for storage_class in sc:
-        key = "mymultipart-" + storage_class
-        (upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
-        upload.complete_upload()
-        key2 = bucket.get_key(key)
-        assert key2.size == size
-        assert key2.storage_class == storage_class
-
-def _do_test_object_modify_storage_class(obj_write_func, size):
-    sc = configured_storage_classes()
-    if len(sc) < 2:
-        pytest.skip('requires multiple storage classes')
-
-    bucket = get_new_bucket()
-
-    for storage_class in sc:
-        kname = 'foo-' + storage_class
-        k, data = obj_write_func(bucket, kname, size, storage_class=storage_class)
-
-        verify_object(bucket, k, data, storage_class)
-
-        for new_storage_class in sc:
-            if new_storage_class == storage_class:
-                continue
-
-            copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
-            verify_object(bucket, k, data, storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_modify_storage_class():
-    _do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
-
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_modify_storage_class_multipart():
-    _do_test_object_modify_storage_class(_populate_multipart_key, size=11*1024*1024)
-
-def _do_test_object_storage_class_copy(obj_write_func, size):
-    sc = configured_storage_classes()
-    if len(sc) < 2:
-        pytest.skip('requires multiple storage classes')
-
-    src_bucket = get_new_bucket()
-    dest_bucket = get_new_bucket()
-    kname = 'foo'
-
-    src_key, data = obj_write_func(src_bucket, kname, size)
-    verify_object(src_bucket, src_key, data)
-
-    for new_storage_class in sc:
-        if new_storage_class == src_key.storage_class:
-            continue
-
-        dest_key = dest_bucket.get_key('foo-' + new_storage_class, validate=False)
-
-        copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, new_storage_class)
-        verify_object(dest_bucket, dest_key, data, new_storage_class)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_copy():
-    _do_test_object_storage_class_copy(_populate_key, size=9*1024*1024)
-
-@pytest.mark.storage_class
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_object_storage_class_copy_multipart():
-    _do_test_object_storage_class_copy(_populate_multipart_key, size=9*1024*1024)
-
-class FakeFile(object):
-    """
-    file that simulates seek, tell, and current character
-    """
-    def __init__(self, char='A', interrupt=None):
-        self.offset = 0
-        self.char = bytes(char, 'utf-8')
-        self.interrupt = interrupt
-
-    def seek(self, offset, whence=os.SEEK_SET):
-        if whence == os.SEEK_SET:
-            self.offset = offset
-        elif whence == os.SEEK_END:
-            self.offset = self.size + offset;
-        elif whence == os.SEEK_CUR:
-            self.offset += offset
-
-    def tell(self):
-        return self.offset
-
-class FakeWriteFile(FakeFile):
-    """
-    file that simulates interruptable reads of constant data
-    """
-    def __init__(self, size, char='A', interrupt=None):
-        FakeFile.__init__(self, char, interrupt)
-        self.size = size
-
-    def read(self, size=-1):
-        if size < 0:
-            size = self.size - self.offset
-        count = min(size, self.size - self.offset)
-        self.offset += count
-
-        # Sneaky! do stuff before we return (the last time)
-        if self.interrupt != None and self.offset == self.size and count > 0:
-            self.interrupt()
-
-        return self.char*count
-
-class FakeFileVerifier(object):
-    """
-    file that verifies expected data has been written
-    """
-    def __init__(self, char=None):
-        self.char = char
-        self.size = 0
-
-    def write(self, data):
-        size = len(data)
-        if self.char == None:
-            self.char = data[0]
-        self.size += size
-        assert data.decode() == self.char*size
-
-def _verify_atomic_key_data(key, size=-1, char=None):
-    """
-    Make sure file is of the expected size and (simulated) content
-    """
-    fp_verify = FakeFileVerifier(char)
-    key.get_contents_to_file(fp_verify)
-    if size >= 0:
-        assert fp_verify.size == size
-
-def _test_atomic_dual_conditional_write(file_size):
-    """
-    create an object, two sessions writing different contents
-    confirm that it is all one or the other
-    """
-    bucket = get_new_bucket()
-    objname = 'testobj'
-    key = bucket.new_key(objname)
-
-    fp_a = FakeWriteFile(file_size, 'A')
-    key.set_contents_from_file(fp_a)
-    _verify_atomic_key_data(key, file_size, 'A')
-    etag_fp_a = key.etag.replace('"', '').strip()
-
-    # get a second key object (for the same key)
-    # so both can be writing without interfering
-    key2 = bucket.new_key(objname)
-
-    # write <file_size> file of C's
-    # but before we're done, try to write all B's
-    fp_b = FakeWriteFile(file_size, 'B')
-    fp_c = FakeWriteFile(file_size, 'C',
-        lambda: key2.set_contents_from_file(fp_b, rewind=True, headers={'If-Match': etag_fp_a})
-        )
-    # key.set_contents_from_file(fp_c, headers={'If-Match': etag_fp_a})
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_c,
-                      headers={'If-Match': etag_fp_a})
-    assert e.status == 412
-    assert e.reason == 'Precondition Failed'
-    assert e.error_code == 'PreconditionFailed'
-
-    # verify the file
-    _verify_atomic_key_data(key, file_size, 'B')
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_atomic_dual_conditional_write_1mb():
-    _test_atomic_dual_conditional_write(1024*1024)
-
-@pytest.mark.fails_on_aws
-@pytest.mark.fails_on_dbstore
-def test_atomic_write_bucket_gone():
-    bucket = get_new_bucket()
-
-    def remove_bucket():
-        bucket.delete()
-
-    # create file of A's but delete the bucket it's in before we finish writing
-    # all of them
-    key = bucket.new_key('foo')
-    fp_a = FakeWriteFile(1024*1024, 'A', remove_bucket)
-    e = assert_raises(boto.exception.S3ResponseError, key.set_contents_from_file, fp_a)
-    assert e.status == 404
-    assert e.reason == 'Not Found'
-    assert e.error_code == 'NoSuchBucket'
-
-def _multipart_upload_enc(bucket, s3_key_name, size, part_size=5*1024*1024,
-                          do_list=None, init_headers=None, part_headers=None,
-                          metadata=None, resend_parts=[]):
-    """
-    generate a multi-part upload for a random file of specifed size,
-    if requested, generate a list of the parts
-    return the upload descriptor
-    """
-    upload = bucket.initiate_multipart_upload(s3_key_name, headers=init_headers, metadata=metadata)
-    s = ''
-    for i, part in enumerate(generate_random(size, part_size)):
-        s += part
-        transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
-        if i in resend_parts:
-            transfer_part(bucket, upload.id, upload.key_name, i, part, part_headers)
-
-    if do_list is not None:
-        l = bucket.list_multipart_uploads()
-        l = list(l)
-
-    return (upload, s)
-
-
-
-@pytest.mark.encryption
-@pytest.mark.fails_on_dbstore
-def test_encryption_sse_c_multipart_invalid_chunks_1():
-    bucket = get_new_bucket()
-    key = "multipart_enc"
-    content_type = 'text/bla'
-    objlen = 30 * 1024 * 1024
-    init_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
-        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
-        'Content-Type': content_type
-    }
-    part_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
-        'x-amz-server-side-encryption-customer-key-md5': 'arxBvwY2V4SiOne6yppVPQ=='
-    }
-    e = assert_raises(boto.exception.S3ResponseError,
-                      _multipart_upload_enc, bucket, key, objlen,
-                      init_headers=init_headers, part_headers=part_headers,
-                      metadata={'foo': 'bar'})
-    assert e.status == 400
-
-@pytest.mark.encryption
-@pytest.mark.fails_on_dbstore
-def test_encryption_sse_c_multipart_invalid_chunks_2():
-    bucket = get_new_bucket()
-    key = "multipart_enc"
-    content_type = 'text/plain'
-    objlen = 30 * 1024 * 1024
-    init_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
-        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==',
-        'Content-Type': content_type
-    }
-    part_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
-        'x-amz-server-side-encryption-customer-key-md5': 'AAAAAAAAAAAAAAAAAAAAAA=='
-    }
-    e = assert_raises(boto.exception.S3ResponseError,
-                      _multipart_upload_enc, bucket, key, objlen,
-                      init_headers=init_headers, part_headers=part_headers,
-                      metadata={'foo': 'bar'})
-    assert e.status == 400
-
-@pytest.mark.fails_with_subdomain
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_different_tenant():
-    bucket = get_new_bucket()
-    key = bucket.new_key('asdf')
-    key.set_contents_from_string('asdf')
-    l = bucket.list()
-    resource1 = "arn:aws:s3::*:" + bucket.name
-    resource2 = "arn:aws:s3::*:" + bucket.name + "/*"
-    policy_document = json.dumps(
-    {
-        "Version": "2012-10-17",
-        "Statement": [{
-        "Effect": "Allow",
-        "Principal": {"AWS": "*"},
-        "Action": "s3:ListBucket",
-        "Resource": [
-            "{}".format(resource1),
-            "{}".format(resource2)
-          ]
-        }]
-     })
-    bucket.set_policy(policy_document)
-
-    new_conn = boto.s3.connection.S3Connection(
-        aws_access_key_id=s3['tenant'].aws_access_key_id,
-        aws_secret_access_key=s3['tenant'].aws_secret_access_key,
-        is_secure=s3['tenant'].is_secure,
-        port=s3['tenant'].port,
-        host=s3['tenant'].host,
-        calling_format=s3['tenant'].calling_format,
-        )
-    bucket_name = ":" + bucket.name
-    b = new_conn.get_bucket(bucket_name)
-    b.get_all_keys()
-
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_set_condition_operator_end_with_IfExists():
-    bucket = _create_keys(keys=['foo'])
-    policy = '''{
-      "Version":"2012-10-17",
-      "Statement": [{
-        "Sid": "Allow Public Access to All Objects",
-        "Effect": "Allow",
-        "Principal": "*",
-        "Action": "s3:GetObject",
-        "Condition": {
-                    "StringLikeIfExists": {
-                        "aws:Referer": "http://www.example.com/*"
-                    }
-                },
-        "Resource": "arn:aws:s3:::%s/*"
-      }
-     ]
-    }''' % bucket.name
-    assert bucket.set_policy(policy) == True
-    res = _make_request('GET', bucket.name, bucket.get_key("foo"),
-                        request_headers={'referer': 'http://www.example.com/'})
-    assert res.status == 200
-    res = _make_request('GET', bucket.name, bucket.get_key("foo"),
-                        request_headers={'referer': 'http://www.example.com/index.html'})
-    assert res.status == 200
-    res = _make_request('GET', bucket.name, bucket.get_key("foo"))
-    assert res.status == 200
-    res = _make_request('GET', bucket.name, bucket.get_key("foo"),
-                        request_headers={'referer': 'http://example.com'})
-    assert res.status == 403
-
-def _make_arn_resource(path="*"):
-    return "arn:aws:s3:::{}".format(path)
-
-@pytest.mark.tagging
-@pytest.mark.bucket_policy
-@pytest.mark.fails_on_dbstore
-def test_bucket_policy_put_obj_request_obj_tag():
-
-    bucket = get_new_bucket()
-
-    tag_conditional = {"StringEquals": {
-        "s3:RequestObjectTag/security" : "public"
-    }}
-
-    p = Policy()
-    resource = _make_arn_resource("{}/{}".format(bucket.name, "*"))
-
-    s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
-    policy_document = p.add_statement(s1).to_json()
-
-    bucket.set_policy(policy_document)
-
-    new_conn = _get_alt_connection()
-    bucket1 = new_conn.get_bucket(bucket.name, validate=False)
-    key1_str ='testobj'
-    key1  = bucket1.new_key(key1_str)
-    check_access_denied(key1.set_contents_from_string, key1_str)
-
-    headers = {"x-amz-tagging" : "security=public"}
-    key1.set_contents_from_string(key1_str, headers=headers)
-
diff --git a/s3tests/functional/test_s3_website.py b/s3tests/functional/test_s3_website.py
deleted file mode 100644 (file)
index 4df5f90..0000000
+++ /dev/null
@@ -1,1083 +0,0 @@
-import sys
-from collections.abc import Container
-import pytest
-import string
-import random
-from pprint import pprint
-import time
-import boto.exception
-import socket
-
-from urllib.parse import urlparse
-
-from .. import common
-
-from . import (
-    configfile,
-    setup_teardown,
-    get_new_bucket,
-    get_new_bucket_name,
-    s3,
-    config,
-    _make_raw_request,
-    choose_bucket_prefix,
-    )
-
-IGNORE_FIELD = 'IGNORETHIS'
-
-SLEEP_INTERVAL = 0.01
-SLEEP_MAX = 2.0
-
-WEBSITE_CONFIGS_XMLFRAG = {
-        'IndexDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument>${RoutingRules}',
-        'IndexDocErrorDoc': '<IndexDocument><Suffix>${IndexDocument_Suffix}</Suffix></IndexDocument><ErrorDocument><Key>${ErrorDocument_Key}</Key></ErrorDocument>${RoutingRules}',
-        'RedirectAll': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName></RedirectAllRequestsTo>${RoutingRules}',
-        'RedirectAll+Protocol': '<RedirectAllRequestsTo><HostName>${RedirectAllRequestsTo_HostName}</HostName><Protocol>${RedirectAllRequestsTo_Protocol}</Protocol></RedirectAllRequestsTo>${RoutingRules}',
-        }
-INDEXDOC_TEMPLATE = '<html><h1>IndexDoc</h1><body>{random}</body></html>'
-ERRORDOC_TEMPLATE = '<html><h1>ErrorDoc</h1><body>{random}</body></html>'
-
-CAN_WEBSITE = None
-
-@pytest.fixture(autouse=True, scope="module")
-def check_can_test_website():
-    bucket = get_new_bucket()
-    try:
-        wsconf = bucket.get_website_configuration()
-        return True
-    except boto.exception.S3ResponseError as e:
-        if e.status == 404 and e.reason == 'Not Found' and e.error_code in ['NoSuchWebsiteConfiguration', 'NoSuchKey']:
-            return True
-        elif e.status == 405 and e.reason == 'Method Not Allowed' and e.error_code == 'MethodNotAllowed':
-            pytest.skip('rgw_enable_static_website is false')
-        elif e.status == 403 and e.reason == 'SignatureDoesNotMatch' and e.error_code == 'Forbidden':
-            # This is older versions that do not support the website code
-            pytest.skip('static website is not implemented')
-        elif e.status == 501 and e.error_code == 'NotImplemented':
-            pytest.skip('static website is not implemented')
-        else:
-            raise RuntimeError("Unknown response in checking if WebsiteConf is supported", e)
-    finally:
-        bucket.delete()
-
-def make_website_config(xml_fragment):
-    """
-    Take the tedious stuff out of the config
-    """
-    return '<?xml version="1.0" encoding="UTF-8"?><WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' + xml_fragment + '</WebsiteConfiguration>'
-
-def get_website_url(**kwargs):
-    """
-    Return the URL to a website page
-    """
-    proto, bucket, hostname, path = 'http', None, None, '/'
-
-    if 'proto' in kwargs:
-        proto = kwargs['proto']
-    if 'bucket' in kwargs:
-        bucket = kwargs['bucket']
-    if 'hostname' in kwargs:
-        hostname = kwargs['hostname']
-    if 'path' in kwargs:
-        path = kwargs['path']
-
-    if hostname is None and bucket is None:
-        return '/' + path.lstrip('/')
-
-    domain = config['main']['host']
-    if('s3website_domain' in config['main']):
-        domain = config['main']['s3website_domain']
-    elif('s3website_domain' in config['alt']):
-        domain = config['DEFAULT']['s3website_domain']
-    if hostname is None and bucket is not None:
-        hostname = '%s.%s' % (bucket, domain)
-    path = path.lstrip('/')
-    return "%s://%s/%s" % (proto, hostname, path)
-
-def _test_website_populate_fragment(xml_fragment, fields):
-    for k in ['RoutingRules']:
-      if k in list(fields.keys()) and len(fields[k]) > 0:
-         fields[k] = '<%s>%s</%s>' % (k, fields[k], k)
-    f = {
-          'IndexDocument_Suffix': choose_bucket_prefix(template='index-{random}.html', max_len=32),
-          'ErrorDocument_Key': choose_bucket_prefix(template='error-{random}.html', max_len=32),
-          'RedirectAllRequestsTo_HostName': choose_bucket_prefix(template='{random}.{random}.com', max_len=32),
-          'RoutingRules': ''
-        }
-    f.update(fields)
-    xml_fragment = string.Template(xml_fragment).safe_substitute(**f)
-    return xml_fragment, f
-
-def _test_website_prep(bucket, xml_template, hardcoded_fields = {}, expect_fail=None):
-    xml_fragment, f = _test_website_populate_fragment(xml_template, hardcoded_fields)
-    f['WebsiteConfiguration'] = ''
-    if not xml_template:
-        bucket.delete_website_configuration()
-        return f
-
-    config_xmlnew = make_website_config(xml_fragment)
-
-    config_xmlold = ''
-    try:
-        config_xmlold = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
-    except boto.exception.S3ResponseError as e:
-        if str(e.status) == str(404) \
-            and ('NoSuchWebsiteConfiguration' in e.body or 'NoSuchWebsiteConfiguration' in e.code or
-                    'NoSuchKey' in e.body or 'NoSuchKey' in e.code):
-            pass
-        else:
-            raise e
-
-    try:
-        bucket.set_website_configuration_xml(common.trim_xml(config_xmlnew))
-        config_xmlnew = common.normalize_xml(config_xmlnew, pretty_print=True)
-    except boto.exception.S3ResponseError as e:
-        if expect_fail is not None:
-            if isinstance(expect_fail, dict):
-                pass
-            elif isinstance(expect_fail, str):
-                pass
-        raise e
-
-    # TODO: in some cases, it takes non-zero time for the config to be applied by AmazonS3
-    # We should figure out how to poll for changes better
-    # WARNING: eu-west-1 as of 2015/06/22 was taking at least 4 seconds to propogate website configs, esp when you cycle between non-null configs
-    time.sleep(0.1)
-    config_xmlcmp = common.normalize_xml(bucket.get_website_configuration_xml(), pretty_print=True)
-
-    #if config_xmlold is not None:
-    #    print('old',config_xmlold.replace("\n",''))
-    #if config_xmlcmp is not None:
-    #    print('cmp',config_xmlcmp.replace("\n",''))
-    #if config_xmlnew is not None:
-    #    print('new',config_xmlnew.replace("\n",''))
-    # Cleanup for our validation
-    common.assert_xml_equal(config_xmlcmp, config_xmlnew)
-    #print("config_xmlcmp\n", config_xmlcmp)
-    #assert config_xmlnew == config_xmlcmp
-    f['WebsiteConfiguration'] = config_xmlcmp
-    return f
-
-def __website_expected_reponse_status(res, status, reason):
-    if not isinstance(status, Container):
-        status = set([status])
-    if not isinstance(reason, Container):
-        reason = set([reason])
-
-    if status is not IGNORE_FIELD:
-        assert res.status in status, 'HTTP code was %s should be %s' % (res.status, status)
-    if reason is not IGNORE_FIELD:
-        assert res.reason in reason, 'HTTP reason was was %s should be %s' % (res.reason, reason)
-
-def _website_expected_default_html(**kwargs):
-    fields = []
-    for k in list(kwargs.keys()):
-        # AmazonS3 seems to be inconsistent, some HTML errors include BucketName, but others do not.
-        if k is 'BucketName':
-            continue
-
-        v = kwargs[k]
-        if isinstance(v, str):
-            v = [v]
-        elif not isinstance(v, Container):
-            v = [v]
-        for v2 in v:
-            s = '<li>%s: %s</li>' % (k,v2)
-            fields.append(s)
-    return fields
-
-def _website_expected_error_response(res, bucket_name, status, reason, code, content=None, body=None):
-    if body is None:
-        body = res.read()
-        print(body)
-    __website_expected_reponse_status(res, status, reason)
-
-    # Argh, AmazonS3 is really inconsistent, so we have a conditional test!
-    # This is most visible if you have an ErrorDoc present
-    errorcode = res.getheader('x-amz-error-code', None)
-    if errorcode is not None:
-        if code is not IGNORE_FIELD:
-            assert errorcode == code
-
-    if not isinstance(content, Container):
-        content = set([content])
-    for f in content:
-        if f is not IGNORE_FIELD and f is not None:
-            f = bytes(f, 'utf-8')
-            assert f in body, 'HTML should contain "%s"' % (f, )
-
-def _website_expected_redirect_response(res, status, reason, new_url):
-    body = res.read()
-    print(body)
-    __website_expected_reponse_status(res, status, reason)
-    loc = res.getheader('Location', None)
-    assert loc == new_url, 'Location header should be set "%s" != "%s"' % (loc,new_url,)
-    assert len(body) == 0, 'Body of a redirect should be empty'
-
-def _website_request(bucket_name, path, connect_hostname=None, method='GET', timeout=None):
-    url = get_website_url(proto='http', bucket=bucket_name, path=path)
-    print("url", url)
-    o = urlparse(url)
-    if connect_hostname is None:
-        connect_hostname = o.hostname
-    path = o.path + '?' + o.query
-    request_headers={}
-    request_headers['Host'] = o.hostname
-    request_headers['Accept'] = '*/*'
-    print('Request: {method} {path}\n{headers}'.format(method=method, path=path, headers=''.join([t[0]+':'+t[1]+"\n" for t in list(request_headers.items())])))
-    res = _make_raw_request(connect_hostname, config.main.port, method, path, request_headers=request_headers, secure=False, timeout=timeout)
-    for (k,v) in res.getheaders():
-        print(k,v)
-    return res
-
-# ---------- Non-existant buckets via the website endpoint
-@pytest.mark.s3website
-@pytest.mark.fails_on_rgw
-def test_website_nonexistant_bucket_s3():
-    bucket_name = get_new_bucket_name()
-    res = _website_request(bucket_name, '')
-    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_s3
-@pytest.mark.fails_on_dbstore
-def test_website_nonexistant_bucket_rgw():
-    bucket_name = get_new_bucket_name()
-    res = _website_request(bucket_name, '')
-    #_website_expected_error_response(res, bucket_name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-    _website_expected_error_response(res, bucket_name, 404, 'Not Found', 'NoSuchBucket', content=_website_expected_default_html(Code='NoSuchBucket'))
-
-#------------- IndexDocument only, successes
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-@pytest.mark.timeout(10)
-def test_website_public_bucket_list_public_index():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.make_public()
-    #time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    body = res.read()
-    print(body)
-    indexstring = bytes(indexstring, 'utf-8')
-    assert body == indexstring # default content should match index.html set content
-    __website_expected_reponse_status(res, 200, 'OK')
-    indexhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_public_index():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.set_canned_acl('private')
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.make_public()
-    #time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-
-    res = _website_request(bucket.name, '')
-    __website_expected_reponse_status(res, 200, 'OK')
-    body = res.read()
-    print(body)
-    indexstring = bytes(indexstring, 'utf-8')
-    assert body == indexstring, 'default content should match index.html set content'
-    indexhtml.delete()
-    bucket.delete()
-
-
-# ---------- IndexDocument only, failures
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.set_canned_acl('private')
-    # TODO: wait for sync
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'))
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    #time.sleep(1)
-    #time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-    indexhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.set_canned_acl('private')
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    ##time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    indexhtml.delete()
-    bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but missing
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_missingerrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_missingerrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey')
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_missingerrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    #time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    indexhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_missingerrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    #time.sleep(1)
-    while bucket.get_key(f['IndexDocument_Suffix']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    indexhtml.delete()
-    bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures due to errordoc assigned but not accessible
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_blockederrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('private')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    body = res.read()
-    print(body)
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
-    errorstring = bytes(errorstring, 'utf-8')
-    assert errorstring not in body, 'error content should NOT match error.html set content'
-
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_pubilc_errordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('public-read')
-
-    url = get_website_url(proto='http', bucket=bucket.name, path='')
-    o = urlparse(url)
-    host = o.hostname
-    port = s3.main.port
-
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect((host, port))
-
-    request = "GET / HTTP/1.1\r\nHost:%s.%s:%s\r\n\r\n" % (bucket.name, host, port)
-    sock.send(request.encode())
-    
-    #receive header
-    resp = sock.recv(4096)
-    print(resp)
-
-    #receive body
-    resp = sock.recv(4096)
-    print('payload length=%d' % len(resp))
-    print(resp)
-
-    #check if any additional payload is left
-    resp_len = 0
-    sock.settimeout(2)
-    try:
-        resp = sock.recv(4096)
-        resp_len = len(resp)
-        print('invalid payload length=%d' % resp_len)
-        print(resp)
-    except socket.timeout:
-        print('no invalid payload')
-
-    assert resp_len == 0, 'invalid payload'
-
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_blockederrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('private')
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    body = res.read()
-    print(body)
-    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=_website_expected_default_html(Code='NoSuchKey'), body=body)
-    errorstring = bytes(errorstring, 'utf-8')
-    assert errorstring not in body, 'error content should match error.html set content'
-
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_blockederrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('private')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    body = res.read()
-    print(body)
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
-    errorstring = bytes(errorstring, 'utf-8')
-    assert errorstring not in body, 'error content should match error.html set content'
-
-    indexhtml.delete()
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_blockederrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('private')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    body = res.read()
-    print(body)
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'), body=body)
-    errorstring = bytes(errorstring, 'utf-8')
-    assert errorstring not in body, 'error content should match error.html set content'
-
-    indexhtml.delete()
-    errorhtml.delete()
-    bucket.delete()
-
-# ---------- IndexDocument & ErrorDocument, failures with errordoc available
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_empty_gooderrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring, policy='public-read')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_empty_gooderrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('public-read')
-   #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchKey', content=[errorstring])
-
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_public_bucket_list_private_index_gooderrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.make_public()
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('public-read')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
-    indexhtml.delete()
-    errorhtml.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_private_bucket_list_private_index_gooderrordoc():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-    bucket.set_canned_acl('private')
-    indexhtml = bucket.new_key(f['IndexDocument_Suffix'])
-    indexstring = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=256)
-    indexhtml.set_contents_from_string(indexstring)
-    indexhtml.set_canned_acl('private')
-    errorhtml = bucket.new_key(f['ErrorDocument_Key'])
-    errorstring = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=256)
-    errorhtml.set_contents_from_string(errorstring)
-    errorhtml.set_canned_acl('public-read')
-    #time.sleep(1)
-    while bucket.get_key(f['ErrorDocument_Key']) is None:
-        time.sleep(SLEEP_INTERVAL)
-
-    res = _website_request(bucket.name, '')
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=[errorstring])
-
-    indexhtml.delete()
-    errorhtml.delete()
-    bucket.delete()
-
-# ------ RedirectAll tests
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_base():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
-    bucket.set_canned_acl('private')
-
-    res = _website_request(bucket.name, '')
-    new_url = 'http://%s/' % f['RedirectAllRequestsTo_HostName']
-    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_path():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
-    bucket.set_canned_acl('private')
-
-    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
-
-    res = _website_request(bucket.name, pathfragment)
-    new_url = 'http://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
-    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-def test_website_bucket_private_redirectall_path_upgrade():
-    bucket = get_new_bucket()
-    x = string.Template(WEBSITE_CONFIGS_XMLFRAG['RedirectAll+Protocol']).safe_substitute(RedirectAllRequestsTo_Protocol='https')
-    f = _test_website_prep(bucket, x)
-    bucket.set_canned_acl('private')
-
-    pathfragment = choose_bucket_prefix(template='/{random}', max_len=16)
-
-    res = _website_request(bucket.name, pathfragment)
-    new_url = 'https://%s%s' % (f['RedirectAllRequestsTo_HostName'], pathfragment)
-    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
-    bucket.delete()
-
-# ------ x-amz redirect tests
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_nonwebsite():
-    bucket = get_new_bucket()
-    #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['RedirectAll'])
-    #bucket.set_canned_acl('private')
-
-    k = bucket.new_key('page')
-    content = 'wrong-content'
-    redirect_dest = '/relative'
-    headers = {'x-amz-website-redirect-location': redirect_dest}
-    k.set_contents_from_string(content, headers=headers, policy='public-read')
-    redirect = k.get_redirect()
-    assert k.get_redirect() == redirect_dest
-
-    res = _website_request(bucket.name, '/page')
-    body = res.read()
-    print(body)
-    expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name)
-    # TODO: RGW does not have custom error messages for different 404s yet
-    #expected_content = _website_expected_default_html(Code='NoSuchWebsiteConfiguration', BucketName=bucket.name, Message='The specified bucket does not have a website configuration')
-    print(expected_content)
-    _website_expected_error_response(res, bucket.name, 404, 'Not Found', 'NoSuchWebsiteConfiguration', content=expected_content, body=body)
-
-    k.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_public_relative():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-
-    k = bucket.new_key('page')
-    content = 'wrong-content'
-    redirect_dest = '/relative'
-    headers = {'x-amz-website-redirect-location': redirect_dest}
-    k.set_contents_from_string(content, headers=headers, policy='public-read')
-    redirect = k.get_redirect()
-    assert k.get_redirect() == redirect_dest
-
-    res = _website_request(bucket.name, '/page')
-    #new_url =  get_website_url(bucket_name=bucket.name, path=redirect_dest)
-    _website_expected_redirect_response(res, 301, ['Moved Permanently'], redirect_dest)
-
-    k.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_public_abs():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-
-    k = bucket.new_key('page')
-    content = 'wrong-content'
-    redirect_dest = 'http://example.com/foo'
-    headers = {'x-amz-website-redirect-location': redirect_dest}
-    k.set_contents_from_string(content, headers=headers, policy='public-read')
-    redirect = k.get_redirect()
-    assert k.get_redirect() == redirect_dest
-
-    res = _website_request(bucket.name, '/page')
-    new_url =  get_website_url(proto='http', hostname='example.com', path='/foo')
-    _website_expected_redirect_response(res, 301, ['Moved Permanently'], new_url)
-
-    k.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_private_relative():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-
-    k = bucket.new_key('page')
-    content = 'wrong-content'
-    redirect_dest = '/relative'
-    headers = {'x-amz-website-redirect-location': redirect_dest}
-    k.set_contents_from_string(content, headers=headers, policy='private')
-    redirect = k.get_redirect()
-    assert k.get_redirect() == redirect_dest
-
-    res = _website_request(bucket.name, '/page')
-    # We get a 403 because the page is private
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    k.delete()
-    bucket.delete()
-
-@pytest.mark.s3website
-@pytest.mark.s3website_redirect_location
-@pytest.mark.fails_on_dbstore
-def test_website_xredirect_private_abs():
-    bucket = get_new_bucket()
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDoc'])
-    bucket.make_public()
-
-    k = bucket.new_key('page')
-    content = 'wrong-content'
-    redirect_dest = 'http://example.com/foo'
-    headers = {'x-amz-website-redirect-location': redirect_dest}
-    k.set_contents_from_string(content, headers=headers, policy='private')
-    redirect = k.get_redirect()
-    assert k.get_redirect() == redirect_dest
-
-    res = _website_request(bucket.name, '/page')
-    new_url =  get_website_url(proto='http', hostname='example.com', path='/foo')
-    # We get a 403 because the page is private
-    _website_expected_error_response(res, bucket.name, 403, 'Forbidden', 'AccessDenied', content=_website_expected_default_html(Code='AccessDenied'))
-
-    k.delete()
-    bucket.delete()
-# ------ RoutingRules tests
-
-# RoutingRules
-ROUTING_RULES = {
-    'empty': '',
-    'AmazonExample1': \
-"""
-    <RoutingRule>
-    <Condition>
-      <KeyPrefixEquals>docs/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-    'AmazonExample1+Protocol=https': \
-"""
-    <RoutingRule>
-    <Condition>
-      <KeyPrefixEquals>docs/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <Protocol>https</Protocol>
-      <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-    'AmazonExample1+Protocol=https+Hostname=xyzzy': \
-"""
-    <RoutingRule>
-    <Condition>
-      <KeyPrefixEquals>docs/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <Protocol>https</Protocol>
-      <HostName>xyzzy</HostName>
-      <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-    'AmazonExample1+Protocol=http2': \
-"""
-    <RoutingRule>
-    <Condition>
-      <KeyPrefixEquals>docs/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <Protocol>http2</Protocol>
-      <ReplaceKeyPrefixWith>documents/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-   'AmazonExample2': \
-"""
-    <RoutingRule>
-    <Condition>
-       <KeyPrefixEquals>images/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <ReplaceKeyWith>folderdeleted.html</ReplaceKeyWith>
-    </Redirect>
-    </RoutingRule>
-""",
-   'AmazonExample2+HttpRedirectCode=TMPL': \
-"""
-    <RoutingRule>
-    <Condition>
-       <KeyPrefixEquals>images/</KeyPrefixEquals>
-    </Condition>
-    <Redirect>
-      <HttpRedirectCode>{HttpRedirectCode}</HttpRedirectCode>
-      <ReplaceKeyWith>folderdeleted.html</ReplaceKeyWith>
-    </Redirect>
-    </RoutingRule>
-""",
-   'AmazonExample3': \
-"""
-    <RoutingRule>
-    <Condition>
-      <HttpErrorCodeReturnedEquals>404</HttpErrorCodeReturnedEquals>
-    </Condition>
-    <Redirect>
-      <HostName>ec2-11-22-333-44.compute-1.amazonaws.com</HostName>
-      <ReplaceKeyPrefixWith>report-404/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-   'AmazonExample3+KeyPrefixEquals': \
-"""
-    <RoutingRule>
-    <Condition>
-      <KeyPrefixEquals>images/</KeyPrefixEquals>
-      <HttpErrorCodeReturnedEquals>404</HttpErrorCodeReturnedEquals>
-    </Condition>
-    <Redirect>
-      <HostName>ec2-11-22-333-44.compute-1.amazonaws.com</HostName>
-      <ReplaceKeyPrefixWith>report-404/</ReplaceKeyPrefixWith>
-    </Redirect>
-    </RoutingRule>
-""",
-}
-
-for k in list(ROUTING_RULES.keys()):
-  if len(ROUTING_RULES[k]) > 0:
-    ROUTING_RULES[k] = "<!-- %s -->\n%s" % (k, ROUTING_RULES[k])
-
-ROUTING_RULES_TESTS = [
-  dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='', location=None, code=200),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/', location=None, code=200),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['empty']), url='/x', location=None, code=404),
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/', location=None, code=200),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/x', location=None, code=404),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/', location=dict(proto='http',bucket='{bucket_name}',path='/documents/'), code=301),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1']), url='/docs/x', location=dict(proto='http',bucket='{bucket_name}',path='/documents/x'), code=301),
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/', location=None, code=200),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/x', location=None, code=404),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/', location=dict(proto='https',bucket='{bucket_name}',path='/documents/'), code=301),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https']), url='/docs/x', location=dict(proto='https',bucket='{bucket_name}',path='/documents/x'), code=301),
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/', location=None, code=200),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/x', location=None, code=404),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/', location=dict(proto='https',hostname='xyzzy',path='/documents/'), code=301),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=https+Hostname=xyzzy']), url='/docs/x', location=dict(proto='https',hostname='xyzzy',path='/documents/x'), code=301),
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample2']), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=301),
-
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/images/x'), code=301),
-
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/x', location=None, code=404),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample3+KeyPrefixEquals']), url='/images/x', location=dict(proto='http',hostname='ec2-11-22-333-44.compute-1.amazonaws.com',path='/report-404/x'), code=301),
-]
-
-ROUTING_ERROR_PROTOCOL = dict(code=400, reason='Bad Request', errorcode='InvalidRequest', bodyregex=r'Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.')
-
-ROUTING_RULES_TESTS_ERRORS = [ # TODO: Unused!
-  # Invalid protocol, protocol can be http or https. If not defined the protocol will be selected automatically.
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
-  dict(xml=dict(RoutingRules=ROUTING_RULES['AmazonExample1+Protocol=http2']), url='/docs/x', location=None, code=400, error=ROUTING_ERROR_PROTOCOL),
-]
-
-VALID_AMZ_REDIRECT = set([301,302,303,304,305,307,308])
-
-# General lots of tests
-for redirect_code in VALID_AMZ_REDIRECT:
-  rules = ROUTING_RULES['AmazonExample2+HttpRedirectCode=TMPL'].format(HttpRedirectCode=redirect_code)
-  result = redirect_code
-  ROUTING_RULES_TESTS.append(
-    dict(xml=dict(RoutingRules=rules), url='/images/', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
-  )
-  ROUTING_RULES_TESTS.append(
-    dict(xml=dict(RoutingRules=rules), url='/images/x', location=dict(proto='http',bucket='{bucket_name}',path='/folderdeleted.html'), code=result)
-  )
-
-# TODO:
-# codes other than those in VALID_AMZ_REDIRECT
-# give an error of 'The provided HTTP redirect code (314) is not valid. Valid codes are 3XX except 300.' during setting the website config
-# we should check that we can return that too on ceph
-
-@pytest.fixture
-def routing_setup():
-  kwargs = {'obj':[]}
-  bucket = get_new_bucket()
-  kwargs['bucket'] = bucket
-  kwargs['obj'].append(bucket)
-  #f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'])
-  f = _test_website_prep(bucket, '')
-  kwargs.update(f)
-  bucket.set_canned_acl('public-read')
-  
-  k = bucket.new_key('debug-ws.xml')
-  kwargs['obj'].append(k)
-  k.set_contents_from_string('', policy='public-read')
-
-  k = bucket.new_key(f['IndexDocument_Suffix'])
-  kwargs['obj'].append(k)
-  s = choose_bucket_prefix(template=INDEXDOC_TEMPLATE, max_len=64)
-  k.set_contents_from_string(s)
-  k.set_canned_acl('public-read')
-
-  k = bucket.new_key(f['ErrorDocument_Key'])
-  kwargs['obj'].append(k)
-  s = choose_bucket_prefix(template=ERRORDOC_TEMPLATE, max_len=64)
-  k.set_contents_from_string(s)
-  k.set_canned_acl('public-read')
-
-  #time.sleep(1)
-  while bucket.get_key(f['ErrorDocument_Key']) is None:
-      time.sleep(SLEEP_INTERVAL)
-
-  yield kwargs
-
-  for o in reversed(kwargs['obj']):
-    print('Deleting', str(o))
-    o.delete()
-
-def routing_check(*args, **kwargs):
-    bucket = kwargs['bucket']
-    args=args[0]
-    #print(args)
-    pprint(args)
-    xml_fields = kwargs.copy()
-    xml_fields.update(args['xml'])
-
-    k = bucket.get_key('debug-ws.xml')
-    k.set_contents_from_string(str(args)+str(kwargs), policy='public-read')
-
-    pprint(xml_fields)
-    f = _test_website_prep(bucket, WEBSITE_CONFIGS_XMLFRAG['IndexDocErrorDoc'], hardcoded_fields=xml_fields)
-    #print(f)
-    config_xmlcmp = bucket.get_website_configuration_xml()
-    config_xmlcmp = common.normalize_xml(config_xmlcmp, pretty_print=True) # For us to read
-    res = _website_request(bucket.name, args['url'])
-    print(config_xmlcmp)
-    new_url = args['location']
-    if new_url is not None:
-        new_url = get_website_url(**new_url)
-        new_url = new_url.format(bucket_name=bucket.name)
-    if args['code'] >= 200 and args['code'] < 300:
-        #body = res.read()
-        #print(body)
-        #assert body == args['content'], 'default content should match index.html set content'
-        assert int(res.getheader('Content-Length', -1)) > 0
-    elif args['code'] >= 300 and args['code'] < 400:
-        _website_expected_redirect_response(res, args['code'], IGNORE_FIELD, new_url)
-    elif args['code'] >= 400:
-        _website_expected_error_response(res, bucket.name, args['code'], IGNORE_FIELD, IGNORE_FIELD)
-    else:
-        assert(False)
-
-@pytest.mark.s3website_routing_rules
-@pytest.mark.s3website
-@pytest.mark.fails_on_dbstore
-@pytest.mark.parametrize('t', ROUTING_RULES_TESTS)
-def test_routing_generator(t, routing_setup):
-    if 'xml' in t and 'RoutingRules' in t['xml'] and len(t['xml']['RoutingRules']) > 0:
-        t['xml']['RoutingRules'] = common.trim_xml(t['xml']['RoutingRules'])
-    routing_check(t, **routing_setup)
diff --git a/s3tests/functional/test_utils.py b/s3tests/functional/test_utils.py
deleted file mode 100644 (file)
index c0dd398..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-from . import utils
-
-def test_generate():
-    FIVE_MB = 5 * 1024 * 1024
-    assert len(''.join(utils.generate_random(0))) == 0
-    assert len(''.join(utils.generate_random(1))) == 1
-    assert len(''.join(utils.generate_random(FIVE_MB - 1))) == FIVE_MB - 1
-    assert len(''.join(utils.generate_random(FIVE_MB))) == FIVE_MB
-    assert len(''.join(utils.generate_random(FIVE_MB + 1))) == FIVE_MB + 1
diff --git a/s3tests/functional/utils.py b/s3tests/functional/utils.py
deleted file mode 100644 (file)
index 3083415..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-import random
-import requests
-import string
-import time
-
-def assert_raises(excClass, callableObj, *args, **kwargs):
-    """
-    Like unittest.TestCase.assertRaises, but returns the exception.
-    """
-    try:
-        callableObj(*args, **kwargs)
-    except excClass as e:
-        return e
-    else:
-        if hasattr(excClass, '__name__'):
-            excName = excClass.__name__
-        else:
-            excName = str(excClass)
-        raise AssertionError("%s not raised" % excName)
-
-def generate_random(size, part_size=5*1024*1024):
-    """
-    Generate the specified number random data.
-    (actually each MB is a repetition of the first KB)
-    """
-    chunk = 1024
-    allowed = string.ascii_letters
-    for x in range(0, size, part_size):
-        strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in range(chunk)])
-        s = ''
-        left = size - x
-        this_part_size = min(left, part_size)
-        for y in range(this_part_size // chunk):
-            s = s + strpart
-        s = s + strpart[:(this_part_size % chunk)]
-        yield s
-        if (x == size):
-            return
-
-# syncs all the regions except for the one passed in
-def region_sync_meta(targets, region):
-
-    for (k, r) in targets.items():
-        if r == region:
-            continue
-        conf = r.conf
-        if conf.sync_agent_addr:
-            ret = requests.post('http://{addr}:{port}/metadata/incremental'.format(addr = conf.sync_agent_addr, port = conf.sync_agent_port))
-            assert ret.status_code == 200
-        if conf.sync_meta_wait:
-            time.sleep(conf.sync_meta_wait)
-
-
-def get_grantee(policy, permission):
-    '''
-    Given an object/bucket policy, extract the grantee with the required permission
-    '''
-
-    for g in policy.acl.grants:
-        if g.permission == permission:
-            return g.id
index a8ca2075b3e0c2483e20e8f44aa73c7643ad6b1c..d3c7e7bf2ec7a095c49e57a04c74ac791f83ebc7 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,6 @@ setup(
     keywords='s3 web testing',
 
     install_requires=[
-        'boto >=2.0b4',
         'boto3 >=1.0.0',
         'PyYAML',
         'munch >=2.0.0',