self.consumer = KafkaConsumer(topic,
bootstrap_servers = kafka_server+':'+str(port),
security_protocol=security_type,
- consumer_timeout_ms=16000)
+ consumer_timeout_ms=16000,
+ auto_offset_reset='earliest')
print('Kafka consumer created on topic: '+topic)
break
except Exception as error:
time_diff = time.time() - start_time
print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- print('wait for 5sec for the messages...')
- time.sleep(5)
+ print('wait for 10sec for the messages...')
+ time.sleep(10)
keys = list(bucket.list())
receiver.verify_s3_events(keys, exact_match=True, etags=etags)
time_diff = time.time() - start_time
print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- print('wait for 5sec for the messages...')
- time.sleep(5)
+ print('wait for 10sec for the messages...')
+ time.sleep(10)
receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
except Exception as e:
assert False, str(e)