From: Yuval Lifshitz Date: Tue, 5 Mar 2024 10:14:06 +0000 (+0000) Subject: rgw/kafka: set message timeout to 5 seconds X-Git-Tag: v17.2.8~131^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=17bf8bd6553cfcfedb02446a21ae1e0ca94c4879;p=ceph.git rgw/kafka: set message timeout to 5 seconds also increase the idle timeout to 30 seconds. test instructions: https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353 Fixes: https://tracker.ceph.com/issues/64710 Signed-off-by: Yuval Lifshitz (cherry picked from commit 1c13850f24dbb90c33a12c6da338956c2e83811b) Conflicts: src/common/options/rgw.yaml.in src/rgw/rgw_kafka.cc --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index e82a0147b1c..505bfcbc155 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3464,3 +3464,25 @@ options: - rgw flags: - startup +- name: rgw_kafka_connection_idle + type: uint + level: advanced + desc: Time in seconds to delete idle kafka connections + long_desc: A conection will be considered "idle" if no messages + are sent to it for more than the time defined. + Note that the connection will not be considered idle, even if it is down, + as long as there are attempts to send messages to it. + default: 300 + services: + - rgw + with_legacy: true +- name: rgw_kafka_message_timeout + type: uint + level: advanced + desc: This is the maximum time in milliseconds to deliver a message (including retries) + long_desc: Delivery error occurs when the message timeout is exceeded. + Value must be greater than zero, if set to zero, a value of 1 millisecond will be used. + default: 5000 + services: + - rgw + with_legacy: true diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 9b75c9d0885..d13f656aa1c 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -199,7 +199,14 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { return conn; } - // get list of brokers based on the bootsrap broker + // set message timeout + // according to documentation, value of zero will expire the message based on retries. + // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms + constexpr std::uint64_t min_message_timeout = 1; + const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); + if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", + std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + // get list of brokers based on the bootstrap broker if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; if (conn->use_ssl) { @@ -295,7 +302,6 @@ public: const size_t max_connections; const size_t max_inflight; const size_t max_queue; - const size_t max_idle_time; private: std::atomic connection_count; bool stopped; @@ -421,7 +427,7 @@ private: auto& conn = conn_it->second; // Checking the connection idlesness - if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { + if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; ERASE_AND_CONTINUE(conn_it, connections); } @@ -468,7 +474,6 @@ public: max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), - max_idle_time(30), connection_count(0), stopped(false), read_timeout_ms(_read_timeout_ms),