From 3d8cd6de70f7d0b7e94a9b8fa6e0ad8f65bf7a10 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 5 Mar 2024 10:14:06 +0000 Subject: [PATCH] rgw/kafka: set message timeout to 5 seconds also increase the idle timeout to 30 seconds. test instructions: https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353 Fixes: https://tracker.ceph.com/issues/64710 Signed-off-by: Yuval Lifshitz (cherry picked from commit 1c13850f24dbb90c33a12c6da338956c2e83811b) Conflicts: src/common/options/rgw.yaml.in src/rgw/rgw_kafka.cc --- src/common/options/rgw.yaml.in | 22 ++++++++++++++++++++++ src/rgw/rgw_kafka.cc | 13 +++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 9c7f91f9e0d98..6e5cdb55ec4e7 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3772,3 +3772,25 @@ options: default: true services: - rgw +- name: rgw_kafka_connection_idle + type: uint + level: advanced + desc: Time in seconds to delete idle kafka connections + long_desc: A conection will be considered "idle" if no messages + are sent to it for more than the time defined. + Note that the connection will not be considered idle, even if it is down, + as long as there are attempts to send messages to it. + default: 300 + services: + - rgw + with_legacy: true +- name: rgw_kafka_message_timeout + type: uint + level: advanced + desc: This is the maximum time in milliseconds to deliver a message (including retries) + long_desc: Delivery error occurs when the message timeout is exceeded. + Value must be greater than zero, if set to zero, a value of 1 millisecond will be used. + default: 5000 + services: + - rgw + with_legacy: true diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 642787a38cf17..03b11980ae7c2 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -209,7 +209,14 @@ bool new_producer(connection_t* conn) { return false; } - // get list of brokers based on the bootsrap broker + // set message timeout + // according to documentation, value of zero will expire the message based on retries. + // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms + constexpr std::uint64_t min_message_timeout = 1; + const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); + if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", + std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + // get list of brokers based on the bootstrap broker if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; if (conn->use_ssl) { @@ -325,7 +332,6 @@ public: const size_t max_connections; const size_t max_inflight; const size_t max_queue; - const size_t max_idle_time; private: std::atomic connection_count; bool stopped; @@ -459,7 +465,7 @@ private: auto& conn = conn_it->second; // Checking the connection idlesness - if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { + if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; std::lock_guard lock(connections_lock); conn->destroy(STATUS_CONNECTION_IDLE); @@ -511,7 +517,6 @@ public: max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), - max_idle_time(30), connection_count(0), stopped(false), read_timeout_ms(_read_timeout_ms), -- 2.39.5