("Content-Type" , "text/plain"),('file', ('bar'))])
r = requests.post(url, files = payload)
- eq(r.status_code, 200)
+ eq(r.status_code, 204)
key = bucket.get_key("foo.txt")
got = key.get_contents_as_string()
eq(got, 'bar')
policy = base64.b64encode(json_policy_document)
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
- foo_string = 'foo' * 1024*1024
-
payload = OrderedDict([ ("key" , "${filename}"),("AWSAccessKeyId" , conn.aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('foo.txt', 'bar'))])
eq(r.status_code, 204)
key = bucket.get_key("foo.txt")
got = key.get_contents_as_string()
- eq(got, foo_string)
+ eq(got, 'bar')
@attr(resource='object')
@attr(assertion='succeeds and returns redirect url')
def test_post_object_success_redirect_action():
bucket = get_new_bucket()
+
conn = s3.main
host_name = conn.host
url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
host=host_name,bucket=bucket.name)
+ redirect_url = '{protocol}://{host}/{bucket}'.format(protocol= 'https' if conn.is_secure else 'http',\
+ host=host_name,bucket=bucket.name)
+ bucket.set_acl('public-read')
+
utc = pytz.utc
expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
["starts-with", "$key", "foo"],\
{"acl": "private"},\
["starts-with", "$Content-Type", "text/plain"],\
- ["$eq", "$success_action_redirect", "http://localhost"],\
+ ["eq", "$success_action_redirect", redirect_url],\
["content-length-range", 0, 1024]\
]\
}
payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
- ("Content-Type" , "text/plain"),("success_action_redirect" , "http://localhost"),\
+ ("Content-Type" , "text/plain"),("success_action_redirect" , redirect_url),\
('file', ('bar'))])
r = requests.post(url, files = payload)
url = r.url
key = bucket.get_key("foo.txt")
eq(url,
- 'http://localhost/?bucket={bucket}&key={key}&etag=%22{etag}%22'.format(bucket = bucket.name,
- key = key.name, etag = key.etag.strip('"')))
+ '{rurl}?bucket={bucket}&key={key}&etag=%22{etag}%22'.format(rurl = redirect_url, bucket = bucket.name,
+ key = key.name, etag = key.etag.strip('"')))
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'The request signature we calculated does not match the\
- signature you provided. Check your key and signing method.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'The AWS Access Key Id you provided does not exist in our records.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text,\
- "Invalid Policy: Invalid 'expiration' value: '{date}'".format(date=str(expires)))
@attr(resource='object')
policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
"conditions": [\
{"bucket": bucket.name},\
- ["starts-with", "$key", "\$foo"],\
{"acl": "private"},\
["starts-with", "$Content-Type", "text/plain"],\
["content-length-range", 0, 1024]\
policy = base64.b64encode(json_policy_document)
signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
- payload = OrderedDict([ ("key" , "\$foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id),\
+ payload = OrderedDict([ ("AWSAccessKeyId" , conn.aws_access_key_id),\
("acl" , "private"),("signature" , signature),("policy" , policy),\
("Content-Type" , "text/plain"),('file', ('bar'))])
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, "Bucket POST must contain a field named 'key'.\
- If it is specified, please check the order of the fields.")
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq("Bucket POST must contain a field named 'Signature'.\
- If it is specified, please check the order of the fields.")
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid according to Policy: Extra input fields: bucket')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid according to Policy: Policy Condition failed: ["starts-with", "$x-amz-meta-foo", "bar"]')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, "Invalid Policy: Invalid 'conditions' value: must be a List.")
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, "Invalid Policy: Invalid 'expiration' value: must be a String.")
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid according to Policy: Policy expired.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 403)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid according to Policy: Policy Condition failed:\
- ["eq", "$x-amz-meta-foo", ""]')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid Policy: Policy missing expiration.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid Policy: Policy missing conditions.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Your proposed upload exceeds the maximum allowed size')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid Policy: Invalid content-length-range: wrong number of arguments.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Invalid Policy: Invalid JSON.')
@attr(resource='object')
r = requests.post(url, files = payload)
eq(r.status_code, 400)
- message = ET.fromstring(r.content).find('Message')
- message_text = message.text
- eq(message_text, 'Your proposed upload is smaller than the minimum allowed size')
def _setup_request(bucket_acl=None, object_acl=None):