self.events.append(event)
self.lock.release()
- def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored s3 records agains a list of keys"""
self.acquire_lock()
log.info('verify_s3_events: http server has %d events', len(self.events))
try:
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
except AssertionError as err:
self.close()
raise err
self.lock.release()
return events
- def close(self):
+ def close(self, task=None):
log.info('http server on %s starting shutdown', str(self.addr))
t = threading.Thread(target=self.shutdown)
t.start()
self.events.append(json.loads(body))
# TODO create a base class for the AMQP and HTTP cases
- def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored s3 records agains a list of keys"""
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
self.events = []
def get_and_reset_events(self):
self.events = []
return tmp
+ def close(self, task):
+ stop_amqp_receiver(self, task)
+
def amqp_receiver_thread_runner(receiver):
"""main thread function for the amqp receiver"""
self.topic = topic
self.stop = False
- def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored s3 records agains a list of keys"""
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags)
self.events = []
+ def close(self, task):
+ stop_kafka_receiver(self, task)
+
def kafka_receiver_thread_runner(receiver):
"""main thread function for the kafka receiver"""
try:
conn.delete_bucket(bucket_name)
-@attr('amqp_test')
-def test_ps_s3_notification_push_amqp_on_master():
- """ test pushing amqp s3 notification on master """
-
- hostname = get_ip()
- conn = connection()
+def notification_push(endpoint_type, conn, account=None):
+ """ test pushinging notification """
zonegroup = get_config_zonegroup()
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
- topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
- topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
-
- # start amqp receivers
- exchange = 'ex1'
- task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
- task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
- task1.start()
- task2.start()
-
- # create two s3 topic
- endpoint_address = 'amqp://' + hostname
- # with acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
- topic_arn1 = topic_conf1.set_config()
- # without acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
- topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
- topic_arn2 = topic_conf2.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
- 'Events': []
- },
- {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
- 'Events': ['s3:ObjectCreated:*']
- }]
+ topic_name = bucket_name + TOPIC_SUFFIX
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
+ receiver = {}
+ task = None
+ host = get_ip()
+ s3_notification_conf = None
+ topic_conf = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ host = get_ip_http()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ # with acks from broker
+ exchange = 'ex1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ # create two s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ elif endpoint_type == 'kafka':
+ # start amqp receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + kafka_server
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ # create s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
- # create objects in the bucket (async)
+ # create objects in the bucket
number_of_objects = 100
client_threads = []
+ etags = []
+ objects_size = {}
start_time = time.time()
for i in range(number_of_objects):
- key = bucket.new_key(str(i))
content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
- print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print('average time for creation + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
print('wait for 5sec for the messages...')
time.sleep(5)
- # check amqp receiver
+ # check receiver
keys = list(bucket.list())
- print('total number of objects: ' + str(len(keys)))
- receiver1.verify_s3_events(keys, exact_match=True)
- receiver2.verify_s3_events(keys, exact_match=True)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size, etags=etags)
# delete objects from the bucket
client_threads = []
start_time = time.time()
for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
+ thr = threading.Thread(target=key.delete, args=())
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]
time_diff = time.time() - start_time
- print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print('average time for deletion + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
print('wait for 5sec for the messages...')
time.sleep(5)
- # check amqp receiver 1 for deletions
- receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
- # check amqp receiver 2 has no deletions
- try:
- receiver1.verify_s3_events(keys, exact_match=False, deletions=True)
- except:
- pass
- else:
- err = 'amqp receiver 2 should have no deletions'
- assert False, err
+ # check receiver
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size, etags=etags)
# cleanup
- stop_amqp_receiver(receiver1, task1)
- stop_amqp_receiver(receiver2, task2)
s3_notification_conf.del_config()
- topic_conf1.del_config()
- topic_conf2.del_config()
+ topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ receiver.close(task)
+
+
+@attr('amqp_test')
+def test_notification_push_amqp():
+ """ test pushing amqp notification """
+ return SkipTest("Running into an issue with amqp when we make exact_match=true")
+ conn = connection()
+ notification_push('amqp', conn)
@attr('manual_test')
@attr('kafka_test')
-def test_ps_s3_notification_push_kafka_on_master():
+def test_notification_push_kafka():
""" test pushing kafka s3 notification on master """
conn = connection()
- zonegroup = get_config_zonegroup()
-
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = conn.create_bucket(bucket_name)
- # name is constant for manual testing
- topic_name = bucket_name+'_topic'
- # create consumer on the topic
-
- try:
- s3_notification_conf = None
- topic_conf1 = None
- topic_conf2 = None
- receiver = None
- task, receiver = create_kafka_receiver_thread(topic_name+'_1')
- task.start()
-
- # create s3 topic
- endpoint_address = 'kafka://' + kafka_server
- # without acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
- topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
- topic_arn1 = topic_conf1.set_config()
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
- topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
- topic_arn2 = topic_conf2.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
- 'Events': []
- },
- {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
- 'Events': []
- }]
-
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
-
- # create objects in the bucket (async)
- number_of_objects = 10
- client_threads = []
- etags = []
- start_time = time.time()
- for i in range(number_of_objects):
- key = bucket.new_key(str(i))
- content = str(os.urandom(1024*1024))
- etag = hashlib.md5(content.encode()).hexdigest()
- etags.append(etag)
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 10sec for the messages...')
- time.sleep(10)
- keys = list(bucket.list())
- receiver.verify_s3_events(keys, exact_match=True, etags=etags)
-
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 10sec for the messages...')
- time.sleep(10)
- receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
- except Exception as e:
- assert False, str(e)
- finally:
- # cleanup
- if s3_notification_conf is not None:
- s3_notification_conf.del_config()
- if topic_conf1 is not None:
- topic_conf1.del_config()
- if topic_conf2 is not None:
- topic_conf2.del_config()
- # delete the bucket
- for key in bucket.list():
- key.delete()
- conn.delete_bucket(bucket_name)
- if receiver is not None:
- stop_kafka_receiver(receiver, task)
+ notification_push('kafka', conn)
@attr('http_test')
@attr('http_test')
-def test_ps_s3_notification_push_http_on_master():
- """ test pushing http s3 notification on master """
- hostname = get_ip_http()
+def test_notification_push_http():
+ """ test pushing http s3 notification """
conn = connection()
- zonegroup = get_config_zonegroup()
-
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = HTTPServerWithEvents((host, port))
-
- # create bucket
- bucket_name = gen_bucket_name()
- bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
-
- # create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
- # create s3 notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': []
- }]
- s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
- assert_equal(status/100, 2)
-
- # create objects in the bucket
- client_threads = []
- objects_size = {}
- start_time = time.time()
- for i in range(number_of_objects):
- content = str(os.urandom(randint(1, 1024)))
- object_size = len(content)
- key = bucket.new_key(str(i))
- objects_size[key.name] = object_size
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 5sec for the messages...')
- time.sleep(5)
-
- # check http receiver
- keys = list(bucket.list())
- http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
-
- # delete objects from the bucket
- client_threads = []
- start_time = time.time()
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
-
- time_diff = time.time() - start_time
- print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
- print('wait for 5sec for the messages...')
- time.sleep(5)
-
- # check http receiver
- http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
-
- # cleanup
- topic_conf.del_config()
- s3_notification_conf.del_config(notification=notification_name)
- # delete the bucket
- conn.delete_bucket(bucket_name)
- http_server.close()
+ notification_push('http', conn)
@attr('http_test')
topic_name = bucket_name + TOPIC_SUFFIX
receiver = {}
+ task = None
host = get_ip()
if endpoint_type == 'http':
# create random port for the http server
receiver = HTTPServerWithEvents((host, port))
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
- # the http server does not guarantee order, so duplicates are expected
- exact_match = False
elif endpoint_type == 'amqp':
# start amqp receiver
exchange = 'ex1'
task.start()
endpoint_address = 'amqp://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
- # amqp broker guarantee ordering
- exact_match = True
elif endpoint_type == 'kafka':
- # start amqp receiver
+ # start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
- # amqp broker guarantee ordering
- exact_match = True
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
wait_for_queue_to_drain(topic_name, account=account)
- receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
+ receiver.verify_s3_events(keys, exact_match=False, deletions=False)
# delete objects from the bucket
client_threads = []
wait_for_queue_to_drain(topic_name, account=account)
- receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
+ receiver.verify_s3_events(keys, exact_match=False, deletions=True)
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- if endpoint_type == 'http':
- receiver.close()
- else:
- stop_amqp_receiver(receiver, task)
+ receiver.close(task)
@attr('http_test')