return bucket
-def _make_request(method, bucket, key, body=None, authenticated=False, response_headers=None):
+def _make_request(method, bucket, key, body=None, authenticated=False, response_headers=None, expires_in=100000):
"""
issue a request for a specified method, on a specified <bucket,key>,
with a specified (optional) body (encrypted per the connection), and
return the response (status, reason)
"""
if authenticated:
- url = key.generate_url(100000, method=method, response_headers=response_headers)
+ url = key.generate_url(expires_in, method=method, response_headers=response_headers)
o = urlparse(url)
path = o.path + '?' + o.query
else:
print res.status, res.reason
return res
-def _make_bucket_request(method, bucket, body=None, authenticated=False):
+def _make_bucket_request(method, bucket, body=None, authenticated=False, expires_in=100000):
"""
issue a request for a specified method, on a specified <bucket,key>,
with a specified (optional) body (encrypted per the connection), and
return the response (status, reason)
"""
if authenticated:
- url = bucket.generate_url(100000, method=method)
+ url = bucket.generate_url(expires_in, method=method)
o = urlparse(url)
path = o.path + '?' + o.query
else:
eq(res.reason, 'OK')
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='authenticated, no object acls')
+@attr(assertion='succeeds')
+def test_object_raw_put_authenticated_expired():
+ bucket = get_new_bucket()
+ key = bucket.new_key('foo')
+
+ res = _make_request('PUT', bucket, key, body='foo', authenticated=True, expires_in=-1000)
+ eq(res.status, 403)
+ eq(res.reason, 'Forbidden')
+
+
def check_bad_bucket_name(name):
"""
Attempt to create a bucket with a specified name, and confirm