From faf538a5f42c6a890800736fe77f111d7e3e35e1 Mon Sep 17 00:00:00 2001 From: Ali Masarwa Date: Mon, 22 Apr 2024 12:34:40 +0300 Subject: [PATCH] RGW\test: making notification push test end point agnostic Signed-off-by: Ali Masarwa --- src/test/rgw/bucket_notification/test_bn.py | 374 ++++++-------------- 1 file changed, 118 insertions(+), 256 deletions(-) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 81ec82deaecc0..a1b5f64764bfe 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -199,12 +199,12 @@ class HTTPServerWithEvents(ThreadingHTTPServer): self.events.append(event) self.lock.release() - def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): + def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): """verify stored s3 records agains a list of keys""" self.acquire_lock() log.info('verify_s3_events: http server has %d events', len(self.events)) try: - verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) + verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) except AssertionError as err: self.close() raise err @@ -220,7 +220,7 @@ class HTTPServerWithEvents(ThreadingHTTPServer): self.lock.release() return events - def close(self): + def close(self, task=None): log.info('http server on %s starting shutdown', str(self.addr)) t = threading.Thread(target=self.shutdown) t.start() @@ -294,9 +294,9 @@ class AMQPReceiver(object): self.events.append(json.loads(body)) # TODO create a base class for the AMQP and HTTP cases - def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): + def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): """verify stored s3 records agains a list of keys""" - verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) + verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) self.events = [] def get_and_reset_events(self): @@ -304,6 +304,9 @@ class AMQPReceiver(object): self.events = [] return tmp + def close(self, task): + stop_amqp_receiver(self, task) + def amqp_receiver_thread_runner(receiver): """main thread function for the amqp receiver""" @@ -396,11 +399,14 @@ class KafkaReceiver(object): self.topic = topic self.stop = False - def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]): + def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): """verify stored s3 records agains a list of keys""" - verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags) + verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes, etags=etags) self.events = [] + def close(self, task): + stop_kafka_receiver(self, task) + def kafka_receiver_thread_runner(receiver): """main thread function for the kafka receiver""" try: @@ -1168,108 +1174,141 @@ def test_ps_s3_notification_errors_on_master(): conn.delete_bucket(bucket_name) -@attr('amqp_test') -def test_ps_s3_notification_push_amqp_on_master(): - """ test pushing amqp s3 notification on master """ - - hostname = get_ip() - conn = connection() +def notification_push(endpoint_type, conn, account=None): + """ test pushinging notification """ zonegroup = get_config_zonegroup() - # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) - topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' - topic_name2 = bucket_name + TOPIC_SUFFIX + '_2' - - # start amqp receivers - exchange = 'ex1' - task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) - task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2) - task1.start() - task2.start() - - # create two s3 topic - endpoint_address = 'amqp://' + hostname - # with acks from broker - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' - topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args) - topic_arn1 = topic_conf1.set_config() - # without acks from broker - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' - topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args) - topic_arn2 = topic_conf2.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, - 'Events': [] - }, - {'Id': notification_name+'_2', 'TopicArn': topic_arn2, - 'Events': ['s3:ObjectCreated:*'] - }] + topic_name = bucket_name + TOPIC_SUFFIX - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + receiver = {} + task = None + host = get_ip() + s3_notification_conf = None + topic_conf = None + if endpoint_type == 'http': + # create random port for the http server + host = get_ip_http() + port = random.randint(10000, 20000) + # start an http server in a separate thread + receiver = HTTPServerWithEvents((host, port)) + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + elif endpoint_type == 'amqp': + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + endpoint_address = 'amqp://' + host + # with acks from broker + exchange = 'ex1' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + # create two s3 topic + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + elif endpoint_type == 'kafka': + # start amqp receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + kafka_server + # without acks from broker + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' + # create s3 topic + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + else: + return SkipTest('Unknown endpoint type: ' + endpoint_type) - # create objects in the bucket (async) + # create objects in the bucket number_of_objects = 100 client_threads = [] + etags = [] + objects_size = {} start_time = time.time() for i in range(number_of_objects): - key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + etag = hashlib.md5(content.encode()).hexdigest() + etags.append(etag) + object_size = len(content) + key = bucket.new_key(str(i)) + objects_size[key.name] = object_size + thr = threading.Thread(target=set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time - print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + print('average time for creation + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) - # check amqp receiver + # check receiver keys = list(bucket.list()) - print('total number of objects: ' + str(len(keys))) - receiver1.verify_s3_events(keys, exact_match=True) - receiver2.verify_s3_events(keys, exact_match=True) + receiver.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size, etags=etags) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) + thr = threading.Thread(target=key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time - print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + print('average time for deletion + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) - # check amqp receiver 1 for deletions - receiver1.verify_s3_events(keys, exact_match=True, deletions=True) - # check amqp receiver 2 has no deletions - try: - receiver1.verify_s3_events(keys, exact_match=False, deletions=True) - except: - pass - else: - err = 'amqp receiver 2 should have no deletions' - assert False, err + # check receiver + receiver.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size, etags=etags) # cleanup - stop_amqp_receiver(receiver1, task1) - stop_amqp_receiver(receiver2, task2) s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() + topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) + receiver.close(task) + + +@attr('amqp_test') +def test_notification_push_amqp(): + """ test pushing amqp notification """ + return SkipTest("Running into an issue with amqp when we make exact_match=true") + conn = connection() + notification_push('amqp', conn) @attr('manual_test') @@ -1404,102 +1443,10 @@ def test_ps_s3_notification_push_amqp_idleness_check(): @attr('kafka_test') -def test_ps_s3_notification_push_kafka_on_master(): +def test_notification_push_kafka(): """ test pushing kafka s3 notification on master """ conn = connection() - zonegroup = get_config_zonegroup() - - # create bucket - bucket_name = gen_bucket_name() - bucket = conn.create_bucket(bucket_name) - # name is constant for manual testing - topic_name = bucket_name+'_topic' - # create consumer on the topic - - try: - s3_notification_conf = None - topic_conf1 = None - topic_conf2 = None - receiver = None - task, receiver = create_kafka_receiver_thread(topic_name+'_1') - task.start() - - # create s3 topic - endpoint_address = 'kafka://' + kafka_server - # without acks from broker - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' - topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) - topic_arn1 = topic_conf1.set_config() - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none' - topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) - topic_arn2 = topic_conf2.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, - 'Events': [] - }, - {'Id': notification_name + '_2', 'TopicArn': topic_arn2, - 'Events': [] - }] - - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - - # create objects in the bucket (async) - number_of_objects = 10 - client_threads = [] - etags = [] - start_time = time.time() - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - content = str(os.urandom(1024*1024)) - etag = hashlib.md5(content.encode()).hexdigest() - etags.append(etag) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 10sec for the messages...') - time.sleep(10) - keys = list(bucket.list()) - receiver.verify_s3_events(keys, exact_match=True, etags=etags) - - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 10sec for the messages...') - time.sleep(10) - receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) - except Exception as e: - assert False, str(e) - finally: - # cleanup - if s3_notification_conf is not None: - s3_notification_conf.del_config() - if topic_conf1 is not None: - topic_conf1.del_config() - if topic_conf2 is not None: - topic_conf2.del_config() - # delete the bucket - for key in bucket.list(): - key.delete() - conn.delete_bucket(bucket_name) - if receiver is not None: - stop_kafka_receiver(receiver, task) + notification_push('kafka', conn) @attr('http_test') @@ -1571,87 +1518,10 @@ def test_ps_s3_notification_multi_delete_on_master(): @attr('http_test') -def test_ps_s3_notification_push_http_on_master(): - """ test pushing http s3 notification on master """ - hostname = get_ip_http() +def test_notification_push_http(): + """ test pushing http s3 notification """ conn = connection() - zonegroup = get_config_zonegroup() - - # create random port for the http server - host = get_ip() - port = random.randint(10000, 20000) - # start an http server in a separate thread - number_of_objects = 10 - http_server = HTTPServerWithEvents((host, port)) - - # create bucket - bucket_name = gen_bucket_name() - bucket = conn.create_bucket(bucket_name) - topic_name = bucket_name + TOPIC_SUFFIX - - # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) - endpoint_args = 'push-endpoint='+endpoint_address - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) - topic_arn = topic_conf.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name, - 'TopicArn': topic_arn, - 'Events': [] - }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) - - # create objects in the bucket - client_threads = [] - objects_size = {} - start_time = time.time() - for i in range(number_of_objects): - content = str(os.urandom(randint(1, 1024))) - object_size = len(content) - key = bucket.new_key(str(i)) - objects_size[key.name] = object_size - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 5sec for the messages...') - time.sleep(5) - - # check http receiver - keys = list(bucket.list()) - http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) - - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - - time_diff = time.time() - start_time - print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - - print('wait for 5sec for the messages...') - time.sleep(5) - - # check http receiver - http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) - - # cleanup - topic_conf.del_config() - s3_notification_conf.del_config(notification=notification_name) - # delete the bucket - conn.delete_bucket(bucket_name) - http_server.close() + notification_push('http', conn) @attr('http_test') @@ -3820,6 +3690,7 @@ def persistent_notification(endpoint_type, conn, account=None): topic_name = bucket_name + TOPIC_SUFFIX receiver = {} + task = None host = get_ip() if endpoint_type == 'http': # create random port for the http server @@ -3829,8 +3700,6 @@ def persistent_notification(endpoint_type, conn, account=None): receiver = HTTPServerWithEvents((host, port)) endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' - # the http server does not guarantee order, so duplicates are expected - exact_match = False elif endpoint_type == 'amqp': # start amqp receiver exchange = 'ex1' @@ -3838,16 +3707,12 @@ def persistent_notification(endpoint_type, conn, account=None): task.start() endpoint_address = 'amqp://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' - # amqp broker guarantee ordering - exact_match = True elif endpoint_type == 'kafka': - # start amqp receiver + # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true' - # amqp broker guarantee ordering - exact_match = True else: return SkipTest('Unknown endpoint type: ' + endpoint_type) @@ -3884,7 +3749,7 @@ def persistent_notification(endpoint_type, conn, account=None): wait_for_queue_to_drain(topic_name, account=account) - receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False) + receiver.verify_s3_events(keys, exact_match=False, deletions=False) # delete objects from the bucket client_threads = [] @@ -3900,17 +3765,14 @@ def persistent_notification(endpoint_type, conn, account=None): wait_for_queue_to_drain(topic_name, account=account) - receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True) + receiver.verify_s3_events(keys, exact_match=False, deletions=True) # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) - if endpoint_type == 'http': - receiver.close() - else: - stop_amqp_receiver(receiver, task) + receiver.close(task) @attr('http_test') -- 2.39.5