From: Yuval Lifshitz Date: Thu, 28 Dec 2023 18:53:14 +0000 (+0000) Subject: rgw/kafka: make the connection idle and sleep timeouts and configurable X-Git-Tag: v19.1.0~571^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1f2a5e38cf85446f508a4c2a5d15a8366b7f51e1;p=ceph.git rgw/kafka: make the connection idle and sleep timeouts and configurable Fixes: https://tracker.ceph.com/issues/63901 Signed-off-by: Yuval Lifshitz --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 3f0f691e84e75..a37079775225b 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3965,3 +3965,27 @@ options: see_also: - rgw_bucket_counters_cache with_legacy: true +- name: rgw_kafka_connection_idle + type: uint + level: advanced + desc: Time in seconds to delete idle kafka connections + long_desc: A conection will be considered "idle" if no messages + are sent to it for more than the time defined. + Note that the connection will not be considered idle, even if it is down, + as long as there are attempts to send messages to it. + default: 30 + services: + - rgw + with_legacy: true +- name: rgw_kafka_sleep_timeout + type: uint + level: advanced + desc: Time in milliseconds to sleep while polling for kafka replies + long_desc: This will be used to prevent busy waiting for the kafka replies + As well as for the cases where the broker is down and we try to reconnect. + The same values times 3 will be used to sleep if there were no messages + sent or received across all kafka connections + default: 10 + services: + - rgw + with_legacy: true diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 4593a4de67b09..0d6e773750144 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -326,7 +326,6 @@ public: const size_t max_connections; const size_t max_inflight; const size_t max_queue; - const size_t max_idle_time; private: std::atomic connection_count; bool stopped; @@ -457,13 +456,15 @@ private: conn_it = connections.begin(); end_it = connections.end(); } + + const auto read_timeout = cct->_conf->rgw_kafka_sleep_timeout; // loop over all connections to read acks for (;conn_it != end_it;) { auto& conn = conn_it->second; // Checking the connection idleness - if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { + if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; std::lock_guard lock(connections_lock); conn->status = STATUS_CONNECTION_IDLE; @@ -488,15 +489,14 @@ private: continue; } - reply_count += rd_kafka_poll(conn->producer, read_timeout_ms); + reply_count += rd_kafka_poll(conn->producer, read_timeout); // just increment the iterator ++conn_it; } - // if no messages were received or published - // across all connection, sleep for 100ms + // sleep if no messages were received or published across all connection if (send_count == 0 && reply_count == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3)); } } } @@ -510,15 +510,12 @@ public: Manager(size_t _max_connections, size_t _max_inflight, size_t _max_queue, - int _read_timeout_ms, CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), - max_idle_time(30), connection_count(0), stopped(false), - read_timeout_ms(_read_timeout_ms), connections(_max_connections), messages(max_queue), queued(0), @@ -673,14 +670,13 @@ static Manager* s_manager = nullptr; static const size_t MAX_CONNECTIONS_DEFAULT = 256; static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; -static const int READ_TIMEOUT_MS_DEFAULT = 500; bool init(CephContext* cct) { if (s_manager) { return false; } // TODO: take conf from CephContext - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct); + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, cct); return true; }