]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/test/kafka: let consumer read events from the beginning 54637/head
authorYuval Lifshitz <ylifshit@redhat.com>
Thu, 23 Nov 2023 14:29:04 +0000 (14:29 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Thu, 23 Nov 2023 14:31:15 +0000 (14:31 +0000)
in some cases the RGW may publish the notifications before the kafka
consumer started reading the events from the topic

Fixes: https://tracker.ceph.com/issues/62136
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index 32fe262bdc310610837cc2108161a9fa521d6f25..8860df1555ef9fb90cb2545d3f6bbbd1009a9feb 100644 (file)
@@ -434,7 +434,8 @@ class KafkaReceiver(object):
                 self.consumer = KafkaConsumer(topic, 
                         bootstrap_servers = kafka_server+':'+str(port), 
                         security_protocol=security_type,
-                        consumer_timeout_ms=16000)
+                        consumer_timeout_ms=16000,
+                        auto_offset_reset='earliest')
                 print('Kafka consumer created on topic: '+topic)
                 break
             except Exception as error:
@@ -1534,8 +1535,8 @@ def test_ps_s3_notification_push_kafka_on_master():
         time_diff = time.time() - start_time
         print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
+        print('wait for 10sec for the messages...')
+        time.sleep(10)
         keys = list(bucket.list())
         receiver.verify_s3_events(keys, exact_match=True, etags=etags)
 
@@ -1551,8 +1552,8 @@ def test_ps_s3_notification_push_kafka_on_master():
         time_diff = time.time() - start_time
         print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
+        print('wait for 10sec for the messages...')
+        time.sleep(10)
         receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
     except Exception as e:
         assert False, str(e)