git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/test: stop kafka consumer if the test fails
author root <root@localhost.localdomain>
Mon, 5 Apr 2021 10:52:25 +0000 (16:22 +0530)
committer root <root@localhost.localdomain>
Mon, 5 Apr 2021 15:44:17 +0000 (21:14 +0530)
This PR fixes https://tracker.ceph.com/issues/50138 by wrapping the test body in a try/finally block, so that the Kafka consumer is stopped even when the test fails.

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
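In outline, the change wraps the whole body of test_ps_s3_notification_push_kafka_on_master() in try/finally so that cleanup, including stop_kafka_receiver(), runs whether the assertions pass or raise. A condensed sketch of the new shape (names taken from the diff below; the real test also creates a second topic and verifies deletions):

    try:
        task, receiver = create_kafka_receiver_thread(topic_name + '_1')
        task.start()
        # ... create topics and notifications, upload objects,
        # run the verify_s3_events() assertions ...
    finally:
        # runs whether the assertions above passed or raised
        stop_kafka_receiver(receiver, task)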
src/test/rgw/bucket_notification/test_bn.py

index 9b253b51de7a726c5bf4c211771e3c7c2c1c819d..9ee363f65877f0dfde2b99185c73d346eef65104 100644 (file)
@@ -1006,74 +1006,77 @@ def test_ps_s3_notification_push_kafka_on_master():
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
-    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
-    task.start()
+
+    try:
+        task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+        task.start()
 
-    # create s3 topic
-    endpoint_address = 'kafka://' + kafka_server
-    # with acks from broker
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
-    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
-    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+        # create s3 topic
+        endpoint_address = 'kafka://' + kafka_server
+        # with acks from broker
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+        topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+        topic_arn1 = topic_conf1.set_config()
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
+        topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+        topic_arn2 = topic_conf2.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
                          'Events': []
                        },
                        {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
                          'Events': []
                        }]
 
-    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status/100, 2)
 
-    # create objects in the bucket (async)
-    number_of_objects = 10
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads]
+        # create objects in the bucket (async)
+        number_of_objects = 10
+        client_threads = []
+        start_time = time.time()
+        for i in range(number_of_objects):
+            key = bucket.new_key(str(i))
+            content = str(os.urandom(1024*1024))
+            thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
 
-    time_diff = time.time() - start_time
-    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+        time_diff = time.time() - start_time
+        print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    keys = list(bucket.list())
-    receiver.verify_s3_events(keys, exact_match=True)
+        print('wait for 5sec for the messages...')
+        time.sleep(5)
+        keys = list(bucket.list())
+        receiver.verify_s3_events(keys, exact_match=True)
 
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads]
+        # delete objects from the bucket
+        client_threads = []
+        start_time = time.time()
+        for key in bucket.list():
+            thr = threading.Thread(target = key.delete, args=())
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
 
-    time_diff = time.time() - start_time
-    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+        time_diff = time.time() - start_time
+        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+        print('wait for 5sec for the messages...')
+        time.sleep(5)
+        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
 
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-    # delete the bucket
-    conn.delete_bucket(bucket_name)
-    stop_kafka_receiver(receiver, task)
+    finally:
+        # cleanup
+        s3_notification_conf.del_config()
+        topic_conf1.del_config()
+        topic_conf2.del_config()
+        # delete the bucket
+        conn.delete_bucket(bucket_name)
+        stop_kafka_receiver(receiver, task)
 
 
 def test_ps_s3_notification_multi_delete_on_master():
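A caveat about the shape of the finally block above (an editorial observation, not part of this commit): if create_kafka_receiver_thread() or one of the set_config() calls raises, names such as receiver, task, or s3_notification_conf are never bound, and the cleanup itself then fails with a NameError that masks the original error. A defensive variant, sketched with the same helpers the test uses, would pre-initialize the handles and guard each cleanup step:

    task = receiver = None
    s3_notification_conf = topic_conf1 = topic_conf2 = None
    try:
        task, receiver = create_kafka_receiver_thread(topic_name + '_1')
        task.start()
        # ... topic/notification setup and test body as in the patch ...
    finally:
        # delete only the resources that were actually created
        for conf in (s3_notification_conf, topic_conf1, topic_conf2):
            if conf is not None:
                conf.del_config()
        conn.delete_bucket(bucket_name)
        if receiver is not None:
            stop_kafka_receiver(receiver, task)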