default: true
services:
- rgw
+- name: rgw_kafka_connection_idle
+ type: uint
+ level: advanced
+ desc: Time in seconds to delete idle kafka connections
+ long_desc: A conection will be considered "idle" if no messages
+ are sent to it for more than the time defined.
+ Note that the connection will not be considered idle, even if it is down,
+ as long as there are attempts to send messages to it.
+ default: 300
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_kafka_message_timeout
+ type: uint
+ level: advanced
+ desc: This is the maximum time in milliseconds to deliver a message (including retries)
+ long_desc: Delivery error occurs when the message timeout is exceeded.
+ Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
+ default: 5000
+ services:
+ - rgw
+ with_legacy: true
return false;
}
- // get list of brokers based on the bootsrap broker
+ // set message timeout
+ // according to documentation, value of zero will expire the message based on retries.
+ // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+ constexpr std::uint64_t min_message_timeout = 1;
+ const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+ if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms",
+ std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ // get list of brokers based on the bootstrap broker
if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
if (conn->use_ssl) {
const size_t max_connections;
const size_t max_inflight;
const size_t max_queue;
- const size_t max_idle_time;
private:
std::atomic<size_t> connection_count;
bool stopped;
auto& conn = conn_it->second;
// Checking the connection idlesness
- if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
conn->destroy(STATUS_CONNECTION_IDLE);
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
- max_idle_time(30),
connection_count(0),
stopped(false),
read_timeout_ms(_read_timeout_ms),