From: kchheda3 Date: Thu, 26 Oct 2023 19:57:54 +0000 (-0400) Subject: rgw/notification: Fix Kafka persistent notification breakage that are not retried. X-Git-Tag: v17.2.8~102^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1c626928a3b38d86c39e3c74b3f6e552cd5a14fe;p=ceph.git rgw/notification: Fix Kafka persistent notification breakage that are not retried. Signed-off-by: kchheda3 (cherry picked from commit b2fadb65ae6a9eb3eaade56e04b08f2b694b220c) --- diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 7a4163d387723..e91c42c21136c 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -35,6 +35,7 @@ static const int STATUS_CONNECTION_CLOSED = -0x1002; static const int STATUS_QUEUE_FULL = -0x1003; static const int STATUS_MAX_INFLIGHT = -0x1004; static const int STATUS_MANAGER_STOPPED = -0x1005; +static const int STATUS_CONNECTION_IDLE = -0x1006; // status code for connection opening static const int STATUS_CONF_ALLOC_FAILED = -0x2001; static const int STATUS_CONF_REPLCACE = -0x2002; @@ -93,7 +94,9 @@ struct connection_t { // fire all remaining callbacks (if not fired by rd_kafka_flush) std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { cb_tag.cb(status); - ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl; + ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" + << cb_tag.tag << " for: " << broker + << " with status: " << status << dendl; }); callbacks.clear(); delivery_tag = 1; @@ -385,7 +388,9 @@ private: if (tag) { auto const q_len = conn->callbacks.size(); if (q_len < max_inflight) { - ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl; + ldout(conn->cct, 20) + << "Kafka publish (with callback, tag=" << *tag + << "): OK. Queue has: " << q_len + 1 << " callbacks" << dendl; conn->callbacks.emplace_back(*tag, message->cb); } else { // immediately invoke callback with error - this is not a connection error @@ -429,7 +434,10 @@ private: if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; std::lock_guard lock(connections_lock); - ERASE_AND_CONTINUE(conn_it, connections); + conn->status = STATUS_CONNECTION_IDLE; + conn_it = connections.erase(conn_it); + --connection_count; + continue; } // try to reconnect the connection if it has an error