r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 403
+def test_post_object_wrong_bucket():
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket_name},\
+ ["starts-with", "$key", "foo"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", 0, 1024]\
+ ]\
+ }
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
+ policy = base64.b64encode(bytes_json_policy_document)
+ aws_secret_access_key = get_main_aws_secret_key()
+ aws_access_key_id = get_main_aws_access_key()
+
+ signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
+
+ payload = OrderedDict([ ("key" , "${filename}"),('bucket', bucket_name),\
+ ("AWSAccessKeyId" , aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),('file', ('foo.txt', 'bar'))])
+
+ bad_bucket_name = get_new_bucket()
+ url = _get_post_url(bad_bucket_name)
+
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
+ assert r.status_code == 403
+
def test_post_object_invalid_request_field_value():
bucket_name = get_new_bucket()
client = get_client()