]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification/kafka: simplify kafka connection memory management 51796/head
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 22 May 2023 09:38:14 +0000 (09:38 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 28 May 2023 16:33:15 +0000 (16:33 +0000)
use a bare pointer and a unique_ptr instead of a shared pointer (boost::intrusive_ptr).
and use the broker name as the handle exposed outside of the kafka module.

Fixes: https://tracker.ceph.com/issues/61328
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit fbd287602742ecbe6a0d383ce9db1b6a65c348cb)

 Conflicts:
src/rgw/rgw_kafka.cc

src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h

index 9f1d7c57553b479c628988e2999c4bee38a2af1a..f15aa3bcc693d9b8c7d0b1dccf1df35d938d8ebc 100644 (file)
@@ -285,8 +285,8 @@ private:
   };
   CephContext* const cct;
   const std::string topic;
-  kafka::connection_ptr_t conn;
   const ack_level_t ack_level;
+  std::string conn_name;
 
 
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
@@ -309,9 +309,9 @@ public:
       CephContext* _cct) : 
         cct(_cct),
         topic(_topic),
-        conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"), args.get_optional("mechanism"))) ,
         ack_level(get_ack_level(args)) {
-    if (!conn) { 
+    if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), 
+          args.get_optional("ca-location"), args.get_optional("mechanism"))) {
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
     }
   }
@@ -371,13 +371,12 @@ public:
   };
 
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
-    ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn, topic, json_format_pubsub_event(event));
+      return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
     } else {
       // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
       auto w = std::unique_ptr<Waiter>(new Waiter);
-      const auto rc = kafka::publish_with_confirm(conn, 
+      const auto rc = kafka::publish_with_confirm(conn_name
         topic,
         json_format_pubsub_event(event),
         std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
@@ -391,7 +390,7 @@ public:
 
   std::string to_str() const override {
     std::string str("Kafka Endpoint");
-    str += kafka::to_string(conn);
+    str += "\nBroker: " + conn_name;
     str += "\nTopic: " + topic;
     return str;
   }
index cde0ea6200ec8f86ab9cc668043e87d5db3c664c..651d7099ebc58bedc7f9132e952e1dbbe4fcc0ec 100644 (file)
@@ -31,13 +31,13 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
 namespace rgw::kafka {
 
 // status codes for publishing
-// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
 static const int STATUS_CONNECTION_CLOSED =      -0x1002;
 static const int STATUS_QUEUE_FULL =             -0x1003;
 static const int STATUS_MAX_INFLIGHT =           -0x1004;
 static const int STATUS_MANAGER_STOPPED =        -0x1005;
 // status code for connection opening
 static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
+static const int STATUS_CONF_REPLCACE          = -0x2002;
 
 static const int STATUS_OK =                     0x0;
 
@@ -64,7 +64,6 @@ struct connection_t {
   std::vector<rd_kafka_topic_t*> topics;
   uint64_t delivery_tag = 1;
   int status = STATUS_OK;
-  mutable std::atomic<int> ref_count = 0;
   CephContext* const cct;
   CallbackList callbacks;
   const std::string broker;
@@ -115,29 +114,8 @@ struct connection_t {
   ~connection_t() {
     destroy(STATUS_CONNECTION_CLOSED);
   }
-
-  friend void intrusive_ptr_add_ref(const connection_t* p);
-  friend void intrusive_ptr_release(const connection_t* p);
 };
 
-std::string to_string(const connection_ptr_t& conn) {
-    std::string str;
-    str += "\nBroker: " + conn->broker; 
-    str += conn->use_ssl ? "\nUse SSL" : ""; 
-    str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
-    str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : "";
-    return str;
-}
-// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p) {
-  ++p->ref_count;
-}
-void intrusive_ptr_release(const connection_t* p) {
-  if (--p->ref_count == 0) {
-    delete p;
-  }
-}
-
 // convert int status to string - including RGW specific values
 std::string status_to_string(int s) {
   switch (s) {
@@ -153,6 +131,8 @@ std::string status_to_string(int s) {
       return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
     case STATUS_CONF_ALLOC_FAILED:
       return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+    case STATUS_CONF_REPLCACE:
+      return "RGW_KAFKA_STATUS_CONF_REPLCACE";
   }
   return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
 }
@@ -200,11 +180,15 @@ void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *
     ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
 }
 
-// utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn) {
-  // pointer must be valid and not marked for deletion
-  ceph_assert(conn);
-  
+void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
+}
+
+using connection_t_ptr = std::unique_ptr<connection_t>;
+
+// utility function to create a producer, when the connection object already exists
+bool new_producer(connection_t* conn) {
   // reset all status codes
   conn->status = STATUS_OK; 
   char errstr[512] = {0};
@@ -212,7 +196,7 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
   conn->temp_conf = rd_kafka_conf_new();
   if (!conn->temp_conf) {
     conn->status = STATUS_CONF_ALLOC_FAILED;
-    return conn;
+    return false;
   }
 
   // get list of brokers based on the bootsrap broker
@@ -269,16 +253,20 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
   rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
 
   // set the global opaque pointer to be the connection itself
-  rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
+  rd_kafka_conf_set_opaque(conn->temp_conf, conn);
 
   // redirect kafka logs to RGW
   rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
   // create the producer
+  if (conn->producer) {
+    ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
+    conn->destroy(STATUS_CONF_REPLCACE);
+  }
   conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
   if (!conn->producer) {
     conn->status = rd_kafka_last_error();
     ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
-    return conn;
+    return false;
   }
   ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
   {
@@ -296,41 +284,28 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
 
   // conf ownership passed to producer
   conn->temp_conf = nullptr;
-  return conn;
+  return true;
 
 conf_error:
   conn->status = rd_kafka_last_error();
   ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
-  return conn;
-}
-
-// utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
-        bool use_ssl,
-        bool verify_ssl,
-        boost::optional<const std::string&> ca_location, 
-        const std::string& user, 
-        const std::string& password,
-        boost::optional<const std::string&> mechanism) { 
-  // create connection state
-  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism));
-  return create_connection(conn);
+  return false;
 }
 
-/// struct used for holding messages in the message queue
+// struct used for holding messages in the message queue
 struct message_wrapper_t {
-  connection_ptr_t conn
+  std::string conn_name
   std::string topic;
   std::string message;
-  reply_callback_t cb;
+  const reply_callback_t cb;
   
-  message_wrapper_t(connection_ptr_t& _conn,
+  message_wrapper_t(const std::string& _conn_name,
       const std::string& _topic,
       const std::string& _message,
-      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
 };
 
-typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
+typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
 // macros used inside a loop where an iterator is either incremented or erased
@@ -363,15 +338,23 @@ private:
 
   // TODO use rd_kafka_produce_batch for better performance
   void publish_internal(message_wrapper_t* message) {
-    const std::unique_ptr<message_wrapper_t> msg_owner(message);
-    auto& conn = message->conn;
+    const std::unique_ptr<message_wrapper_t> msg_deleter(message);
+    const auto conn_it = connections.find(message->conn_name);
+    if (conn_it == connections.end()) {
+      ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
+      if (message->cb) {
+        message->cb(STATUS_CONNECTION_CLOSED);
+      }
+      return;
+    }
+    auto& conn = conn_it->second;
 
     conn->timestamp = ceph_clock_now(); 
 
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
-      ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
       if (message->cb) {
         message->cb(conn->status);
       }
@@ -485,7 +468,7 @@ private:
           ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
           const auto& broker = conn_it->first;
           ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
-          if (create_connection(conn)->is_ok() == false) {
+          if (new_producer(conn.get()) == false) {
             ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
             // TODO: add error counter for failed retries
             // TODO: add exponential backoff for retries
@@ -552,33 +535,32 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, 
+  bool connect(std::string& broker,
+          const std::string& url, 
           bool use_ssl,
           bool verify_ssl,
           boost::optional<const std::string&> ca_location,
           boost::optional<const std::string&> mechanism) {
     if (stopped) {
-      // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
-      return nullptr;
+      return false;
     }
 
-    std::string broker;
-       std::string user;
-       std::string password;
+    std::string user;
+    std::string password;
     if (!parse_url_authority(url, broker, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
-      return nullptr;
+      return false;
     }
 
     // this should be validated by the regex in parse_url()
     ceph_assert(user.empty() == password.empty());
 
-       if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+    if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
-      return nullptr;
-       }
+      return false;
+    }
 
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(broker);
@@ -586,53 +568,49 @@ public:
     if (it != connections.end()) {
       // connection found - return even if non-ok
       ldout(cct, 20) << "Kafka connect: connection found" << dendl;
-      return it->second;
+      return it->second.get();
     }
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
-      return nullptr;
+      return false;
     }
-    const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism);
-    // create_new_connection must always return a connection object
+    // create_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
-    ceph_assert(conn);
     ++connection_count;
     ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
-    return connections.emplace(broker, conn).first->second;
+    auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
+    if (!new_producer(conn)) {
+      ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
+    }
+    return true;
   }
 
   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(connection_ptr_t& conn
+  int publish(const std::string& conn_name
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
       return STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      return STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+    if (messages.push(new message_wrapper_t(conn_name, topic, message, nullptr))) {
       ++queued;
       return STATUS_OK;
     }
     return STATUS_QUEUE_FULL;
   }
   
-  int publish_with_confirm(connection_ptr_t& conn
+  int publish_with_confirm(const std::string& conn_name
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
     if (stopped) {
       return STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      return STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+    if (messages.push(new message_wrapper_t(conn_name, topic, message, cb))) {
       ++queued;
       return STATUS_OK;
     }
@@ -697,26 +675,26 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
+bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
         boost::optional<const std::string&> ca_location,
         boost::optional<const std::string&> mechanism) {
-  if (!s_manager) return nullptr;
-  return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism);
+  if (!s_manager) return false;
+  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
 }
 
-int publish(connection_ptr_t& conn, 
+int publish(const std::string& conn_name,
     const std::string& topic,
     const std::string& message) {
   if (!s_manager) return STATUS_MANAGER_STOPPED;
-  return s_manager->publish(conn, topic, message);
+  return s_manager->publish(conn_name, topic, message);
 }
 
-int publish_with_confirm(connection_ptr_t& conn, 
+int publish_with_confirm(const std::string& conn_name,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
   if (!s_manager) return STATUS_MANAGER_STOPPED;
-  return s_manager->publish_with_confirm(conn, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_name, topic, message, cb);
 }
 
 size_t get_connection_count() {
index 6d3dcad4a004ce388f26756d84b9efd8d34b7a9d..813fda32969b9fac57a55dee26a691d1b6efdbff 100644 (file)
@@ -5,20 +5,11 @@
 
 #include <string>
 #include <functional>
-#include <boost/smart_ptr/intrusive_ptr.hpp>
 #include <boost/optional.hpp>
 
 #include "include/common_fwd.h"
 
 namespace rgw::kafka {
-// forward declaration of connection object
-struct connection_t;
-
-typedef boost::intrusive_ptr<connection_t> connection_ptr_t;
-
-// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p);
-void intrusive_ptr_release(const connection_t* p);
 
 // the reply callback is expected to get an integer parameter
 // indicating the result, and not to return anything
@@ -31,17 +22,17 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
 
 // publish a message over a connection that was already created
-int publish(connection_ptr_t& conn,
+int publish(const std::string& conn_name,
     const std::string& topic,
     const std::string& message);
 
 // publish a message over a connection that was already created
 // and pass a callback that will be invoked (async) when broker confirms
 // receiving the message
-int publish_with_confirm(connection_ptr_t& conn,
+int publish_with_confirm(const std::string& conn_name,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb);
@@ -71,11 +62,5 @@ size_t get_max_inflight();
 // maximum number of messages in the queue
 size_t get_max_queue();
 
-// disconnect from a kafka broker
-bool disconnect(connection_ptr_t& conn);
-
-// display connection as string
-std::string to_string(const connection_ptr_t& conn);
-
 }