del os.environ['RABBITMQ_CONFIG_FILE']
+@attr('amqp_test')
+def test_http_post_object_upload():
+ """ test that uploads object using HTTP POST """
+
+ import boto3
+ from collections import OrderedDict
+ import requests
+
+ hostname = get_ip()
+ zonegroup = 'default'
+ conn = connection()
+
+ endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
+
+ conn1 = boto3.client(service_name='s3',
+ aws_access_key_id=get_access_key(),
+ aws_secret_access_key=get_secret_key(),
+ endpoint_url=endpoint,
+ )
+
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ key_name = 'foo.txt'
+
+ resp = conn1.generate_presigned_post(Bucket=bucket_name, Key=key_name,)
+
+ url = resp['url']
+
+ bucket = conn1.create_bucket(ACL='public-read-write', Bucket=bucket_name)
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
+ task1.start()
+
+ # create s3 topics
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+
+ # create s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:Post']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ # POST upload
+ r = requests.post(url, files=payload, verify=True)
+ assert_equal(r.status_code, 204)
+
+ # check amqp receiver
+ events = receiver1.get_and_reset_events()
+ assert_equal(len(events), 1)
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ conn1.delete_object(Bucket=bucket_name, Key=key_name)
+ # delete the bucket
+ conn1.delete_bucket(Bucket=bucket_name)
+
+
@attr('amqp_test')
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""
time.sleep(5)
# check amqp receiver
events = receiver.get_and_reset_events()
- assert_equal(len(events), 3) # PUT, COPY, Multipart start, Multipart End
+ assert_equal(len(events), 3) # PUT, COPY, Multipart Complete
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)