]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
add tests for get/put/delete/post object tagging 159/head
authorAbhishek Lekshmanan <abhishek@suse.com>
Tue, 25 Jul 2017 08:32:05 +0000 (10:32 +0200)
committerAbhishek Lekshmanan <abhishek@suse.com>
Tue, 25 Jul 2017 08:51:29 +0000 (10:51 +0200)
- adds test for put obj with tagging query
- adds test for HEAD obj with x-amz-tagging-count verification
- adds test for put object tagging
- adds test for get object tagging
- adds test for post object tagging
- adds test for delete object tagging
- also verify& exceed the max tags
- add tagging condition to bucket acl

Signed-off-by: Abhishek Lekshmanan <abhishek@suse.com>
s3tests/functional/test_s3.py

index 7abfe7ab668031d5a61bfdc38fc6489c8e2fc389..b8dc47fb320a9de61800e4ee3c4355e7b3ee64ec 100644 (file)
@@ -35,6 +35,7 @@ from urlparse import urlparse
 from nose.tools import eq_ as eq
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
+from boto.s3.tagging import TagSet
 
 from .utils import assert_raises
 from .utils import generate_random
@@ -45,6 +46,7 @@ from email.header import decode_header
 from ordereddict import OrderedDict
 
 from boto.s3.cors import CORSConfiguration
+from urllib import quote_plus
 
 from . import (
     nuke_prefixed_buckets,
@@ -101,6 +103,14 @@ def tag(*tags):
         return func
     return wrap
 
+def parse_s3_errorcode(error_xml):
+    """
+    Given an S3 error response return the Error Code message.
+    Useful for apis not fully implemented in boto
+    """
+    return ET.fromstring(error_xml).find('./Code').text
+
+
 @attr(resource='bucket')
 @attr(method='get')
 @attr(operation='list')
@@ -148,6 +158,15 @@ def _get_keys_prefixes(li):
     prefixes = [x for x in li if not isinstance(x, boto.s3.key.Key)]
     return (keys, prefixes)
 
+def _get_alt_connection():
+    return boto.s3.connection.S3Connection(
+        aws_access_key_id=s3['alt'].aws_access_key_id,
+        aws_secret_access_key=s3['alt'].aws_secret_access_key,
+        is_secure=s3['alt'].is_secure,
+        port=s3['alt'].port,
+        host=s3['alt'].host,
+        calling_format=s3['alt'].calling_format,
+    )
 
 @attr(resource='bucket')
 @attr(method='get')
@@ -8768,3 +8787,421 @@ def test_bucket_policy_set_condition_operator_end_with_IfExists():
     res = _make_request('GET', bucket.name, bucket.get_key("foo"),
                         request_headers={'referer': 'http://example.com'})
     eq(res.status, 403)
+
+
+def _tags_from_dict(d):
+    tag_list = []
+    for k,v in d.items():
+        tag_list.append({
+            'Key'  : k,
+            'Value': v if v is not None else ''
+        })
+
+    return tag_list
+
+class S3TestTagSet(TagSet):
+    '''
+    version of TagSet that supports comparision, so that we can compare tagsets
+    '''
+    def to_dict(self):
+        d = dict()
+        for tag in self:
+            d[tag.key] = tag.value
+        return d
+
+    def __eq__(self, other):
+        return self.to_dict() == other.to_dict()
+
+    def __str__(self):
+        s = ''
+        for tag in self:
+            if s:
+                s += '&'
+            s += quote_plus(tag.key)
+            v = tag.value
+            if v is not None and v != '':
+                s += '=' + quote_plus(v)
+        return s
+
+    def to_xml(self):
+        xml = '<Tagging>'
+        xml += super(S3TestTagSet,self).to_xml()
+        xml += '</Tagging>'
+        return xml
+
+def _parse_tagging_xml(tags_xml):
+    # Apparently ETree doesn't understand namespaces well, so let's define it
+    ns = {"aws" : "http://s3.amazonaws.com/doc/2006-03-01/"}
+    tags_list = ET.fromstring(tags_xml).findall('./aws:TagSet/aws:Tag', ns)
+    tagset = S3TestTagSet()
+
+    for it in tags_list:
+        # unfortunately etree returns None when string is empty
+        tagset.add_tag(it.find('aws:Key', ns).text,it.find('aws:Value', ns).text or '')
+    return tagset
+
+def _make_random_string(size):
+    return ''.join(random.choice(string.ascii_letters) for _ in range(size))
+
+def _get_obj_tags_conn(conn, bucket_name, key_name):
+    res = conn.make_request('GET',bucket_name, key_name, query_args='tagging')
+    eq(res.status, 200)
+    return _parse_tagging_xml(res.read())
+
+def _get_obj_tags(bucket, key_name):
+    # our _make_request doesn't sign query args, let's piggy back on boto
+    return _get_obj_tags_conn(bucket.connection, bucket.name, key_name)
+
+def _put_obj_tags_conn(conn, bucket_name, key_name, tag_str):
+    return conn.make_request('PUT',bucket_name, key_name, query_args='tagging', data=tag_str)
+
+def _put_obj_tags(bucket, key_name, tag_str):
+    return _put_obj_tags_conn(bucket.connection, bucket.name, key_name, tag_str)
+
+def _delete_obj_tags(bucket, key_name):
+    return bucket.connection.make_request('DELETE', bucket.name, key_name, query_args='tagging')
+
+def _create_simple_tagset(count):
+    tagset = S3TestTagSet()
+    for i in range(count):
+        tagset.add_tag('key'+str(i),'val'+str(i))
+
+    return tagset
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test Get/PutObjTagging output')
+@attr(assertion='success')
+@attr('tagging')
+def test_get_obj_tagging():
+    bucket, key = _create_key_with_random_content('testputtags')
+    input_tagset = _create_simple_tagset(2)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test HEAD obj tagging output')
+@attr(assertion='success')
+@attr('tagging')
+def test_get_obj_head_tagging():
+    bucket, key = _create_key_with_random_content('testputtags')
+    count = 2
+    input_tagset = _create_simple_tagset(count)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+
+    res = _make_request('HEAD',bucket, key, authenticated=True)
+    eq(res.status, 200)
+    eq(int(res.getheader('x-amz-tagging-count')), count)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test Put max allowed tags')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_max_tags():
+    bucket, key = _create_key_with_random_content('testputmaxtags')
+    input_tagset = _create_simple_tagset(10)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test Put max allowed tags')
+@attr(assertion='fails')
+@attr('tagging')
+def test_put_excess_tags():
+    bucket, key = _create_key_with_random_content('testputexcesstags')
+    input_tagset = _create_simple_tagset(11)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 400)
+    eq(parse_s3_errorcode(res.read()), 'InvalidTag')
+
+    # Now assert that no tags have been put
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(len(res_tagset), 0)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test Put max allowed k-v size')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_max_kvsize_tags():
+    bucket, key = _create_key_with_random_content('testputmaxkeysize')
+    input_tagset = S3TestTagSet()
+    for i in range(10):
+        k = _make_random_string(128)
+        v = _make_random_string(256)
+        input_tagset.add_tag(k, v)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test exceed key size')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_excess_key_tags():
+    bucket, key = _create_key_with_random_content('testputexcesskeytags')
+    input_tagset = S3TestTagSet()
+    for i in range(10):
+        k = _make_random_string(129)
+        v = _make_random_string(256)
+        input_tagset.add_tag(k, v)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 400)
+    eq(parse_s3_errorcode(res.read()), 'InvalidTag')
+
+    # Now assert that no tags have been put
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(len(res_tagset), 0)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test exceed val size')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_excess_val_tags():
+    bucket, key = _create_key_with_random_content('testputexcessvaltags')
+    input_tagset = S3TestTagSet()
+    for i in range(10):
+        k = _make_random_string(128)
+        v = _make_random_string(257)
+        input_tagset.add_tag(k, v)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 400)
+    eq(parse_s3_errorcode(res.read()), 'InvalidTag')
+
+    # Now assert that no tags have been put
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(len(res_tagset), 0)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test PUT modifies existing tags')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_modify_tags():
+    bucket, key = _create_key_with_random_content('testputmodifytags')
+    input_tagset = S3TestTagSet()
+    input_tagset.add_tag('key','val')
+    input_tagset.add_tag('key2','val2')
+
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+    input2_tagset = S3TestTagSet()
+    input2_tagset.add_tag('key3','val3')
+
+    res = _put_obj_tags(bucket, key.name, input2_tagset.to_xml())
+    eq(res.status, 200)
+    res2_tagset = _get_obj_tags(bucket, key.name)
+    eq(input2_tagset, res2_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test Delete tags')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_delete_tags():
+    bucket, key = _create_key_with_random_content('testputmodifytags')
+    input_tagset = _create_simple_tagset(2)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+    input2_tagset = S3TestTagSet()
+    input2_tagset.add_tag('key3','val3')
+
+    res = _delete_obj_tags(bucket, key.name)
+    eq(res.status, 204)
+
+    # TODO do a test to verify that we've *only* removed the xattr relating to
+    # tagging
+    res2_tagset = _get_obj_tags(bucket, key.name)
+    eq(len(res2_tagset), 0)
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='anonymous browser based upload via POST request')
+@attr('tagging')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_tags_anonymous_request():
+    bucket = get_new_bucket()
+    url = _get_post_url(s3.main, bucket)
+    bucket.set_acl('public-read-write')
+    input_tagset = _create_simple_tagset(2)
+    key_name = "foo.txt"
+    payload = OrderedDict([
+        ("key" , key_name),
+        ("acl" , "public-read"),
+        ("Content-Type" , "text/plain"),
+        ("tagging", input_tagset.to_xml()),
+        ('file', ('bar')),
+    ])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string()
+    eq(got, 'bar')
+
+    res_tagset = _get_obj_tags(bucket, key_name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='post')
+@attr(operation='authenticated browser based upload via POST request')
+@attr('tagging')
+@attr(assertion='succeeds and returns written data')
+def test_post_object_tags_authenticated_request():
+    bucket = get_new_bucket()
+
+    url = _get_post_url(s3.main, bucket)
+
+    utc = pytz.utc
+    expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)
+
+    policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),\
+    "conditions": [
+        {"bucket": bucket.name},
+        ["starts-with", "$key", "foo"],
+        {"acl": "private"},
+        ["starts-with", "$Content-Type", "text/plain"],
+        ["content-length-range", 0, 1024],
+        ["starts-with", "$tagging", ""]
+    ]}
+    input_tagset = _create_simple_tagset(2)
+
+    json_policy_document = json.JSONEncoder().encode(policy_document)
+    policy = base64.b64encode(json_policy_document)
+    conn = s3.main
+    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key, policy, sha).digest())
+
+    payload = OrderedDict([
+        ("key" , "foo.txt"),
+        ("AWSAccessKeyId" , conn.aws_access_key_id),
+        ("acl" , "private"),("signature" , signature),("policy" , policy),
+        ('tagging',input_tagset.to_xml()),
+        ("Content-Type" , "text/plain"),
+        ('file', ('bar'))])
+
+    r = requests.post(url, files = payload)
+    eq(r.status_code, 204)
+    key = bucket.get_key("foo.txt")
+    got = key.get_contents_as_string()
+    eq(got, 'bar')
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='Test PutObj with tagging headers')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_obj_with_tags():
+    input_tagset = S3TestTagSet()
+    input_tagset.add_tag('foo','bar')
+    input_tagset.add_tag('bar', '')
+    put_obj_tag_headers = {
+        'x-amz-tagging' : str(input_tagset)
+    }
+    bucket = get_new_bucket()
+    key = bucket.new_key('testtagobj1')
+    data = 'A'*100
+    key.set_contents_from_string(data, headers=put_obj_tag_headers)
+    result = key.get_contents_as_string()
+    eq(data,result)
+
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset.to_dict(), res_tagset.to_dict())
+
+def _make_arn_resource(path="*"):
+    return "arn:aws:s3:::{}".format(path)
+
+def make_json_policy(action, resource, principal={"AWS": "*"}):
+    return json.dumps(
+    {
+        "Version": "2012-10-17",
+        "Statement": [{
+        "Effect": "Allow",
+        "Principal": principal,
+        "Action": action,
+        "Resource": [
+            resource
+          ]
+        }]
+    })
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test GetObjTagging public read')
+@attr(assertion='success')
+@attr('tagging')
+def test_get_tags_acl_public():
+    bucket, key = _create_key_with_random_content('testputtagsacl')
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
+    policy_document = make_json_policy("s3:GetObjectTagging",
+                                       resource)
+
+    bucket.set_policy(policy_document)
+    input_tagset = _create_simple_tagset(10)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    new_conn = _get_alt_connection()
+    res_tagset = _get_obj_tags_conn(new_conn, bucket.name, key.name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test PutObjTagging public wrote')
+@attr(assertion='success')
+@attr('tagging')
+def test_put_tags_acl_public():
+    bucket, key = _create_key_with_random_content('testputtagsacl')
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
+    #principal = {"AWS": "s3test2"} This needs a tenanted user?
+    policy_document = make_json_policy("s3:PutObjectTagging",
+                                       resource)
+    bucket.set_policy(policy_document)
+    new_conn = _get_alt_connection()
+    input_tagset = _create_simple_tagset(10)
+    res = _put_obj_tags_conn(new_conn, bucket.name, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    res_tagset = _get_obj_tags(bucket, key.name)
+    eq(input_tagset, res_tagset)
+
+@attr(resource='object')
+@attr(method='get')
+@attr(operation='Test DeleteObjTagging public')
+@attr(assertion='success')
+@attr('tagging')
+def test_delete_tags_obj_public():
+    bucket, key = _create_key_with_random_content('testputtagsacl')
+
+    resource = _make_arn_resource("{}/{}".format(bucket.name, key.name))
+    policy_document = make_json_policy("s3:DeleteObjectTagging",
+                                       resource)
+
+    bucket.set_policy(policy_document)
+    input_tagset = _create_simple_tagset(10)
+    res = _put_obj_tags(bucket, key.name, input_tagset.to_xml())
+    eq(res.status, 200)
+    new_conn = _get_alt_connection()
+    res = new_conn.make_request("DELETE",bucket.name, key.name, query_args='tagging')
+    eq(res.status, 204)
+    tags = _get_obj_tags(bucket, key.name)
+    eq(len(tags),0)
+    #eq(input_tagset, res_tagset)