]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
Additional tests for server side encryption, S3 SSE-KMS mode.
authorAdam Kupczyk <akupczyk@mirantis.com>
Mon, 11 Jul 2016 13:43:52 +0000 (15:43 +0200)
committerCasey Bodley <cbodley@redhat.com>
Fri, 10 Mar 2017 20:15:26 +0000 (15:15 -0500)
All tests belong to group 'encryption'.

Signed-off-by: Adam Kupczyk <akupczyk@mirantis.com>
s3tests/functional/test_s3.py

index 5c0c272e44e624880914f5aba24cf9c4650dc888..3f3f5f595356420255961186477ba70252bf0f17 100644 (file)
@@ -7867,3 +7867,315 @@ def test_encryption_sse_c_multipart_bad_download():
     eq(k.content_type, content_type)
     e = assert_raises(boto.exception.S3ResponseError,
                       k.get_contents_as_string, headers=get_headers)
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+@attr('encryption')
+def test_encryption_sse_c_post_object_authenticated_request():
+    bucket = get_new_bucket()
+
+    url = _get_post_url(s3.main, bucket)
+
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \
+                       "conditions": [ \
+                           {"bucket": bucket.name}, \
+                           ["starts-with", "$key", "foo"], \
+                           {"acl": "private"}, \
+                           ["starts-with", "$Content-Type", "text/plain"], \
+                           ["starts-with", "$x-amz-server-side-encryption-customer-algorithm", ""], \
+                           ["starts-with", "$x-amz-server-side-encryption-customer-key", ""], \
+                           ["starts-with", "$x-amz-server-side-encryption-customer-key-md5", ""], \
+                           ["content-length-range", 0, 1024] \
+                           ] \
+                       }
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    policy = base64.b64encode(json_policy_document)
+    conn = s3.main
+    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+    payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \
+                            ("acl" , "private"),("signature" , signature),("policy" , policy), \
+                            ("Content-Type" , "text/plain"), \
+                            ('x-amz-server-side-encryption-customer-algorithm', 'AES256'), \
+                            ('x-amz-server-side-encryption-customer-key', 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs='), \
+                            ('x-amz-server-side-encryption-customer-key-md5', 'DWygnHRtgiJ77HCm+1rvHw=='), \
+                            ('file', ('bar'),), ])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    get_headers = {
+        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
+    }
+
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string(headers=get_headers)
+    eq(got, 'bar')
+
+
+def _test_sse_kms_customer_write(file_size):
+    """
+    Tests Create a file of A's, use it to set_contents_from_file.
+    Create a file of B's, use it to re-set_contents_from_file.
+    Re-read the contents, and confirm we get B's
+    """
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*file_size
+    key.set_contents_from_string(data, headers=sse_kms_client_headers)
+    rdata = key.get_contents_as_string(headers=sse_kms_client_headers)
+    eq(data, rdata)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test SSE-KMS encrypted transfer 1 byte')
+@attr(assertion='success')
+@attr('encryption')
+def test_sse_kms_transfer_1b():
+    _test_sse_kms_customer_write(1)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test SSE-KMS encrypted transfer 1KB')
+@attr(assertion='success')
+@attr('encryption')
+def test_sse_kms_transfer_1kb():
+    _test_sse_kms_customer_write(1024)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test SSE-KMS encrypted transfer 1MB')
+@attr(assertion='success')
+@attr('encryption')
+def test_sse_kms_transfer_1MB():
+    _test_sse_kms_customer_write(1024*1024)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test SSE-KMS encrypted transfer 13 bytes')
+@attr(assertion='success')
+@attr('encryption')
+def test_sse_kms_transfer_13b():
+    _test_sse_kms_customer_write(13)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write encrypted with SSE-KMS and read without SSE-KMS')
+@attr(assertion='operation success')
+@attr('encryption')
+def test_sse_kms_present():
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=sse_kms_client_headers)
+    result = key.get_contents_as_string()
+    eq(data, result)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='write encrypted with SSE-KMS but read with other key')
+@attr(assertion='operation fails')
+@attr('encryption')
+def test_sse_kms_other_key():
+    bucket = get_new_bucket()
+    sse_kms_client_headers_A = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1'
+    }
+    sse_kms_client_headers_B = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=sse_kms_client_headers_A)
+    result = key.get_contents_as_string(headers=sse_kms_client_headers_B)
+    eq(data, result)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='declare SSE-KMS but do not provide key_id')
+@attr(assertion='operation fails')
+@attr('encryption')
+def test_sse_kms_no_key():
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption': 'aws:kms'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    e = assert_raises(boto.exception.S3ResponseError,
+                      key.set_contents_from_string, data, headers=sse_kms_client_headers)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Do not declare SSE-KMS but provide key_id')
+@attr(assertion='operation successfull, no encryption')
+@attr('encryption')
+def test_sse_kms_not_declared():
+    bucket = get_new_bucket()
+    sse_kms_client_headers = {
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
+    }
+    key = bucket.new_key('testobj')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=sse_kms_client_headers)
+    rdata = key.get_contents_as_string()
+    eq(data, rdata)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='complete KMS multi-part upload')
+@attr(assertion='successful')
+@attr('encryption')
+def test_sse_kms_multipart_upload():
+    bucket = get_new_bucket()
+    key = "multipart_enc"
+    content_type = 'text/plain'
+    objlen = 30 * 1024 * 1024
+    enc_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2',
+        'Content-Type': content_type
+    }
+    (upload, data) = _multipart_upload_enc(bucket, key, objlen,
+                                           init_headers=enc_headers, part_headers=enc_headers,
+                                           metadata={'foo': 'bar'})
+    upload.complete_upload()
+    result = _head_bucket(bucket)
+
+    eq(result.get('x-rgw-object-count', 1), 1)
+    eq(result.get('x-rgw-bytes-used', 30 * 1024 * 1024), 30 * 1024 * 1024)
+
+    k = bucket.get_key(key)
+    eq(k.metadata['foo'], 'bar')
+    eq(k.content_type, content_type)
+    test_string = k.get_contents_as_string(headers=enc_headers)
+    eq(len(test_string), k.size)
+    eq(data, test_string)
+    eq(test_string, data)
+
+    _check_content_using_range_enc(k, data, 1000000, enc_headers=enc_headers)
+    _check_content_using_range_enc(k, data, 10000000, enc_headers=enc_headers)
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='multipart KMS upload with bad key_id for uploading chunks')
+@attr(assertion='successful')
+@attr('encryption')
+def test_sse_kms_multipart_invalid_chunks_1():
+    bucket = get_new_bucket()
+    key = "multipart_enc"
+    content_type = 'text/bla'
+    objlen = 30 * 1024 * 1024
+    init_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+        'Content-Type': content_type
+    }
+    part_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-2'
+    }
+    _multipart_upload_enc(bucket, key, objlen,
+                            init_headers=init_headers, part_headers=part_headers,
+                            metadata={'foo': 'bar'})
+
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='multipart KMS upload with unexistent key_id for chunks')
+@attr(assertion='successful')
+@attr('encryption')
+def test_sse_kms_multipart_invalid_chunks_2():
+    bucket = get_new_bucket()
+    key = "multipart_enc"
+    content_type = 'text/plain'
+    objlen = 30 * 1024 * 1024
+    init_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+        'Content-Type': content_type
+    }
+    part_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-not-present'
+    }
+    _multipart_upload_enc(bucket, key, objlen,
+                            init_headers=init_headers, part_headers=part_headers,
+                            metadata={'foo': 'bar'})
+
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated KMS browser based upload via POST request')
+@attr(assertion='succeeds and returns written data')
+@attr('encryption')
+def test_sse_kms_post_object_authenticated_request():
+    bucket = get_new_bucket()
+
+    url = _get_post_url(s3.main, bucket)
+
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"), \
+                       "conditions": [ \
+                           {"bucket": bucket.name}, \
+                           ["starts-with", "$key", "foo"], \
+                           {"acl": "private"}, \
+                           ["starts-with", "$Content-Type", "text/plain"], \
+                           ["starts-with", "$x-amz-server-side-encryption", ""], \
+                           ["starts-with", "$x-amz-server-side-encryption-aws-kms-key-id", ""], \
+                           ["content-length-range", 0, 1024] \
+                           ] \
+                       }
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    policy = base64.b64encode(json_policy_document)
+    conn = s3.main
+    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+    payload = OrderedDict([ ("key" , "foo.txt"),("AWSAccessKeyId" , conn.aws_access_key_id), \
+                            ("acl" , "private"),("signature" , signature),("policy" , policy), \
+                            ("Content-Type" , "text/plain"), \
+                            ('x-amz-server-side-encryption', 'aws:kms'), \
+                            ('x-amz-server-side-encryption-aws-kms-key-id', 'testkey-1'), \
+                            ('file', ('bar'),), ])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    get_headers = {
+        'x-amz-server-side-encryption': 'aws:kms',
+        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+    }
+
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string(headers=get_headers)
+    eq(got, 'bar')