]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: set message timeout to 5 seconds 55952/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 5 Mar 2024 10:14:06 +0000 (10:14 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 7 Mar 2024 10:46:34 +0000 (10:46 +0000)
also increase the idle timeout to 30 seconds.
test instructions:
https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353

Fixes: https://tracker.ceph.com/issues/64710
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc

index 6fab43e5589ff99335f65b3e51c008735bcb70d1..a7af43ae05f53d8a8449619c2a6124db46cc7c28 100644 (file)
@@ -3983,7 +3983,7 @@ options:
     are sent to it for more than the time defined.
     Note that the connection will not be considered idle, even if it is down,
     as long as there are attempts to send messages to it.
-  default: 30
+  default: 300
   services:
   - rgw
   with_legacy: true
@@ -3999,3 +3999,13 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_kafka_message_timeout
+  type: uint 
+  level: advanced
+  desc: This is the maximum time in milliseconds to deliver a message (including retries)
+  long_desc: Delivery error occurs when the message timeout is exceeded.
+    Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
+  default: 5000
+  services:
+  - rgw
+  with_legacy: true
index 0d6e7737501445602cfb98aed3fda951134b90a2..3b5b1c7011b0b861a7e7d2c7486fe2fcbe54ce1a 100644 (file)
@@ -210,9 +210,16 @@ bool new_producer(connection_t* conn) {
     return false;
   }
 
+  // set message timeout
+  // according to documentation, value of zero will expire the message based on retries.
+  // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
+  constexpr std::uint64_t min_message_timeout = 1;
+  const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+  if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", 
+        std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
   // get list of brokers based on the bootstrap broker
   if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
-
+  
   if (conn->use_ssl) {
     if (!conn->user.empty()) {
       // use SSL+SASL