]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
rgw: fix post tests to include tcp port
authorYehuda Sadeh <yehuda@inktank.com>
Wed, 23 Jan 2013 18:41:38 +0000 (10:41 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Wed, 23 Jan 2013 18:41:38 +0000 (10:41 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
s3tests/functional/test_s3.py

index 0e4d07529ec7f3cce690bf3e6ec5b18205b09bd1..258e3d8bdabba5dd1a6a6197b5ed34f23dd2e2cb 100644 (file)
@@ -1034,17 +1034,20 @@ def test_object_write_file():
     eq(got, 'bar')
 
 
+def _get_post_url(conn, bucket):
+
+       url = '{protocol}://{host}:{port}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+                    host=conn.host, port=conn.port, bucket=bucket.name)
+       return url
+
 @attr(resource='object')
 @attr(method='post')
 @attr(operation='anonymous browser based upload via POST request')
 @attr(assertion='succeeds and returns written data')
 def test_post_object_anonymous_request():
        bucket = get_new_bucket()
+       url = _get_post_url(s3.main, bucket)
        bucket.set_acl('public-read-write')
-       conn = s3.main
-       host_name = conn.host
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
 
        payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
        ("Content-Type" , "text/plain"),('file', ('bar'))])
@@ -1062,11 +1065,8 @@ def test_post_object_anonymous_request():
 @attr(assertion='succeeds and returns written data')
 def test_post_object_authenticated_request():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1083,6 +1083,7 @@ def test_post_object_authenticated_request():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1103,10 +1104,7 @@ def test_post_object_authenticated_request():
 def test_post_object_set_success_code():
        bucket = get_new_bucket()
        bucket.set_acl('public-read-write')
-       conn = s3.main
-       host_name = conn.host
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
        ("success_action_status" , "201"),\
@@ -1125,10 +1123,7 @@ def test_post_object_set_success_code():
 def test_post_object_set_invalid_success_code():
        bucket = get_new_bucket()
        bucket.set_acl('public-read-write')
-       conn = s3.main
-       host_name = conn.host
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
        ("success_action_status" , "404"),\
@@ -1145,11 +1140,8 @@ def test_post_object_set_invalid_success_code():
 @attr(assertion='succeeds and returns written data')
 def test_post_object_upload_larger_than_chunk():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1166,6 +1158,7 @@ def test_post_object_upload_larger_than_chunk():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        foo_string = 'foo' * 1024*1024
@@ -1187,11 +1180,8 @@ def test_post_object_upload_larger_than_chunk():
 @attr(assertion='succeeds and returns written data')
 def test_post_object_set_key_from_filename():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1208,6 +1198,7 @@ def test_post_object_set_key_from_filename():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1227,11 +1218,8 @@ def test_post_object_set_key_from_filename():
 @attr(assertion='succeeds with status 204')
 def test_post_object_ignored_header():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1248,6 +1236,7 @@ def test_post_object_ignored_header():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1264,11 +1253,8 @@ def test_post_object_ignored_header():
 @attr(assertion='succeeds with status 204')
 def test_post_object_case_insensitive_condition_fields():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\
-       bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1285,6 +1271,7 @@ def test_post_object_case_insensitive_condition_fields():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1301,11 +1288,8 @@ def test_post_object_case_insensitive_condition_fields():
 @attr(assertion='succeeds with escaped leading $ and returns written data')
 def test_post_object_escaped_field_values():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1322,6 +1306,7 @@ def test_post_object_escaped_field_values():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1342,15 +1327,9 @@ def test_post_object_escaped_field_values():
 def test_post_object_success_redirect_action():
        bucket = get_new_bucket()
 
-       conn = s3.main
-       host_name = conn.host
-
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
-
-       redirect_url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
-        bucket.set_acl('public-read')
+       url = _get_post_url(s3.main, bucket)
+       redirect_url = _get_post_url(s3.main, bucket)
+       bucket.set_acl('public-read')
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1368,6 +1347,7 @@ def test_post_object_success_redirect_action():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1390,11 +1370,8 @@ def test_post_object_success_redirect_action():
 @attr(assertion='fails with invalid signature error')
 def test_post_object_invalid_signature():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1411,6 +1388,7 @@ def test_post_object_invalid_signature():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1]
 
        payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1427,11 +1405,8 @@ def test_post_object_invalid_signature():
 @attr(assertion='fails with access key does not exist error')
 def test_post_object_invalid_access_key():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1448,6 +1423,7 @@ def test_post_object_invalid_access_key():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\
@@ -1464,11 +1440,8 @@ def test_post_object_invalid_access_key():
 @attr(assertion='fails with invalid expiration error')
 def test_post_object_invalid_date_format():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1485,6 +1458,7 @@ def test_post_object_invalid_date_format():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1501,11 +1475,8 @@ def test_post_object_invalid_date_format():
 @attr(assertion='fails with missing key error')
 def test_post_object_no_key_specified():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1521,6 +1492,7 @@ def test_post_object_no_key_specified():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1537,11 +1509,8 @@ def test_post_object_no_key_specified():
 @attr(assertion='fails with missing signature error')
 def test_post_object_missing_signature():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1558,6 +1527,7 @@ def test_post_object_missing_signature():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1574,11 +1544,8 @@ def test_post_object_missing_signature():
 @attr(assertion='fails with extra input fields policy error')
 def test_post_object_missing_policy_condition():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1594,6 +1561,7 @@ def test_post_object_missing_policy_condition():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1610,11 +1578,8 @@ def test_post_object_missing_policy_condition():
 @attr(assertion='succeeds using starts-with restriction on metadata header')
 def test_post_object_user_specified_header():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1632,6 +1597,7 @@ def test_post_object_user_specified_header():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1650,11 +1616,8 @@ def test_post_object_user_specified_header():
 @attr(assertion='fails with policy condition failed error due to missing field in POST request')
 def test_post_object_request_missing_policy_specified_field():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1672,6 +1635,7 @@ def test_post_object_request_missing_policy_specified_field():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1688,11 +1652,8 @@ def test_post_object_request_missing_policy_specified_field():
 @attr(assertion='fails with conditions must be list error')
 def test_post_object_condition_is_case_sensitive():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1709,6 +1670,7 @@ def test_post_object_condition_is_case_sensitive():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1725,11 +1687,8 @@ def test_post_object_condition_is_case_sensitive():
 @attr(assertion='fails with expiration must be string error')
 def test_post_object_expires_is_case_sensitive():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1746,6 +1705,7 @@ def test_post_object_expires_is_case_sensitive():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1762,11 +1722,8 @@ def test_post_object_expires_is_case_sensitive():
 @attr(assertion='fails with policy expired error')
 def test_post_object_expired_policy():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000)
@@ -1783,6 +1740,7 @@ def test_post_object_expired_policy():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1799,11 +1757,8 @@ def test_post_object_expired_policy():
 @attr(assertion='fails using equality restriction on metadata header')
 def test_post_object_invalid_request_field_value():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1821,6 +1776,7 @@ def test_post_object_invalid_request_field_value():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1837,11 +1793,8 @@ def test_post_object_invalid_request_field_value():
 @attr(assertion='fails with policy missing expiration error')
 def test_post_object_missing_expires_condition():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1858,6 +1811,7 @@ def test_post_object_missing_expires_condition():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1874,11 +1828,8 @@ def test_post_object_missing_expires_condition():
 @attr(assertion='fails with policy missing conditions error')
 def test_post_object_missing_conditions_list():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1888,6 +1839,7 @@ def test_post_object_missing_conditions_list():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1904,11 +1856,8 @@ def test_post_object_missing_conditions_list():
 @attr(assertion='fails with allowable upload size exceeded error')
 def test_post_object_upload_size_limit_exceeded():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1925,6 +1874,7 @@ def test_post_object_upload_size_limit_exceeded():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1941,11 +1891,8 @@ def test_post_object_upload_size_limit_exceeded():
 @attr(assertion='fails with invalid content length error')
 def test_post_object_missing_content_length_argument():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1962,6 +1909,7 @@ def test_post_object_missing_content_length_argument():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -1978,11 +1926,8 @@ def test_post_object_missing_content_length_argument():
 @attr(assertion='fails with invalid JSON error')
 def test_post_object_invalid_content_length_argument():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -1999,6 +1944,7 @@ def test_post_object_invalid_content_length_argument():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@@ -2015,11 +1961,8 @@ def test_post_object_invalid_content_length_argument():
 @attr(assertion='fails with upload size less than minimum allowable error')
 def test_post_object_upload_size_below_minimum():
        bucket = get_new_bucket()
-       conn = s3.main
-       host_name = conn.host
 
-       url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
-       host=host_name,bucket=bucket.name)
+       url = _get_post_url(s3.main, bucket)
 
        utc = pytz.utc
        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
@@ -2036,6 +1979,7 @@ def test_post_object_upload_size_below_minimum():
 
        json_policy_document = json.JSONEncoder().encode(policy_document)
        policy = base64.b64encode(json_policy_document)
+       conn = s3.main
        signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
 
        payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\