assert 'ChecksumSHA256' not in response
response = client.head_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')
assert composite_sha256sum == response['ChecksumSHA256']
+
+def test_post_object_upload_checksum():
+ megabytes = 1024 * 1024
+ min_size = 0
+ max_size = 5 * megabytes
+ test_payload_size = 2 * megabytes
+
+ bucket_name = get_new_bucket()
+ client = get_client()
+
+ url = _get_post_url(bucket_name)
+ utc = pytz.utc
+ expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+ policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+ "conditions": [\
+ {"bucket": bucket_name},\
+ ["starts-with", "$key", "foo_cksum_test"],\
+ {"acl": "private"},\
+ ["starts-with", "$Content-Type", "text/plain"],\
+ ["content-length-range", min_size, max_size],\
+ ]\
+ }
+
+ test_payload = b'x' * test_payload_size
+
+ json_policy_document = json.JSONEncoder().encode(policy_document)
+ bytes_json_policy_document = bytes(json_policy_document, 'utf-8')
+ policy = base64.b64encode(bytes_json_policy_document)
+ aws_secret_access_key = get_main_aws_secret_key()
+ aws_access_key_id = get_main_aws_access_key()
+
+ signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())
+
+ # good checksum payload (checked via upload from awscli)
+ payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),\
+ ('x-amz-checksum-sha256', 'aTL9MeXa9HObn6eP93eygxsJlcwdCwCTysgGAZAgE7w='),\
+ ('file', (test_payload)),])
+
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
+ assert r.status_code == 204
+
+ # bad checksum payload
+ payload = OrderedDict([ ("key" , "foo_cksum_test.txt"),("AWSAccessKeyId" , aws_access_key_id),\
+ ("acl" , "private"),("signature" , signature),("policy" , policy),\
+ ("Content-Type" , "text/plain"),\
+ ('x-amz-checksum-sha256', 'sailorjerry'),\
+ ('file', (test_payload)),])
+
+ r = requests.post(url, files=payload, verify=get_config_ssl_verify())
+ assert r.status_code == 400