From 054ddb2435d73070f3de717a6ad5a312351f4a45 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 9 Apr 2023 17:58:18 +0000 Subject: [PATCH] rgw/kafka: failed to reconnect to broker after idle timeout This is a regression due to: 8713c3169c0f9df1d2fc23ff2b82ede1e25be282 To reproduce, define a topic and a notification pointing to a kafka broker. Stop the broker and upload objects to the cluster, wait for 30 seconds, and then start the broker. Fixes: https://tracker.ceph.com/issues/59383?next_issue_id=59381 Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_kafka.cc | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index ef647a601a3e..163b242d5c02 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -36,6 +36,7 @@ static const int STATUS_CONNECTION_CLOSED = -0x1002; static const int STATUS_QUEUE_FULL = -0x1003; static const int STATUS_MAX_INFLIGHT = -0x1004; static const int STATUS_MANAGER_STOPPED = -0x1005; +static const int STATUS_CONNECTION_IDLE = -0x1006; // status code for connection opening static const int STATUS_CONF_ALLOC_FAILED = -0x2001; @@ -86,12 +87,17 @@ struct connection_t { rd_kafka_conf_destroy(temp_conf); return; } + if (!is_ok()) { + // no producer, nothing to destroy + return; + } // wait for all remaining acks/nacks rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */); // destroy all topics std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); // destroy producer rd_kafka_destroy(producer); + producer = nullptr; // fire all remaining callbacks (if not fired by rd_kafka_flush) std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { cb_tag.cb(status); @@ -113,7 +119,7 @@ struct connection_t { // dtor also destroys the internals ~connection_t() { - destroy(STATUS_CONNECTION_CLOSED); + destroy(status); } friend void intrusive_ptr_add_ref(const connection_t* p); @@ -153,6 
+159,8 @@ std::string status_to_string(int s) { return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; case STATUS_CONF_ALLOC_FAILED: return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; + case STATUS_CONNECTION_IDLE: + return "RGW_KAFKA_STATUS_CONNECTION_IDLE"; } return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); } @@ -200,6 +208,11 @@ void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char * ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; } +void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) { /* error callback installed via rd_kafka_conf_set_error_cb(); it only logs the error code and reason, using the connection stored as the producer's opaque */ + const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk)); + ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl; +} + // utility function to create a connection, when the connection object already exists connection_ptr_t& create_connection(connection_ptr_t& conn) { // pointer must be valid and not marked for deletion @@ -273,6 +286,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { // redirect kafka logs to RGW rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback); + // define poll callback to allow reconnect + rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback); // create the producer conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); if (!conn->producer) { @@ -333,16 +348,6 @@ struct message_wrapper_t { typedef std::unordered_map ConnectionList; typedef boost::lockfree::queue> MessageQueue; -// macros used inside a loop where an iterator is either incremented or erased -#define INCREMENT_AND_CONTINUE(IT) \ - ++IT; \ - continue; - -#define ERASE_AND_CONTINUE(IT,CONTAINER) \ - IT=CONTAINER.erase(IT); \ - --connection_count; \ - continue; - class Manager { public: const size_t max_connections; @@ -475,8 +480,11 @@ private: // Checking the connection idlesness if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { - ldout(conn->cct, 20) << "Time for deleting a 
connection due to idle behaviour: " << ceph_clock_now() << dendl; - ERASE_AND_CONTINUE(conn_it, connections); + ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; + conn->destroy(STATUS_CONNECTION_IDLE); /* tear the connection down with the idle status before dropping it from the list */ + conn_it = connections.erase(conn_it); + --connection_count; + continue; } // try to reconnect the connection if it has an error @@ -491,7 +499,8 @@ private: } else { ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl; } - INCREMENT_AND_CONTINUE(conn_it); + ++conn_it; + continue; } reply_count += rd_kafka_poll(conn->producer, read_timeout_ms); -- 2.47.3