From: Shilpa Jagannath Date: Thu, 29 Aug 2024 15:34:13 +0000 (-0700) Subject: Merge pull request #56163 from yuvalif/wip-64886-quincy X-Git-Tag: v17.2.8~131 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=aa725bc86f10956a455f64237e3fc15be50fd8a8;p=ceph.git Merge pull request #56163 from yuvalif/wip-64886-quincy quincy: rgw/kafka: set message timeout to 5 seconds --- aa725bc86f10956a455f64237e3fc15be50fd8a8 diff --cc src/rgw/rgw_kafka.cc index 9359b7a3d08,d13f656aa1c..7a4163d3877 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@@ -195,10 -196,17 +195,17 @@@ bool new_producer(connection_t* conn) conn->temp_conf = rd_kafka_conf_new(); if (!conn->temp_conf) { conn->status = STATUS_CONF_ALLOC_FAILED; - return conn; + return false; } - // get list of brokers based on the bootsrap broker + // set message timeout + // according to documentation, value of zero will expire the message based on retries. + // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms + constexpr std::uint64_t min_message_timeout = 1; + const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); + if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", + std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + // get list of brokers based on the bootstrap broker if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; if (conn->use_ssl) { @@@ -420,9 -427,8 +426,9 @@@ private auto& conn = conn_it->second; // Checking the connection idlesness - if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { + if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; + std::lock_guard lock(connections_lock); ERASE_AND_CONTINUE(conn_it, connections); }