// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
cb_tag.cb(status);
- ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+ ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag <<
+ " for: " << broker << dendl;
});
callbacks.clear();
delivery_tag = 1;
+ ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
}
bool is_ok() const {
// Checking the connection idlesness
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
- ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
ERASE_AND_CONTINUE(conn_it, connections);
}