are sent to it for more than the time defined.
Note that the connection will not be considered idle, even if it is down,
as long as there are attempts to send messages to it.
- default: 30
+ default: 300
services:
- rgw
with_legacy: true
services:
- rgw
with_legacy: true
+- name: rgw_kafka_message_timeout
+ type: uint
+ level: advanced
+ desc: This is the maximum time in milliseconds to deliver a message (including retries)
+ long_desc: Delivery error occurs when the message timeout is exceeded.
+ Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
+ default: 5000
+ services:
+ - rgw
+ with_legacy: true
return false;
}
+ // set message timeout
+ // according to documentation, value of zero will expire the message based on retries.
+ // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+ constexpr std::uint64_t min_message_timeout = 1;
+ const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+ if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms",
+ std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
// get list of brokers based on the bootstrap broker
if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
-
+
if (conn->use_ssl) {
if (!conn->user.empty()) {
// use SSL+SASL