]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: failed to reconnect to broker after idle timeout 50965/head
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 9 Apr 2023 17:58:18 +0000 (17:58 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 9 Apr 2023 17:58:18 +0000 (17:58 +0000)
This is a regression due to: 8713c3169c0f9df1d2fc23ff2b82ede1e25be282

To reproduce, define a topic and a notification pointing to a kafka broker.
Stop the broker and upload objects to the cluster, wait for 30 seconds, and then start the broker.

Fixes: https://tracker.ceph.com/issues/59383?next_issue_id=59381
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_kafka.cc

index ef647a601a3e30c56f84436b6ce005b0b1d4c351..163b242d5c022063167b228ee9f656fd9bdb197b 100644 (file)
@@ -36,6 +36,7 @@ static const int STATUS_CONNECTION_CLOSED =      -0x1002;
 static const int STATUS_QUEUE_FULL =             -0x1003;
 static const int STATUS_MAX_INFLIGHT =           -0x1004;
 static const int STATUS_MANAGER_STOPPED =        -0x1005;
+static const int STATUS_CONNECTION_IDLE =        -0x1006;
 // status code for connection opening
 static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
 
@@ -86,12 +87,17 @@ struct connection_t {
         rd_kafka_conf_destroy(temp_conf);
         return;
     }
+    if (!is_ok()) {
+      // no producer, nothing to destroy
+      return;
+    }
     // wait for all remaining acks/nacks
     rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
     // destroy all topics
     std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
     // destroy producer
     rd_kafka_destroy(producer);
+    producer = nullptr;
     // fire all remaining callbacks (if not fired by rd_kafka_flush)
     std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
         cb_tag.cb(status);
@@ -113,7 +119,7 @@ struct connection_t {
 
   // dtor also destroys the internals
   ~connection_t() {
-    destroy(STATUS_CONNECTION_CLOSED);
+    destroy(status);
   }
 
   friend void intrusive_ptr_add_ref(const connection_t* p);
@@ -153,6 +159,8 @@ std::string status_to_string(int s) {
       return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
     case STATUS_CONF_ALLOC_FAILED:
       return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+    case STATUS_CONNECTION_IDLE:
+      return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
   }
   return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
 }
@@ -200,6 +208,11 @@ void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *
     ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
 }
 
+void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
+}
+
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
@@ -273,6 +286,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
 
   // redirect kafka logs to RGW
   rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+  // define poll callback to allow reconnect
+  rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
   // create the producer
   conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
   if (!conn->producer) {
@@ -333,16 +348,6 @@ struct message_wrapper_t {
 typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
-// macros used inside a loop where an iterator is either incremented or erased
-#define INCREMENT_AND_CONTINUE(IT) \
-          ++IT; \
-          continue;
-
-#define ERASE_AND_CONTINUE(IT,CONTAINER) \
-          IT=CONTAINER.erase(IT); \
-          --connection_count; \
-          continue;
-
 class Manager {
 public:
   const size_t max_connections;
@@ -475,8 +480,11 @@ private:
 
         // Checking the connection idlesness
         if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
-          ERASE_AND_CONTINUE(conn_it, connections);
+          ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+          conn->destroy(STATUS_CONNECTION_IDLE);
+          conn_it = connections.erase(conn_it);
+          --connection_count; \
+          continue;
         }
 
         // try to reconnect the connection if it has an error
@@ -491,7 +499,8 @@ private:
           } else {
             ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
           }
-          INCREMENT_AND_CONTINUE(conn_it);
+          ++conn_it;
+          continue;
         }
 
         reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);