]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: make the connection idle and sleep timeouts and configurable 55022/head
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 28 Dec 2023 18:53:14 +0000 (18:53 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 1 Jan 2024 11:59:37 +0000 (11:59 +0000)
Fixes: https://tracker.ceph.com/issues/63901
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc

index 3f0f691e84e75b8554a46c9d3b8f2ab96eb58f37..a37079775225bd0b6fcb6b36ca1a65a1a4b581cf 100644 (file)
@@ -3965,3 +3965,27 @@ options:
   see_also:
   - rgw_bucket_counters_cache
   with_legacy: true
+- name: rgw_kafka_connection_idle
+  type: uint 
+  level: advanced
+  desc: Time in seconds to delete idle kafka connections
+  long_desc: A conection will be considered "idle" if no messages
+    are sent to it for more than the time defined.
+    Note that the connection will not be considered idle, even if it is down,
+    as long as there are attempts to send messages to it.
+  default: 30
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_kafka_sleep_timeout
+  type: uint 
+  level: advanced
+  desc: Time in milliseconds to sleep while polling for kafka replies
+  long_desc: This will be used to prevent busy waiting for the kafka replies
+    As well as for the cases where the broker is down and we try to reconnect.
+    The same values times 3 will be used to sleep if there were no messages
+    sent or received across all kafka connections
+  default: 10
+  services:
+  - rgw
+  with_legacy: true
index 4593a4de67b0932a43a6c04a098cf2dbf5d70aab..0d6e7737501445602cfb98aed3fda951134b90a2 100644 (file)
@@ -326,7 +326,6 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
-  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
   bool stopped;
@@ -457,13 +456,15 @@ private:
         conn_it = connections.begin();
         end_it = connections.end();
       }
+
+      const auto read_timeout = cct->_conf->rgw_kafka_sleep_timeout;
       // loop over all connections to read acks
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
 
         // Checking the connection idleness
-        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+        if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
           ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           std::lock_guard lock(connections_lock);
           conn->status = STATUS_CONNECTION_IDLE;
@@ -488,15 +489,14 @@ private:
           continue;
         }
 
-        reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
+        reply_count += rd_kafka_poll(conn->producer, read_timeout);
 
         // just increment the iterator
         ++conn_it;
       }
-      // if no messages were received or published
-      // across all connection, sleep for 100ms
+      // sleep if no messages were received or published across all connection
       if (send_count == 0 && reply_count == 0) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3));
       }
     }
   }
@@ -510,15 +510,12 @@ public:
   Manager(size_t _max_connections,
       size_t _max_inflight,
       size_t _max_queue, 
-      int _read_timeout_ms,
       CephContext* _cct) : 
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
-    max_idle_time(30),
     connection_count(0),
     stopped(false),
-    read_timeout_ms(_read_timeout_ms),
     connections(_max_connections),
     messages(max_queue),
     queued(0),
@@ -673,14 +670,13 @@ static Manager* s_manager = nullptr;
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
 static const size_t MAX_QUEUE_DEFAULT = 8192;
-static const int READ_TIMEOUT_MS_DEFAULT = 500;
 
 bool init(CephContext* cct) {
   if (s_manager) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, cct);
   return true;
 }