]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/notifications: fix kafka consumer shutdown issue 51862/head
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 29 May 2023 11:54:00 +0000 (11:54 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Wed, 31 May 2023 17:33:25 +0000 (17:33 +0000)
Fixes: https://tracker.ceph.com/issues/61477
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit b00f92da9a6cf848705c0508c0166c12913731e1)

src/test/rgw/bucket_notification/test_bn.py

index 89a94106b4322c0084478c07e3b8f91fa878ac73..ae253107cb09e4c2035cb3e511564a3ec4bf83ea 100644 (file)
@@ -419,7 +419,10 @@ class KafkaReceiver(object):
             port = 9093
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
+                self.consumer = KafkaConsumer(topic, 
+                        bootstrap_servers = kafka_server+':'+str(port), 
+                        security_protocol=security_type,
+                        consumer_timeout_ms=16000)
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -448,7 +451,7 @@ def kafka_receiver_thread_runner(receiver):
         while not receiver.stop:
             for msg in receiver.consumer:
                 receiver.events.append(json.loads(msg.value))
-            timer.sleep(0.1)
+            time.sleep(0.1)
         log.info('Kafka receiver ended')
         print('Kafka receiver ended')
     except Exception as error:
@@ -468,6 +471,7 @@ def stop_kafka_receiver(receiver, task):
     receiver.stop = True
     task.join(1)
     try:
+        receiver.consumer.unsubscribe()
         receiver.consumer.close()
     except Exception as error:
         log.info('failed to gracefuly stop Kafka receiver: %s', str(error))