git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: Fix breakage where Kafka persistent notifications are not retried. 56145/head
author: kchheda3 <kchheda3@bloomberg.net>
Thu, 26 Oct 2023 19:57:54 +0000 (15:57 -0400)
committer: Yuval Lifshitz <ylifshit@ibm.com>
Mon, 2 Sep 2024 11:32:49 +0000 (11:32 +0000)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
(cherry picked from commit b2fadb65ae6a9eb3eaade56e04b08f2b694b220c)

src/rgw/rgw_kafka.cc

index 7a4163d3877237ddb5e11581354027398273c9b4..e91c42c21136ce2cda7c3fcd22d779f0db032e83 100644 (file)
@@ -35,6 +35,7 @@ static const int STATUS_CONNECTION_CLOSED =      -0x1002;
 static const int STATUS_QUEUE_FULL =             -0x1003;
 static const int STATUS_MAX_INFLIGHT =           -0x1004;
 static const int STATUS_MANAGER_STOPPED =        -0x1005;
+static const int STATUS_CONNECTION_IDLE =        -0x1006;
 // status code for connection opening
 static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
 static const int STATUS_CONF_REPLCACE          = -0x2002;
@@ -93,7 +94,9 @@ struct connection_t {
     // fire all remaining callbacks (if not fired by rd_kafka_flush)
     std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
         cb_tag.cb(status);
-        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+        ldout(cct, 20) << "Kafka destroy: invoking callback with tag="
+                       << cb_tag.tag << " for: " << broker
+                       << " with status: " << status << dendl;
       });
     callbacks.clear();
     delivery_tag = 1;
@@ -385,7 +388,9 @@ private:
     if (tag) {
       auto const q_len = conn->callbacks.size();
       if (q_len < max_inflight) {
-        ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+        ldout(conn->cct, 20)
+            << "Kafka publish (with callback, tag=" << *tag
+            << "): OK. Queue has: " << q_len + 1 << " callbacks" << dendl;
         conn->callbacks.emplace_back(*tag, message->cb);
       } else {
         // immediately invoke callback with error - this is not a connection error
@@ -429,7 +434,10 @@ private:
         if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
           ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           std::lock_guard lock(connections_lock);
-          ERASE_AND_CONTINUE(conn_it, connections);
+          conn->status = STATUS_CONNECTION_IDLE;
+          conn_it = connections.erase(conn_it);
+          --connection_count;
+          continue;
         }
 
         // try to reconnect the connection if it has an error