if (x == size):
return
-def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[]):
+def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None, content_type=None, metadata=None, resend_parts=[], tagging=None):
"""
generate a multi-part upload for a random file of specifed size,
if requested, generate a list of the parts
if content_type == None and metadata == None:
+ if tagging is not None:
+ response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Tagging=tagging)
+ else:
response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
else:
response = client.create_multipart_upload(Bucket=bucket_name, Key=key, Metadata=metadata, ContentType=content_type)
def _make_random_string(size):
return ''.join(random.choice(string.ascii_letters) for _ in range(size))
+@pytest.mark.tagging
+def test_set_multipart_tagging():
+ bucket_name = get_new_bucket()
+ client = get_client()
+ tags='Hello=World&foo=bar'
+ key = "mymultipart"
+ objlen = 1
+
+ (upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen, tagging=tags)
+ response = client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ response = client.get_object_tagging(Bucket=bucket_name, Key=key)
+ assert len(response['TagSet']) == 2
+ assert response['TagSet'][0]['Key'] == 'Hello'
+ assert response['TagSet'][0]['Value'] == 'World'
+ assert response['TagSet'][1]['Key'] == 'foo'
+ assert response['TagSet'][1]['Value'] == 'bar'
+
+ response = client.delete_object_tagging(Bucket=bucket_name, Key=key)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+ response = client.get_object_tagging(Bucket=bucket_name, Key=key)
+ assert len(response['TagSet']) == 0
@pytest.mark.tagging
@pytest.mark.fails_on_dbstore