From b188585dfabb1de663857d7396fd55b4bbbb6006 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 29 May 2023 11:54:00 +0000 Subject: [PATCH] test/rgw/notifications: fix kafka consumer shutdown issue Fixes: https://tracker.ceph.com/issues/61477 Signed-off-by: Yuval Lifshitz (cherry picked from commit b00f92da9a6cf848705c0508c0166c12913731e1) --- src/test/rgw/bucket_notification/test_bn.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 89a94106b4322..ae253107cb09e 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -419,7 +419,10 @@ class KafkaReceiver(object): port = 9093 while remaining_retries > 0: try: - self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type) + self.consumer = KafkaConsumer(topic, + bootstrap_servers = kafka_server+':'+str(port), + security_protocol=security_type, + consumer_timeout_ms=16000) print('Kafka consumer created on topic: '+topic) break except Exception as error: @@ -448,7 +451,7 @@ def kafka_receiver_thread_runner(receiver): while not receiver.stop: for msg in receiver.consumer: receiver.events.append(json.loads(msg.value)) - timer.sleep(0.1) + time.sleep(0.1) log.info('Kafka receiver ended') print('Kafka receiver ended') except Exception as error: @@ -468,6 +471,7 @@ def stop_kafka_receiver(receiver, task): receiver.stop = True task.join(1) try: + receiver.consumer.unsubscribe() receiver.consumer.close() except Exception as error: log.info('failed to gracefuly stop Kafka receiver: %s', str(error)) -- 2.39.5