rgw/notification: For kafka include user-id & password as part of the key along with... (62495/head)
author kchheda3 <kchheda3@bloomberg.net>
Thu, 23 May 2024 20:20:05 +0000 (16:20 -0400)
committer kchheda3 <kchheda3@bloomberg.net>
Tue, 25 Mar 2025 13:52:28 +0000 (13:52 +0000)
For Kafka, connection pooling is currently done based on the endpoint, so all events with the same endpoint share the same connection. This breaks when a user-id & password are created or changed for the endpoint: the old connection is cached in the broker manager, so when a new event is sent with the updated/new user-id & password, the broker manager still uses the old connection that was created with the old (or no) credentials, because only the `endpoint` is the key into the connection pool.
To fix this, use all the topic attributes that take part in the connection as the key into the connection pool, and create a new Kafka connection whenever any of those attributes changes. The attributes include user-id, password, ssl, and ca_location.
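
In essence, the change replaces the broker-name string key with a composite key covering every attribute that shapes the connection, so a credential change produces a pool miss and a fresh connection. A minimal standalone sketch of that pattern (assuming C++14 and Boost; the conn_key, conn_key_hasher, and pool names are illustrative stand-ins, not the actual connection_id_t code from rgw_kafka.h):

#include <boost/functional/hash.hpp>
#include <iostream>
#include <string>
#include <unordered_map>

// Composite key: if any attribute changes, the key changes too.
struct conn_key {
  std::string broker;
  std::string user;
  std::string password;
  std::string ca_location;
  std::string mechanism;
  bool ssl = false;
};

// Equality and a hasher are required to use conn_key in an unordered_map.
bool operator==(const conn_key& a, const conn_key& b) {
  return a.broker == b.broker && a.user == b.user &&
         a.password == b.password && a.ca_location == b.ca_location &&
         a.mechanism == b.mechanism && a.ssl == b.ssl;
}

struct conn_key_hasher {
  std::size_t operator()(const conn_key& k) const {
    std::size_t h = 0;
    boost::hash_combine(h, k.broker);
    boost::hash_combine(h, k.user);
    boost::hash_combine(h, k.password);
    boost::hash_combine(h, k.ca_location);
    boost::hash_combine(h, k.mechanism);
    boost::hash_combine(h, k.ssl);
    return h;
  }
};

int main() {
  // Hypothetical pool: key -> connection handle (a string here for brevity).
  std::unordered_map<conn_key, std::string, conn_key_hasher> pool;
  pool[{"kafka:9092", "alice", "old-pw", "", "", true}] = "conn-1";

  // Same broker, updated password: the lookup misses, so the caller
  // would create a new connection instead of reusing the stale one.
  conn_key updated{"kafka:9092", "alice", "new-pw", "", "", true};
  std::cout << (pool.count(updated) ? "reuse" : "create new") << '\n';
}

Because the password differs, the second lookup prints "create new"; with the old endpoint-only key both credentials would have mapped to the same cached connection.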

Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/test/rgw/bucket_notification/test_bn.py

index f1a1e8c9546bf4a373da3ccb1adb3dbe515545d1..e0d742d6f10b3c382db69eb2f1a0dacf976f46e5 100644 (file)
@@ -289,8 +289,7 @@ private:
   };
   const std::string topic;
   const ack_level_t ack_level;
-  std::string conn_name;
-
+  kafka::connection_id_t conn_id;
 
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
@@ -311,27 +310,24 @@ public:
       const RGWHTTPArgs& args) : 
         topic(_topic),
         ack_level(get_ack_level(args)) {
-    if (!kafka::connect(conn_name, _endpoint,
-                        get_bool(args, "use-ssl", false),
-                        get_bool(args, "verify-ssl", true),
-                        args.get_optional("ca-location"),
-                        args.get_optional("mechanism"),
-                        args.get_optional("user-name"),
-                        args.get_optional("password"))) {
-      throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
-    }
-  }
+   if (!kafka::connect(
+           conn_id, _endpoint, get_bool(args, "use-ssl", false),
+           get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
+           args.get_optional("mechanism"), args.get_optional("user-name"),
+           args.get_optional("password"))) {
+     throw configuration_error("Kafka: failed to create connection to: " +
+                               _endpoint);
+   }
+ }
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
+      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       auto w = std::make_unique<Waiter>();
-      const auto rc = kafka::publish_with_confirm(conn_name, 
-        topic,
-        json_format_pubsub_event(event),
-        [wp = w.get()](int r) { wp->finish(r); }
-      );
+      const auto rc = kafka::publish_with_confirm(
+          conn_id, topic, json_format_pubsub_event(event),
+          [wp = w.get()](int r) { wp->finish(r); });
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
@@ -342,7 +338,7 @@ public:
 
   std::string to_str() const override {
     std::string str("Kafka Endpoint");
-    str += "\nBroker: " + conn_name;
+    str += "\nBroker: " + to_string(conn_id);
     str += "\nTopic: " + topic;
     return str;
   }
index dbc893d70c9ddf7d15e657d37c86406df043c2d1..e0c8a37acc87624bfbbcab670b0ccd96c6a121db 100644 (file)
@@ -13,6 +13,7 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
+#include <boost/functional/hash.hpp>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
 
@@ -65,6 +66,47 @@ inline std::string status_to_string(int s) {
   }
 }
 
+connection_id_t::connection_id_t(
+    const std::string& _broker,
+    const std::string& _user,
+    const std::string& _password,
+    const boost::optional<const std::string&>& _ca_location,
+    const boost::optional<const std::string&>& _mechanism,
+    bool _ssl)
+    : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+  if (_ca_location.has_value()) {
+    ca_location = _ca_location.get();
+  }
+  if (_mechanism.has_value()) {
+    mechanism = _mechanism.get();
+  }
+}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+  return lhs.broker == rhs.broker && lhs.user == rhs.user &&
+         lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
+         lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;
+}
+
+struct connection_id_hasher {
+  std::size_t operator()(const connection_id_t& k) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, k.broker);
+    boost::hash_combine(h, k.user);
+    boost::hash_combine(h, k.password);
+    boost::hash_combine(h, k.ca_location);
+    boost::hash_combine(h, k.mechanism);
+    boost::hash_combine(h, k.ssl);
+    return h;
+  }
+};
+
+std::string to_string(const connection_id_t& id) {
+  return id.broker + ":" + id.user;
+}
+
 // convert int status to errno - both RGW and librdkafka values
 inline int status_to_errno(int s) {
   if (s == 0) return 0;
@@ -169,8 +211,9 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
       ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
         rd_kafka_err2str(result) << dendl;
   } else {
-      ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
-        rd_kafka_err2str(result) << dendl;
+    ldout(conn->cct, 1) << "Kafka run: nack received with result="
+                        << rd_kafka_err2str(result)
+                        << " for broker: " << conn->broker << dendl;
   }
 
   if (!rkmessage->_private) {
@@ -331,18 +374,21 @@ conf_error:
 
 // struct used for holding messages in the message queue
 struct message_wrapper_t {
-  std::string conn_name; 
+  connection_id_t conn_id;
   std::string topic;
   std::string message;
   const reply_callback_t cb;
-  
-  message_wrapper_t(const std::string& _conn_name,
-      const std::string& _topic,
-      const std::string& _message,
-      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
+
+  message_wrapper_t(const connection_id_t& _conn_id,
+                    const std::string& _topic,
+                    const std::string& _message,
+                    reply_callback_t _cb)
+      : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
 };
 
-typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
+typedef std::
+    unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher>
+        ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
 class Manager {
@@ -365,7 +411,7 @@ private:
   // TODO use rd_kafka_produce_batch for better performance
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_deleter(message);
-    const auto conn_it = connections.find(message->conn_name);
+    const auto conn_it = connections.find(message->conn_id);
     if (conn_it == connections.end()) {
       ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
       if (message->cb) {
@@ -426,7 +472,9 @@ private:
             tag);
     if (rc == -1) {
       const auto err = rd_kafka_last_error();
-      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: failed to produce for topic: "
+                          << message->topic
+                          << ". with error: " << rd_kafka_err2str(err) << dendl;
       // immediatly invoke callback on error if needed
       if (message->cb) {
         message->cb(-rd_kafka_err2errno(err));
@@ -545,14 +593,14 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  bool connect(std::string& broker,
-          const std::string& url, 
-          bool use_ssl,
-          bool verify_ssl,
-          boost::optional<const std::string&> ca_location,
-          boost::optional<const std::string&> mechanism,
-          boost::optional<const std::string&> topic_user_name,
-          boost::optional<const std::string&> topic_password) {
+  bool connect(connection_id_t& conn_id,
+               const std::string& url,
+               bool use_ssl,
+               bool verify_ssl,
+               boost::optional<const std::string&> ca_location,
+               boost::optional<const std::string&> mechanism,
+               boost::optional<const std::string&> topic_user_name,
+               boost::optional<const std::string&> topic_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -560,6 +608,7 @@ public:
 
     std::string user;
     std::string password;
+    std::string broker;
     if (!parse_url_authority(url, broker, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
@@ -588,14 +637,17 @@ public:
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
       return false;
     }
-
+    connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+                           use_ssl);
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(broker);
+    const auto it = connections.find(tmp_id);
     // note that ssl vs. non-ssl connection to the same host are two separate connections
     if (it != connections.end()) {
       // connection found - return even if non-ok
-      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
-      return it->second.get();
+      ldout(cct, 20) << "Kafka connect: connection found: " << to_string(tmp_id)
+                     << dendl;
+      conn_id = std::move(tmp_id);
+      return true;
     }
 
     // connection not found, creating a new one
@@ -611,20 +663,24 @@ public:
       return false;
     }
     ++connection_count;
-    connections.emplace(broker, std::move(conn));
+    connections.emplace(tmp_id, std::move(conn));
 
-    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "Kafka connect: new connection is created: "
+                   << to_string(tmp_id)
+                   << " . Total connections: " << connection_count << dendl;
+    conn_id = std::move(tmp_id);
     return true;
   }
 
   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(const std::string& conn_name, 
-    const std::string& topic,
-    const std::string& message) {
+  int publish(const connection_id_t& conn_id,
+              const std::string& topic,
+              const std::string& message) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -632,15 +688,16 @@ public:
     }
     return -EBUSY;
   }
-  
-  int publish_with_confirm(const std::string& conn_name, 
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb) {
+
+  int publish_with_confirm(const connection_id_t& conn_id,
+                           const std::string& topic,
+                           const std::string& message,
+                           reply_callback_t cb) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -711,31 +768,35 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
-        boost::optional<const std::string&> ca_location,
-        boost::optional<const std::string&> mechanism,
-        boost::optional<const std::string&> user_name,
-        boost::optional<const std::string&> password) {
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
+  return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
+                            mechanism, user_name, password);
 }
 
-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message) {
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish(conn_name, topic, message);
+  return s_manager->publish(conn_id, topic, message);
 }
 
-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb) {
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish_with_confirm(conn_name, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
index a6a38ed81ab8e1bf3821473afe28ac612c9fb764..b7aa0d15759fc0822af6f60b660eb3c17b39d281 100644 (file)
@@ -21,28 +21,47 @@ bool init(CephContext* cct);
 // shutdown the kafka manager
 void shutdown();
 
+// key class for the connection list
+struct connection_id_t {
+  std::string broker;
+  std::string user;
+  std::string password;
+  std::string ca_location;
+  std::string mechanism;
+  bool ssl = false;
+  connection_id_t() = default;
+  connection_id_t(const std::string& _broker,
+                  const std::string& _user,
+                  const std::string& _password,
+                  const boost::optional<const std::string&>& _ca_location,
+                  const boost::optional<const std::string&>& _mechanism,
+                  bool _ssl);
+};
+
+std::string to_string(const connection_id_t& id);
+
 // connect to a kafka endpoint
-bool connect(std::string& broker,
-  const std::string& url,
-  bool use_ssl,
-  bool verify_ssl,
-  boost::optional<const std::string&> ca_location,
-  boost::optional<const std::string&> mechanism,
-  boost::optional<const std::string&> user_name,
-  boost::optional<const std::string&> password);
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password);
 
 // publish a message over a connection that was already created
-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message);
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message);
 
 // publish a message over a connection that was already created
 // and pass a callback that will be invoked (async) when broker confirms
 // receiving the message
-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb);
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb);
 
 // convert the integer status returned from the "publish" function to a string
 std::string status_to_string(int s);
index 4b5a23e58222505200349c8acbb46d30661c9fbb..23a2f840261b4a52e61b05395fe40e243c6466b8 100644 (file)
@@ -5546,3 +5546,105 @@ def test_notification_caching():
     conn.delete_bucket(bucket_name)
     if receiver is not None:
         stop_kafka_receiver(receiver, task)
+
+
+@attr('kafka_test')
+def test_connection_caching():
+    """ test connection caching """
+    conn = connection()
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name_1 = bucket_name + TOPIC_SUFFIX + "-without-ssl"
+    topic_name_2 = bucket_name + TOPIC_SUFFIX + "-with-ssl"
+
+    # start kafka receiver
+    task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
+    task_1.start()
+    task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
+    task_2.start()
+    endpoint_address = 'kafka://' + kafka_server
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
+
+    # initially create both s3 topics with `use-ssl=true`
+    zonegroup = get_config_zonegroup()
+    topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_1 = topic_conf_1.set_config()
+    topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_2 = topic_conf_2.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1,
+                        'Events': []
+                        },
+                       {'Id': notification_name + '_2', 'TopicArn': topic_arn_2,
+                        'Events': []}]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status / 100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024))
+        thr = threading.Thread(target=set_contents_from_string,
+                               args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target=key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    time.sleep(30)
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name_1],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # remove the ssl from topic1 and update the topic.
+    endpoint_address = 'kafka://' + kafka_server
+    topic_conf_1.set_attributes(attribute_name="use-ssl",
+                                attribute_val="false")
+    keys = list(bucket.list())
+    wait_for_queue_to_drain(topic_name_1)
+    receiver_1.verify_s3_events(keys, deletions=True)
+    # topic stats for 2nd topic will still reflect entries
+    result = admin(['topic', 'stats', '--topic', topic_name_2],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf_1.del_config()
+    topic_conf_2.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    if receiver_1 is not None:
+        stop_kafka_receiver(receiver_1, task_1)
+    if receiver_2 is not None:
+        stop_kafka_receiver(receiver_2, task_2)