if security_type != 'PLAINTEXT':
security_type = 'SSL'
port = 9093
+
+ if kafka_server is None:
+ endpoint = "localhost" + ":" + str(port)
+ elif ":" not in kafka_server:
+ endpoint = kafka_server + ":" + str(port)
+ else:
+ endpoint = kafka_server
+
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic,
- bootstrap_servers = kafka_server+':'+str(port),
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers=endpoint,
security_protocol=security_type,
- consumer_timeout_ms=16000)
+ consumer_timeout_ms=16000,
+ auto_offset_reset='earliest')
print('Kafka consumer created on topic: '+topic)
break
except Exception as error: