eq(got, 'bar')
+def _get_post_url(conn, bucket):
+
+ url = '{protocol}://{host}:{port}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=conn.host, port=conn.port, bucket=bucket.name)
+ return url
+
@attr(resource='object')
@attr(method='post')
@attr(operation='anonymous browser based upload via POST request')
@attr(assertion='succeeds and returns written data')
def test_post_object_anonymous_request():
bucket = get_new_bucket()
+ url = _get_post_url(s3.main, bucket)
bucket.set_acl('public-read-write')
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
("Content-Type" , "text/plain"),('file', ('bar'))])
@attr(assertion='succeeds and returns written data')
def test_post_object_authenticated_request():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
def test_post_object_set_success_code():
bucket = get_new_bucket()
bucket.set_acl('public-read-write')
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
("success_action_status" , "201"),\
def test_post_object_set_invalid_success_code():
bucket = get_new_bucket()
bucket.set_acl('public-read-write')
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
("success_action_status" , "404"),\
@attr(assertion='succeeds and returns written data')
def test_post_object_upload_larger_than_chunk():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
foo_string = 'foo' * 1024*1024
@attr(assertion='succeeds and returns written data')
def test_post_object_set_key_from_filename():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='succeeds with status 204')
def test_post_object_ignored_header():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='succeeds with status 204')
def test_post_object_case_insensitive_condition_fields():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\
- bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='succeeds with escaped leading $ and returns written data')
def test_post_object_escaped_field_values():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
def test_post_object_success_redirect_action():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
-
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
-
- redirect_url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
- bucket.set_acl('public-read')
+ url = _get_post_url(s3.main, bucket)
+ redirect_url = _get_post_url(s3.main, bucket)
+ bucket.set_acl('public-read')
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with invalid signature error')
def test_post_object_invalid_signature():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1]
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with access key does not exist error')
def test_post_object_invalid_access_key():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\
@attr(assertion='fails with invalid expiration error')
def test_post_object_invalid_date_format():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with missing key error')
def test_post_object_no_key_specified():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with missing signature error')
def test_post_object_missing_signature():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with extra input fields policy error')
def test_post_object_missing_policy_condition():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='succeeds using starts-with restriction on metadata header')
def test_post_object_user_specified_header():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with policy condition failed error due to missing field in POST request')
def test_post_object_request_missing_policy_specified_field():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with conditions must be list error')
def test_post_object_condition_is_case_sensitive():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with expiration must be string error')
def test_post_object_expires_is_case_sensitive():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with policy expired error')
def test_post_object_expired_policy():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails using equality restriction on metadata header')
def test_post_object_invalid_request_field_value():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with policy missing expiration error')
def test_post_object_missing_expires_condition():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with policy missing conditions error')
def test_post_object_missing_conditions_list():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with allowable upload size exceeded error')
def test_post_object_upload_size_limit_exceeded():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with invalid content length error')
def test_post_object_missing_content_length_argument():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with invalid JSON error')
def test_post_object_invalid_content_length_argument():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
@attr(assertion='fails with upload size less than minimum allowable error')
def test_post_object_upload_size_below_minimum():
bucket = get_new_bucket()
- conn = s3.main
- host_name = conn.host
- url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
- host=host_name,bucket=bucket.name)
+ url = _get_post_url(s3.main, bucket)
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
json_policy_document = json.JSONEncoder().encode(policy_document)
policy = base64.b64encode(json_policy_document)
+ conn = s3.main
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\