From: Yuval Lifshitz Date: Mon, 29 May 2023 11:54:00 +0000 (+0000) Subject: test/rgw/notifications: fix kafka consumer shutdown issue X-Git-Tag: v19.0.0~1102^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F51810%2Fhead;p=ceph.git test/rgw/notifications: fix kafka consumer shutdown issue Fixes: https://tracker.ceph.com/issues/61477 Signed-off-by: Yuval Lifshitz --- diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 7e94379c79bc..a08b74332720 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -427,7 +427,10 @@ class KafkaReceiver(object): port = 9093 while remaining_retries > 0: try: - self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type) + self.consumer = KafkaConsumer(topic, + bootstrap_servers = kafka_server+':'+str(port), + security_protocol=security_type, + consumer_timeout_ms=16000) print('Kafka consumer created on topic: '+topic) break except Exception as error: @@ -456,7 +459,7 @@ def kafka_receiver_thread_runner(receiver): while not receiver.stop: for msg in receiver.consumer: receiver.events.append(json.loads(msg.value)) - timer.sleep(0.1) + time.sleep(0.1) log.info('Kafka receiver ended') print('Kafka receiver ended') except Exception as error: @@ -476,6 +479,7 @@ def stop_kafka_receiver(receiver, task): receiver.stop = True task.join(1) try: + receiver.consumer.unsubscribe() receiver.consumer.close() except Exception as error: log.info('failed to gracefuly stop Kafka receiver: %s', str(error))