]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge pull request #56163 from yuvalif/wip-64886-quincy
authorShilpa Jagannath <smanjara@users.noreply.github.com>
Thu, 29 Aug 2024 15:34:13 +0000 (08:34 -0700)
committerGitHub <noreply@github.com>
Thu, 29 Aug 2024 15:34:13 +0000 (08:34 -0700)
quincy: rgw/kafka: set message timeout to 5 seconds

1  2 
src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc

Simple merge
index 9359b7a3d089b296d8a5a89a433222dd79fd043a,d13f656aa1cb90fdc0a297aa173cfc10020ebb58..7a4163d3877237ddb5e11581354027398273c9b4
@@@ -195,10 -196,17 +195,17 @@@ bool new_producer(connection_t* conn) 
    conn->temp_conf = rd_kafka_conf_new();
    if (!conn->temp_conf) {
      conn->status = STATUS_CONF_ALLOC_FAILED;
 -    return conn;
 +    return false;
    }
  
-   // get list of brokers based on the bootsrap broker
+   // set message timeout
+   // according to documentation, value of zero will expire the message based on retries.
+   // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+   constexpr std::uint64_t min_message_timeout = 1;
+   const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+   if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", 
+         std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+   // get list of brokers based on the bootstrap broker
    if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
  
    if (conn->use_ssl) {
@@@ -420,9 -427,8 +426,9 @@@ private
          auto& conn = conn_it->second;
  
          // Checking the connection idlesness
-         if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+         if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
            ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
 +          std::lock_guard lock(connections_lock);
            ERASE_AND_CONTINUE(conn_it, connections);
          }