import random
import string
import itertools
+import urllib3
config = munch.Munch
proto = 'https' if config.default_is_secure else 'http'
config.default_endpoint = "%s://%s:%d" % (proto, config.default_host, config.default_port)
+ try:
+ config.default_ssl_verify = cfg.getboolean('DEFAULT', "ssl_verify")
+ except configparser.NoOptionError:
+ config.default_ssl_verify = True
+
+ # Disable InsecureRequestWarning reported by urllib3 when ssl_verify is False
+ if not config.default_ssl_verify:
+ urllib3.disable_warnings()
+
# vars from the main section
config.main_access_key = cfg.get('s3 main',"access_key")
config.main_secret_key = cfg.get('s3 main',"secret_key")
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
+
def teardown():
alt_client = get_alt_client()
tenant_client = get_tenant_client()
aws_secret_access_key=config.main_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
aws_secret_access_key=config.main_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=Config(signature_version='s3'))
return client
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
aws_secret_access_key=config.alt_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
aws_secret_access_key=config.tenant_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
aws_access_key_id=config.tenant_access_key,
aws_secret_access_key=config.tenant_secret_key,
endpoint_url=config.default_endpoint,
+ verify=config.default_ssl_verify,
use_ssl=config.default_is_secure)
return client
aws_secret_access_key='',
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=Config(signature_version=UNSIGNED))
return client
aws_secret_access_key='roflmao',
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=Config(signature_version='s3v4'))
return client
aws_secret_access_key=config.main_secret_key,
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify,
config=client_config)
return client
aws_access_key_id=config.main_access_key,
aws_secret_access_key=config.main_secret_key,
endpoint_url=config.default_endpoint,
- use_ssl=config.default_is_secure)
+ use_ssl=config.default_is_secure,
+ verify=config.default_ssl_verify)
if name is None:
name = get_new_bucket_name()
bucket = s3.Bucket(name)
def get_config_endpoint():
return config.default_endpoint
+def get_config_ssl_verify():
+ return config.default_ssl_verify
+
def get_main_aws_access_key():
return config.main_access_key
get_config_host,
get_config_port,
get_config_endpoint,
+ get_config_ssl_verify,
get_main_aws_access_key,
get_main_aws_secret_key,
get_main_display_name,
("Content-Type" , "text/plain"),('file', ('bar'))])
client.create_bucket(ACL='public-read-write', Bucket=bucket_name)
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
body = _get_body(response)
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
body = _get_body(response)
("acl" , "private"),("signature" , signature),("policy" , policy),\
('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key="foo.txt")
body = _get_body(response)
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("success_action_status" , "201"),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 201)
message = ET.fromstring(r.content).find('Key')
eq(message.text,'foo.txt')
("success_action_status" , "404"),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
content = r.content.decode()
eq(content,'')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', foo_string)])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
body = _get_body(response)
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('foo.txt', 'bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
body = _get_body(response)
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),("x-ignore-foo" , "bar"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
@attr(resource='object')
("aCl" , "private"),("signature" , signature),("pOLICy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='\$foo.txt')
body = _get_body(response)
("Content-Type" , "text/plain"),("success_action_redirect" , redirect_url),\
('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 200)
url = r.url
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
eq(response['Metadata']['foo'], 'barclamp')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 403)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 400)
@attr(resource='object')
url = client.generate_presigned_url(ClientMethod='get_object', Params=params, ExpiresIn=100000, HttpMethod='GET')
- res = requests.get(url).__dict__
+ res = requests.get(url, verify=get_config_ssl_verify()).__dict__
eq(res['status_code'], 200)
@attr(resource='object')
url = client.generate_presigned_url(ClientMethod='get_object', Params=params, ExpiresIn=0, HttpMethod='GET')
- res = requests.get(url).__dict__
+ res = requests.get(url, verify=get_config_ssl_verify()).__dict__
eq(res['status_code'], 403)
@attr(resource='object')
url = client.generate_presigned_url(ClientMethod='get_object', Params=params, ExpiresIn=609901, HttpMethod='GET')
- res = requests.get(url).__dict__
+ res = requests.get(url, verify=get_config_ssl_verify()).__dict__
eq(res['status_code'], 403)
@attr(resource='object')
url = client.generate_presigned_url(ClientMethod='get_object', Params=params, ExpiresIn=-7, HttpMethod='GET')
- res = requests.get(url).__dict__
+ res = requests.get(url, verify=get_config_ssl_verify()).__dict__
eq(res['status_code'], 403)
url = client.generate_presigned_url(ClientMethod='put_object', Params=params, ExpiresIn=-1000, HttpMethod='PUT')
# params wouldn't take a 'Body' parameter so we're passing it in here
- res = requests.put(url,data="foo").__dict__
+ res = requests.put(url, data="foo", verify=get_config_ssl_verify()).__dict__
eq(res['status_code'], 403)
def check_bad_bucket_name(bucket_name):
eq(status, 404)
def _cors_request_and_check(func, url, headers, expect_status, expect_allow_origin, expect_allow_methods):
- r = func(url, headers=headers)
+ r = func(url, headers=headers, verify=get_config_ssl_verify())
eq(r.status_code, expect_status)
assert r.headers.get('access-control-allow-origin', None) == expect_allow_origin
('x-amz-server-side-encryption-customer-key-md5', 'DWygnHRtgiJ77HCm+1rvHw=='), \
('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
get_headers = {
('x-amz-server-side-encryption-aws-kms-key-id', kms_keyid), \
('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
('file', ('bar')),
])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key=key_name)
body = _get_body(response)
("Content-Type" , "text/plain"),
('file', ('bar'))])
- r = requests.post(url, files = payload)
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
eq(r.status_code, 204)
response = client.get_object(Bucket=bucket_name, Key='foo.txt')
body = _get_body(response)