port = 9093
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers = kafka_server+':'+str(port),
+ security_protocol=security_type,
+ consumer_timeout_ms=16000)
print('Kafka consumer created on topic: '+topic)
break
except Exception as error:
while not receiver.stop:
for msg in receiver.consumer:
receiver.events.append(json.loads(msg.value))
- timer.sleep(0.1)
+ time.sleep(0.1)
log.info('Kafka receiver ended')
print('Kafka receiver ended')
except Exception as error:
receiver.stop = True
task.join(1)
try:
+ receiver.consumer.unsubscribe()
receiver.consumer.close()
except Exception as error:
log.info('failed to gracefuly stop Kafka receiver: %s', str(error))