# topic name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
- task, receiver = create_kafka_receiver_thread(topic_name+'_1')
- task.start()
+
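+ # run the test body under try/finally so the kafka receiver and the
+ # rgw topic/notification configuration are cleaned up even on failure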
+ try:
+ task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+ task.start()
- # create s3 topic
- endpoint_address = 'kafka://' + kafka_server
- # with acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
- topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
- topic_arn1 = topic_conf1.set_config()
- # without acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
- topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
- topic_arn2 = topic_conf2.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+ # create s3 topic
+ endpoint_address = 'kafka://' + kafka_server
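+ # both topics push to the kafka broker, addressed via the kafka:// scheme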
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
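+ # an empty 'Events' list presumably subscribes the notification to all event types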
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
'Events': []
},
{'Id': notification_name + '_2', 'TopicArn': topic_arn2,
'Events': []
}]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
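+ # any 2xx status means the notification configuration was accepted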
+ assert_equal(status/100, 2)
- # create objects in the bucket (async)
- number_of_objects = 10
- client_threads = []
- start_time = time.time()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
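+ # upload the objects concurrently, one thread per object, 1MB of random data each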
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
- time_diff = time.time() - start_time
- print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- print('wait for 5sec for the messages...')
- time.sleep(5)
- keys = list(bucket.list())
- receiver.verify_s3_events(keys, exact_match=True)
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
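+ # exact_match=True presumably requires a one-to-one match between the
+ # bucket's keys and the received events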
+ receiver.verify_s3_events(keys, exact_match=True)
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
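+ # delete all objects concurrently to trigger ObjectRemoved notifications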
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
- time_diff = time.time() - start_time
- print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- print('wait for 5sec for the messages...')
- time.sleep(5)
- receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
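+ # deletions=True presumably verifies removal events against the same key list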
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
- # cleanup
- s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
- # delete the bucket
- conn.delete_bucket(bucket_name)
- stop_kafka_receiver(receiver, task)
+ finally:
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
def test_ps_s3_notification_multi_delete_on_master():