From 903b941742d2184e1b10708c1c11e730f4a423ac Mon Sep 17 00:00:00 2001 From: root Date: Mon, 5 Apr 2021 16:22:25 +0530 Subject: [PATCH] rgw/test: stop kafka consumer if tests fails This PR fixes https://tracker.ceph.com/issues/50138 by stopping the kafka consumer if the test fails. Signed-off-by: Kalpesh Pandya --- src/test/rgw/bucket_notification/test_bn.py | 111 ++++++++++---------- 1 file changed, 57 insertions(+), 54 deletions(-) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 9b253b51de7..9ee363f6587 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -1006,74 +1006,77 @@ def test_ps_s3_notification_push_kafka_on_master(): # name is constant for manual testing topic_name = bucket_name+'_topic' # create consumer on the topic - task, receiver = create_kafka_receiver_thread(topic_name+'_1') - task.start() + + try: + task, receiver = create_kafka_receiver_thread(topic_name+'_1') + task.start() - # create s3 topic - endpoint_address = 'kafka://' + kafka_server - # without acks from broker - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' - topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) - topic_arn1 = topic_conf1.set_config() - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none' - topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) - topic_arn2 = topic_conf2.set_config() - # create s3 notification - notification_name = bucket_name + NOTIFICATION_SUFFIX - topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, + # create s3 topic + endpoint_address = 'kafka://' + kafka_server + # without acks from broker + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' + topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) + topic_arn1 = topic_conf1.set_config() + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none' + topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) + topic_arn2 = topic_conf2.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, 'Events': [] }, {'Id': notification_name + '_2', 'TopicArn': topic_arn2, 'Events': [] }] - s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) - response, status = s3_notification_conf.set_config() - assert_equal(status/100, 2) + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) - # create objects in the bucket (async) - number_of_objects = 10 - client_threads = [] - start_time = time.time() - for i in range(number_of_objects): - key = bucket.new_key(str(i)) - content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] - time_diff = time.time() - start_time - print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + time_diff = time.time() - start_time + print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - print('wait for 5sec for the messages...') - time.sleep(5) - keys = list(bucket.list()) - receiver.verify_s3_events(keys, exact_match=True) + print('wait for 5sec for the messages...') + time.sleep(5) + keys = list(bucket.list()) + receiver.verify_s3_events(keys, exact_match=True) - # delete objects from the bucket - client_threads = [] - start_time = time.time() - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] + # delete objects from the bucket + client_threads = [] + start_time = time.time() + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] - time_diff = time.time() - start_time - print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + time_diff = time.time() - start_time + print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - print('wait for 5sec for the messages...') - time.sleep(5) - receiver.verify_s3_events(keys, exact_match=True, deletions=True) + print('wait for 5sec for the messages...') + time.sleep(5) + receiver.verify_s3_events(keys, exact_match=True, deletions=True) - # cleanup - s3_notification_conf.del_config() - topic_conf1.del_config() - topic_conf2.del_config() - # delete the bucket - conn.delete_bucket(bucket_name) - stop_kafka_receiver(receiver, task) + finally: + # cleanup + s3_notification_conf.del_config() + topic_conf1.del_config() + topic_conf2.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + stop_kafka_receiver(receiver, task) def test_ps_s3_notification_multi_delete_on_master(): -- 2.39.5