static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
+static const int STATUS_CONNECTION_IDLE = -0x1006;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
static const int STATUS_CONF_REPLCACE = -0x2002;
// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
cb_tag.cb(status);
- ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+ ldout(cct, 20) << "Kafka destroy: invoking callback with tag="
+ << cb_tag.tag << " for: " << broker
+ << " with status: " << status << dendl;
});
callbacks.clear();
delivery_tag = 1;
if (tag) {
auto const q_len = conn->callbacks.size();
if (q_len < max_inflight) {
- ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+ ldout(conn->cct, 20)
+ << "Kafka publish (with callback, tag=" << *tag
+ << "): OK. Queue has: " << q_len + 1 << " callbacks" << dendl;
conn->callbacks.emplace_back(*tag, message->cb);
} else {
// immediately invoke callback with error - this is not a connection error
if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
- ERASE_AND_CONTINUE(conn_it, connections);
+ conn->status = STATUS_CONNECTION_IDLE;
+ conn_it = connections.erase(conn_it);
+ --connection_count;
+ continue;
}
// try to reconnect the connection if it has an error