""" test notification creation with multiple topics"""
zones, ps_zones = init_env()
bucket_name = gen_bucket_name()
+ # TODO: test via push endpoint (amqp+http)
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
# create topics
- dest_endpoint1 = 'amqp://localhost'
- dest_args1 = 'amqp-exchange=ex1&amqp-ack-level=none'
- dest_endpoint2 = 'http://localhost:9001'
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
- endpoint=dest_endpoint1,
- endpoint_args=dest_args1)
+ topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1)
result, status = topic_conf1.set_config()
parsed_result = json.loads(result)
topic_arn1 = parsed_result['arn']
assert_equal(status/100, 2)
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
- endpoint=dest_endpoint2)
+ topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2)
result, status = topic_conf2.set_config()
parsed_result = json.loads(result)
topic_arn2 = parsed_result['arn']
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
+ notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
+ notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
topic_conf_list = [
{
- 'Id': notification_name + '_1',
+ 'Id': notification_name1,
'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:*']
},
{
- 'Id': notification_name + '_2',
+ 'Id': notification_name2,
'TopicArn': topic_arn2,
'Events': ['s3:ObjectCreated:*']
}]
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
result, _ = s3_notification_conf.get_config()
- print('first try')
- print(result)
assert_equal(len(result['TopicConfigurations']), 2)
+ assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
+ assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
- _, status = s3_notification_conf.set_config()
+ # get auto-generated subscriptions
+ sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
+ topic_name1)
+ _, status = sub_conf1.get_config()
assert_equal(status/100, 2)
- result, _ = s3_notification_conf.get_config()
- print('second try')
- print(result)
- assert_equal(len(result['TopicConfigurations']), 2)
+ sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
+ topic_name2)
+ _, status = sub_conf2.get_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ number_of_objects = 10
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ # get the events from both of the subscription
+ result, _ = sub_conf1.get_events()
+ parsed_result = json.loads(result)
+ for record in parsed_result['Records']:
+ log.debug(record)
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+
+ result, _ = sub_conf2.get_events()
+ parsed_result = json.loads(result)
+ for record in parsed_result['Records']:
+ log.debug(record)
+ keys = list(bucket.list())
+ # TODO: set exact_match to true
+ verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+
# cleanup
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
+ # delete objects from the bucket
+ for key in bucket.list():
+ key.delete()
zones[0].delete_bucket(bucket_name)