]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: set message timeout to 5 seconds 56158/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 5 Mar 2024 10:14:06 +0000 (10:14 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Wed, 13 Mar 2024 11:30:36 +0000 (11:30 +0000)
also increase the idle timeout to 30 seconds.
test instructions:
https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353

Fixes: https://tracker.ceph.com/issues/64710
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit 1c13850f24dbb90c33a12c6da338956c2e83811b)

Conflicts:
src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc

src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc

index 9c7f91f9e0d981b11ab49389e924fd6e93564221..6e5cdb55ec4e76c8fb782efeeb519093593a3ccd 100644 (file)
@@ -3772,3 +3772,25 @@ options:
   default: true
   services:
   - rgw
+- name: rgw_kafka_connection_idle
+  type: uint 
+  level: advanced
+  desc: Time in seconds to delete idle kafka connections
+  long_desc: A conection will be considered "idle" if no messages
+    are sent to it for more than the time defined.
+    Note that the connection will not be considered idle, even if it is down,
+    as long as there are attempts to send messages to it.
+  default: 300
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_kafka_message_timeout
+  type: uint 
+  level: advanced
+  desc: This is the maximum time in milliseconds to deliver a message (including retries)
+  long_desc: Delivery error occurs when the message timeout is exceeded.
+    Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
+  default: 5000
+  services:
+  - rgw
+  with_legacy: true
index 642787a38cf17b38600dec777ed0fc98ca4214f2..03b11980ae7c24c51172a8ea2b7ea884a2ff7624 100644 (file)
@@ -209,7 +209,14 @@ bool new_producer(connection_t* conn) {
     return false;
   }
 
-  // get list of brokers based on the bootsrap broker
+  // set message timeout
+  // according to documentation, value of zero will expire the message based on retries.
+  // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+  constexpr std::uint64_t min_message_timeout = 1;
+  const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+  if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", 
+        std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+  // get list of brokers based on the bootstrap broker
   if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
   if (conn->use_ssl) {
@@ -325,7 +332,6 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
-  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
   bool stopped;
@@ -459,7 +465,7 @@ private:
         auto& conn = conn_it->second;
 
         // Checking the connection idlesness
-        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+        if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
           ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           std::lock_guard lock(connections_lock);
           conn->destroy(STATUS_CONNECTION_IDLE);
@@ -511,7 +517,6 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
-    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout_ms(_read_timeout_ms),