conn->temp_conf = rd_kafka_conf_new();
if (!conn->temp_conf) {
conn->status = STATUS_CONF_ALLOC_FAILED;
- return conn;
+ return false;
}
- // get list of brokers based on the bootsrap broker
+ // set message timeout
+ // according to documentation, value of zero will expire the message based on retries.
+ // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+ constexpr std::uint64_t min_message_timeout = 1;
+ const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+ if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms",
+ std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ // get list of brokers based on the bootstrap broker
if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
if (conn->use_ssl) {
auto& conn = conn_it->second;
// Checking the connection idlesness
- if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ std::lock_guard lock(connections_lock);
ERASE_AND_CONTINUE(conn_it, connections);
}