]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/rgw/notifications: fix kafka consumer shutdown issue 59340/head
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 29 May 2023 11:54:00 +0000 (11:54 +0000)
committerKonstantin Shalygin <k0ste@k0ste.ru>
Tue, 20 Aug 2024 12:42:15 +0000 (19:42 +0700)
Fixes: https://tracker.ceph.com/issues/61477
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit b00f92da9a6cf848705c0508c0166c12913731e1)

src/test/rgw/bucket_notification/test_bn.py

index 80bcc21a697a17cee0caaac98963eba0f6108af8..812a07d1b081a6687324ae25675cec3366f417b6 100644 (file)
@@ -424,7 +424,10 @@ class KafkaReceiver(object):
             port = 9093
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
+                self.consumer = KafkaConsumer(topic, 
+                        bootstrap_servers = kafka_server+':'+str(port), 
+                        security_protocol=security_type,
+                        consumer_timeout_ms=16000)
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -453,7 +456,7 @@ def kafka_receiver_thread_runner(receiver):
         while not receiver.stop:
             for msg in receiver.consumer:
                 receiver.events.append(json.loads(msg.value))
-            timer.sleep(0.1)
+            time.sleep(0.1)
         log.info('Kafka receiver ended')
         print('Kafka receiver ended')
     except Exception as error:
@@ -473,6 +476,7 @@ def stop_kafka_receiver(receiver, task):
     receiver.stop = True
     task.join(1)
     try:
+        receiver.consumer.unsubscribe()
         receiver.consumer.close()
     except Exception as error:
         log.info('failed to gracefuly stop Kafka receiver: %s', str(error))