From: Yuval Lifshitz Date: Tue, 5 Mar 2024 10:14:06 +0000 (+0000) Subject: rgw/kafka: set message timeout to 5 seconds X-Git-Tag: v20.0.0~2419^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1c13850f24dbb90c33a12c6da338956c2e83811b;p=ceph.git rgw/kafka: set message timeout to 5 seconds also increase the idle timeout to 30 seconds. test instructions: https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353 Fixes: https://tracker.ceph.com/issues/64710 Signed-off-by: Yuval Lifshitz --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 6fab43e5589ff..a7af43ae05f53 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3983,7 +3983,7 @@ options: are sent to it for more than the time defined. Note that the connection will not be considered idle, even if it is down, as long as there are attempts to send messages to it. - default: 30 + default: 300 services: - rgw with_legacy: true @@ -3999,3 +3999,13 @@ options: services: - rgw with_legacy: true +- name: rgw_kafka_message_timeout + type: uint + level: advanced + desc: This is the maximum time in milliseconds to deliver a message (including retries) + long_desc: Delivery error occurs when the message timeout is exceeded. + Value must be greater than zero, if set to zero, a value of 1 millisecond will be used. + default: 5000 + services: + - rgw + with_legacy: true diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 0d6e773750144..3b5b1c7011b0b 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -210,9 +210,16 @@ bool new_producer(connection_t* conn) { return false; } + // set message timeout + // according to documentation, value of zero will expire the message based on retries. + // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms + constexpr std::uint64_t min_message_timeout = 1; + const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); + if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", + std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; // get list of brokers based on the bootstrap broker if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; - + if (conn->use_ssl) { if (!conn->user.empty()) { // use SSL+SASL