]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/notifications: fix kafka consumer shutdown issue 51810/head
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 29 May 2023 11:54:00 +0000 (11:54 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Mon, 29 May 2023 11:54:00 +0000 (11:54 +0000)
Fixes: https://tracker.ceph.com/issues/61477
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index 7e94379c79bced4851e832adca7f7061c90c7a0b..a08b7433272020dce8e61150ed4e309bb5cfd548 100644 (file)
@@ -427,7 +427,10 @@ class KafkaReceiver(object):
             port = 9093
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
+                self.consumer = KafkaConsumer(topic, 
+                        bootstrap_servers = kafka_server+':'+str(port), 
+                        security_protocol=security_type,
+                        consumer_timeout_ms=16000)
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -456,7 +459,7 @@ def kafka_receiver_thread_runner(receiver):
         while not receiver.stop:
             for msg in receiver.consumer:
                 receiver.events.append(json.loads(msg.value))
-            timer.sleep(0.1)
+            time.sleep(0.1)
         log.info('Kafka receiver ended')
         print('Kafka receiver ended')
     except Exception as error:
@@ -476,6 +479,7 @@ def stop_kafka_receiver(receiver, task):
     receiver.stop = True
     task.join(1)
     try:
+        receiver.consumer.unsubscribe()
         receiver.consumer.close()
     except Exception as error:
         log.info('failed to gracefuly stop Kafka receiver: %s', str(error))