see_also:
- rgw_bucket_counters_cache
with_legacy: true
+- name: rgw_kafka_connection_idle
+ type: uint
+ level: advanced
+ desc: Time in seconds to delete idle kafka connections
+ long_desc: A conection will be considered "idle" if no messages
+ are sent to it for more than the time defined.
+ Note that the connection will not be considered idle, even if it is down,
+ as long as there are attempts to send messages to it.
+ default: 30
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_kafka_sleep_timeout
+ type: uint
+ level: advanced
+ desc: Time in milliseconds to sleep while polling for kafka replies
+ long_desc: This will be used to prevent busy waiting for the kafka replies
+ As well as for the cases where the broker is down and we try to reconnect.
+ The same values times 3 will be used to sleep if there were no messages
+ sent or received across all kafka connections
+ default: 10
+ services:
+ - rgw
+ with_legacy: true
const size_t max_connections;
const size_t max_inflight;
const size_t max_queue;
- const size_t max_idle_time;
private:
std::atomic<size_t> connection_count;
bool stopped;
conn_it = connections.begin();
end_it = connections.end();
}
+
+ const auto read_timeout = cct->_conf->rgw_kafka_sleep_timeout;
// loop over all connections to read acks
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
// Checking the connection idleness
- if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
conn->status = STATUS_CONNECTION_IDLE;
continue;
}
- reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
+ reply_count += rd_kafka_poll(conn->producer, read_timeout);
// just increment the iterator
++conn_it;
}
- // if no messages were received or published
- // across all connection, sleep for 100ms
+ // sleep if no messages were received or published across all connection
if (send_count == 0 && reply_count == 0) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3));
}
}
}
Manager(size_t _max_connections,
size_t _max_inflight,
size_t _max_queue,
- int _read_timeout_ms,
CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
- max_idle_time(30),
connection_count(0),
stopped(false),
- read_timeout_ms(_read_timeout_ms),
connections(_max_connections),
messages(max_queue),
queued(0),
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
-static const int READ_TIMEOUT_MS_DEFAULT = 500;
bool init(CephContext* cct) {
if (s_manager) {
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, cct);
return true;
}