import socket
import ssl
import os
+import requests
+import base64
+import hmac
+import sha
+import pytz
+import json
+
+import xml.etree.ElementTree as ET
from httplib import HTTPConnection, HTTPSConnection
from urlparse import urlparse
import AnonymousAuth
from email.header import decode_header
+from ordereddict import OrderedDict
+
from . import (
nuke_prefixed_buckets,
@attr(resource='object')
@attr(method='put')
@attr(operation='data write from file (w/100-Continue)')
-@attr(assertion='returns written data')
+@attr(assertion='succeeds and returns written data')
def test_object_write_file():
# boto Key.set_contents_from_file / .send_file uses Expect:
# 100-Continue, so this test exercises that (though a bit too
eq(got, 'bar')
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='anonymous browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_anonymous_request():
+ bucket = get_new_bucket()
+ bucket.set_acl('public-read-write')
+ conn = s3.main
+ host_name = conn.host
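+    # Anonymous multipart/form-data POST against a public-read-write bucket;
+    # no policy or signature is supplied.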
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
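+    # S3 POST uploads require the 'file' part to be the last form field, so
+    # an OrderedDict keeps the field order deterministic.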
+ payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 200)
+ key = bucket.get_key("foo.txt")
+ got = key.get_contents_as_string()
+ eq(got, 'bar')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_authenticated_request():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
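+    # Policy document limiting what the signed form may upload: bucket, key
+    # prefix, canned ACL, content type, and object size.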
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
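+    # JSON-encode the policy, base64 it, and sign the base64 text with
+    # HMAC-SHA1 using the account's secret key (browser-based upload signing).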
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ key = bucket.get_key("foo.txt")
+ got = key.get_contents_as_string()
+ eq(got, 'bar')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='anonymous browser based upload via POST request')
+@attr(assertion='succeeds with status 201')
+def test_post_object_set_success_code():
+ bucket = get_new_bucket()
+ bucket.set_acl('public-read-write')
+ conn = s3.main
+ host_name = conn.host
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
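+    # success_action_status=201 asks for a 201 Created response whose XML
+    # body names the key that was written.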
+ payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
+ ("success_action_status" , "201"),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 201)
+    key_element = ET.fromstring(r.content).find('Key')
+    eq(key_element.text, 'foo.txt')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='anonymous browser based upload via POST request')
+@attr(assertion='succeeds with status 204')
+def test_post_object_set_invalid_success_code():
+ bucket = get_new_bucket()
+ bucket.set_acl('public-read-write')
+ conn = s3.main
+ host_name = conn.host
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
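+    # An unsupported success_action_status value is ignored and the default
+    # 204 No Content response is returned with an empty body.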
+ payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
+ ("success_action_status" , "404"),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ eq(r.content,'')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_upload_larger_than_chunk():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 5*1024*1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
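+    # 3 MiB of content, large enough that the server has to handle the
+    # upload across multiple read chunks.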
+ foo_string = 'foo' * 1024*1024
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', foo_string)])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ key = bucket.get_key("foo.txt")
+ got = key.get_contents_as_string()
+ eq(got, foo_string)
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_set_key_from_filename():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 5*1024*1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
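+    # The literal "${filename}" in the key field is replaced with the name of
+    # the uploaded file ("foo.txt" from the file tuple below).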
+ payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('foo.txt', 'bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ key = bucket.get_key("foo.txt")
+ got = key.get_contents_as_string()
+    eq(got, 'bar')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds with status 204')
+def test_post_object_ignored_header():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
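+    # Fields prefixed with "x-ignore-" are ignored by the server and do not
+    # have to be covered by the policy conditions.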
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),("x-ignore-foo" , "bar"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds with status 204')
+def test_post_object_case_insensitive_condition_fields():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',host=host_name,\
+ bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
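+    # Condition names in the policy and form field names are expected to be
+    # matched case-insensitively, so the mixed-case spellings below must pass.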
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bUcKeT": bucket.name},\
+ ["StArTs-WiTh", "$KeY", "foo"],\
+ {"AcL": "private"},\
+ ["StArTs-WiTh", "$CoNtEnT-TyPe", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("kEy" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("aCl" , "private"),("signature" , signature),("pOLICy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds with escaped leading $ and returns written data')
+def test_post_object_escaped_field_values():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
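+    # "\$" is not a Python escape, so the key below literally starts with a
+    # backslash-dollar sequence; the test stores and reads back such a key.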
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ key = bucket.get_key("\$foo.txt")
+ got = key.get_contents_as_string()
+ eq(got, 'bar')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds and returns redirect url')
+def test_post_object_success_redirect_action():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["$eq", "$success_action_redirect", "http://localhost"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
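+    # With success_action_redirect set, the server redirects to the given URL
+    # and appends bucket, key and etag as query parameters; requests follows
+    # the redirect, so r.url is inspected rather than the response body.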
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),("success_action_redirect" , "http://localhost"),\
+ ('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 200)
+ url = r.url
+ key = bucket.get_key("foo.txt")
+ eq(url,
+ 'http://localhost/?bucket={bucket}&key={key}&etag=%22{etag}%22'.format(bucket = bucket.name,
+ key = key.name, etag = key.etag.strip('"')))
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with invalid signature error')
+def test_post_object_invalid_signature():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
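+    # Reverse the signature string so it no longer matches the signed policy.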
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())[::-1]
+
+ payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+    eq(message_text, 'The request signature we calculated does not match the '
+        'signature you provided. Check your key and signing method.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with access key does not exist error')
+def test_post_object_invalid_access_key():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
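+    # The reversed access key id should not belong to any existing account.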
+ payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id[::-1]),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'The AWS Access Key Id you provided does not exist in our records.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with invalid expiration error')
+def test_post_object_invalid_date_format():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": str(expires),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text,\
+ "Invalid Policy: Invalid 'expiration' value: '{date}'".format(date=str(expires)))
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with missing key error')
+def test_post_object_no_key_specified():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, "Bucket POST must contain a field named 'key'.\
+ If it is specified, please check the order of the fields.")
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with missing signature error')
+def test_post_object_missing_signature():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "\$foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq("Bucket POST must contain a field named 'Signature'.\
+ If it is specified, please check the order of the fields.")
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with extra input fields policy error')
+def test_post_object_missing_policy_condition():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
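+    # The bucket condition is deliberately left out of the policy; the bucket
+    # the form is posted to then shows up as an extra, uncovered input field.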
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid according to Policy: Extra input fields: bucket')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds using starts-with restriction on metadata header')
+def test_post_object_user_specified_header():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ["starts-with", "$x-amz-meta-foo", "bar"]
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 204)
+ key = bucket.get_key("foo.txt")
+ eq(key.get_metadata('foo'), 'barclamp')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with policy condition failed error due to missing field in POST request')
+def test_post_object_request_missing_policy_specified_field():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ["starts-with", "$x-amz-meta-foo", "bar"]
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid according to Policy: Policy Condition failed: ["starts-with", "$x-amz-meta-foo", "bar"]')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with conditions must be list error')
+def test_post_object_condition_is_case_sensitive():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "CONDITIONS": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, "Invalid Policy: Invalid 'conditions' value: must be a List.")
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with expiration must be string error')
+def test_post_object_expires_is_case_sensitive():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"EXPIRATION": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, "Invalid Policy: Invalid 'expiration' value: must be a String.")
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with policy expired error')
+def test_post_object_expired_policy():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
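+    # Expiration set 100 minutes in the past, so the policy is already expired.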
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=-6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid according to Policy: Policy expired.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails using equality restriction on metadata header')
+def test_post_object_invalid_request_field_value():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ["eq", "$x-amz-meta-foo", ""]
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('x-amz-meta-foo' , 'barclamp'),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 403)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+    eq(message_text, 'Invalid according to Policy: Policy Condition failed: '
+        '["eq", "$x-amz-meta-foo", ""]')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with policy missing expiration error')
+def test_post_object_missing_expires_condition():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024],\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid Policy: Policy missing expiration.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with policy missing conditions error')
+def test_post_object_missing_conditions_list():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid Policy: Policy missing conditions.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with allowable upload size exceeded error')
+def test_post_object_upload_size_limit_exceeded():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
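+    # content-length-range of 0 to 0 bytes: any non-empty upload exceeds it.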
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 0]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Your proposed upload exceeds the maximum allowed size')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with invalid content length error')
+def test_post_object_missing_content_length_argument():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
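+    # content-length-range is given only one bound, which is malformed.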
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid Policy: Invalid content-length-range: wrong number of arguments.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with invalid JSON error')
+def test_post_object_invalid_content_length_argument():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
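+    # A negative lower bound makes the content-length-range condition invalid.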
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", -1, 0]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Invalid Policy: Invalid JSON.')
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='fails with upload size less than minimum allowable error')
+def test_post_object_upload_size_below_minimum():
+ bucket = get_new_bucket()
+ conn = s3.main
+ host_name = conn.host
+
+ url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
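+    # The 3-byte body is below the 512-byte minimum allowed by the policy.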
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket.name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 512, 1000]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ policy = base64.b64encode(json_policy_document)
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+ payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ r = requests.post(url, files = payload)
+ eq(r.status_code, 400)
+ message = ET.fromstring(r.content).find('Message')
+ message_text = message.text
+ eq(message_text, 'Your proposed upload is smaller than the minimum allowed size')
+
+
def _setup_request(bucket_acl=None, object_acl=None):
"""
add a foo key, and specified key and bucket acls to