]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: For kafka include user-id & password as part of the key along with...
authorkchheda3 <kchheda3@bloomberg.net>
Thu, 23 May 2024 20:20:05 +0000 (16:20 -0400)
committerkchheda3 <kchheda3@bloomberg.net>
Tue, 28 May 2024 21:33:28 +0000 (17:33 -0400)
For Kafka, connection pooling is currently done based on the endpoint, so all events with the same endpoint share the same connection. However, there are issues when the userid & password are created/changed for the endpoint, because the old connection is cached in the broker manager, and when a new event with an updated/new userid-password is sent, the broker still uses the old connection that was created with the old/no userid/password, as currently only the `endpoint` is the key to the connection pool.
To fix this, use all the topic attributes that are part of the connection as the key to the connection pool, and if any of the attributes changes, create a new Kafka connection. Attributes include userid, password, ssl, ca_location.

Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/test/rgw/bucket_notification/test_bn.py

index b5b97c9ba6242f7f7020eada62b257b1fcdc47ae..f2a7affcb0374253b7e85a8d0f7a897e069adfb1 100644 (file)
@@ -289,8 +289,7 @@ private:
   };
   const std::string topic;
   const ack_level_t ack_level;
-  std::string conn_name;
-
+  kafka::connection_id_t conn_id;
 
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
@@ -311,27 +310,24 @@ public:
       const RGWHTTPArgs& args) : 
         topic(_topic),
         ack_level(get_ack_level(args)) {
-    if (!kafka::connect(conn_name, _endpoint,
-                        get_bool(args, "use-ssl", false),
-                        get_bool(args, "verify-ssl", true),
-                        args.get_optional("ca-location"),
-                        args.get_optional("mechanism"),
-                        args.get_optional("user-name"),
-                        args.get_optional("password"))) {
-      throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
-    }
-  }
+   if (!kafka::connect(
+           conn_id, _endpoint, get_bool(args, "use-ssl", false),
+           get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
+           args.get_optional("mechanism"), args.get_optional("user-name"),
+           args.get_optional("password"))) {
+     throw configuration_error("Kafka: failed to create connection to: " +
+                               _endpoint);
+   }
+ }
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
+      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       auto w = std::make_unique<Waiter>();
-      const auto rc = kafka::publish_with_confirm(conn_name, 
-        topic,
-        json_format_pubsub_event(event),
-        [wp = w.get()](int r) { wp->finish(r); }
-      );
+      const auto rc = kafka::publish_with_confirm(
+          conn_id, topic, json_format_pubsub_event(event),
+          [wp = w.get()](int r) { wp->finish(r); });
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
@@ -342,7 +338,7 @@ public:
 
   std::string to_str() const override {
     std::string str("Kafka Endpoint");
-    str += "\nBroker: " + conn_name;
+    str += "\nBroker: " + to_string(conn_id);
     str += "\nTopic: " + topic;
     return str;
   }
index 9a356d9c6f04e59009fda77b74a99a93a850698b..06d3f1f5379e4c2e45781fd9446476cfb2973556 100644 (file)
@@ -13,6 +13,7 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
+#include <boost/functional/hash.hpp>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
 
@@ -70,6 +71,47 @@ inline std::string status_to_string(int s) {
   }
 }
 
+connection_id_t::connection_id_t(  // builds a connection key from the broker, credentials and SSL-related settings
+    const std::string& _broker,
+    const std::string& _user,
+    const std::string& _password,
+    const boost::optional<const std::string&>& _ca_location,
+    const boost::optional<const std::string&>& _mechanism,
+    bool _ssl)
+    : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+  if (_ca_location.has_value()) {  // when absent, ca_location keeps its default (empty) value
+    ca_location = _ca_location.get();
+  }
+  if (_mechanism.has_value()) {  // when absent, mechanism keeps its default (empty) value
+    mechanism = _mechanism.get();
+  }
+}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {  // true only when all six key fields match
+  return lhs.broker == rhs.broker && lhs.user == rhs.user &&
+         lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
+         lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;  // NOTE(review): verify_ssl is not a member of connection_id_t, so it is not compared
+}
+
+struct connection_id_hasher {  // hash functor for connection_id_t keys
+  std::size_t operator()(const connection_id_t& k) const {
+    std::size_t h = 0;  // seed that every field is folded into
+    boost::hash_combine(h, k.broker);  // the six fields combined here are the same ones operator== compares
+    boost::hash_combine(h, k.user);
+    boost::hash_combine(h, k.password);
+    boost::hash_combine(h, k.ca_location);
+    boost::hash_combine(h, k.mechanism);
+    boost::hash_combine(h, k.ssl);
+    return h;
+  }
+};
+
+std::string to_string(const connection_id_t& id) {  // printable form of a connection id
+  return id.broker + ":" + id.user;  // only broker and user are included; the password and other fields are left out
+}
+
 // convert int status to errno - both RGW and librdkafka values
 inline int status_to_errno(int s) {
   if (s == 0) return 0;
@@ -165,8 +207,9 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
       ldout(conn->cct, 20) << "Kafka run: ack received with result=" << 
         rd_kafka_err2str(result) << dendl;
   } else {
-      ldout(conn->cct, 1) << "Kafka run: nack received with result=" << 
-        rd_kafka_err2str(result) << dendl;
+    ldout(conn->cct, 1) << "Kafka run: nack received with result="
+                        << rd_kafka_err2str(result)
+                        << " for broker: " << conn->broker << dendl;
   }
 
   if (!rkmessage->_private) {
@@ -327,18 +370,21 @@ conf_error:
 
 // struct used for holding messages in the message queue
 struct message_wrapper_t {
-  std::string conn_name; 
+  connection_id_t conn_id;
   std::string topic;
   std::string message;
   const reply_callback_t cb;
-  
-  message_wrapper_t(const std::string& _conn_name,
-      const std::string& _topic,
-      const std::string& _message,
-      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
+
+  message_wrapper_t(const connection_id_t& _conn_id,
+                    const std::string& _topic,
+                    const std::string& _message,
+                    reply_callback_t _cb)
+      : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
 };
 
-typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
+typedef std::
+    unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher>
+        ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
 class Manager {
@@ -361,7 +407,7 @@ private:
   // TODO use rd_kafka_produce_batch for better performance
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_deleter(message);
-    const auto conn_it = connections.find(message->conn_name);
+    const auto conn_it = connections.find(message->conn_id);
     if (conn_it == connections.end()) {
       ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
       if (message->cb) {
@@ -425,7 +471,9 @@ private:
             tag);
     if (rc == -1) {
       const auto err = rd_kafka_last_error();
-      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: failed to produce for topic: "
+                          << message->topic
+                          << ". with error: " << rd_kafka_err2str(err) << dendl;
       // immediatly invoke callback on error if needed
       if (message->cb) {
         message->cb(-rd_kafka_err2errno(err));
@@ -541,14 +589,14 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  bool connect(std::string& broker,
-          const std::string& url, 
-          bool use_ssl,
-          bool verify_ssl,
-          boost::optional<const std::string&> ca_location,
-          boost::optional<const std::string&> mechanism,
-          boost::optional<const std::string&> topic_user_name,
-          boost::optional<const std::string&> topic_password) {
+  bool connect(connection_id_t& conn_id,
+               const std::string& url,
+               bool use_ssl,
+               bool verify_ssl,
+               boost::optional<const std::string&> ca_location,
+               boost::optional<const std::string&> mechanism,
+               boost::optional<const std::string&> topic_user_name,
+               boost::optional<const std::string&> topic_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -556,6 +604,7 @@ public:
 
     std::string user;
     std::string password;
+    std::string broker;
     if (!parse_url_authority(url, broker, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
@@ -584,14 +633,17 @@ public:
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
       return false;
     }
-
+    connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+                           use_ssl);
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(broker);
+    const auto it = connections.find(tmp_id);
     // note that ssl vs. non-ssl connection to the same host are two separate connections
     if (it != connections.end()) {
       // connection found - return even if non-ok
-      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
-      return it->second.get();
+      ldout(cct, 20) << "Kafka connect: connection found: " << to_string(tmp_id)
+                     << dendl;
+      conn_id = std::move(tmp_id);
+      return true;
     }
 
     // connection not found, creating a new one
@@ -607,20 +659,24 @@ public:
       return false;
     }
     ++connection_count;
-    connections.emplace(broker, std::move(conn));
+    connections.emplace(tmp_id, std::move(conn));
 
-    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "Kafka connect: new connection is created: "
+                   << to_string(tmp_id)
+                   << " . Total connections: " << connection_count << dendl;
+    conn_id = std::move(tmp_id);
     return true;
   }
 
   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(const std::string& conn_name, 
-    const std::string& topic,
-    const std::string& message) {
+  int publish(const connection_id_t& conn_id,
+              const std::string& topic,
+              const std::string& message) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -628,15 +684,16 @@ public:
     }
     return -EBUSY;
   }
-  
-  int publish_with_confirm(const std::string& conn_name, 
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb) {
+
+  int publish_with_confirm(const connection_id_t& conn_id,
+                           const std::string& topic,
+                           const std::string& message,
+                           reply_callback_t cb) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -707,31 +764,35 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
-        boost::optional<const std::string&> ca_location,
-        boost::optional<const std::string&> mechanism,
-        boost::optional<const std::string&> user_name,
-        boost::optional<const std::string&> password) {
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
+  return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
+                            mechanism, user_name, password);
 }
 
-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message) {
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish(conn_name, topic, message);
+  return s_manager->publish(conn_id, topic, message);
 }
 
-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb) {
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish_with_confirm(conn_name, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
index a6a38ed81ab8e1bf3821473afe28ac612c9fb764..b7aa0d15759fc0822af6f60b660eb3c17b39d281 100644 (file)
@@ -21,28 +21,47 @@ bool init(CephContext* cct);
 // shutdown the kafka manager
 void shutdown();
 
+// key class for the connection list
+struct connection_id_t {
+  std::string broker;
+  std::string user;
+  std::string password;
+  std::string ca_location;  // std::string member, so empty unless assigned
+  std::string mechanism;  // std::string member, so empty unless assigned
+  bool ssl = false;  // defaults to false when default-constructed
+  connection_id_t() = default;  // yields an id with empty strings and ssl == false
+  connection_id_t(const std::string& _broker,
+                  const std::string& _user,
+                  const std::string& _password,
+                  const boost::optional<const std::string&>& _ca_location,  // optional: may carry no value
+                  const boost::optional<const std::string&>& _mechanism,  // optional: may carry no value
+                  bool _ssl);
+};
+
+std::string to_string(const connection_id_t& id);  // returns a printable string for the given connection id
+
 // connect to a kafka endpoint
-bool connect(std::string& broker,
-  const std::string& url,
-  bool use_ssl,
-  bool verify_ssl,
-  boost::optional<const std::string&> ca_location,
-  boost::optional<const std::string&> mechanism,
-  boost::optional<const std::string&> user_name,
-  boost::optional<const std::string&> password);
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password);
 
 // publish a message over a connection that was already created
-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message);
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message);
 
 // publish a message over a connection that was already created
 // and pass a callback that will be invoked (async) when broker confirms
 // receiving the message
-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb);
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb);
 
 // convert the integer status returned from the "publish" function to a string
 std::string status_to_string(int s);
index 3ab366fdcd0f3ff0bedb342e0184831478f995d5..810080064fc21e1cbc0ecec5fca9c966450b34dc 100644 (file)
@@ -5284,3 +5284,103 @@ def test_notification_caching():
     # delete the bucket
     conn.delete_bucket(bucket_name)
     receiver.close(task)
+
+
+@attr('kafka_test')
+def test_connection_caching():
+    """ test connection caching: two topics share one kafka endpoint, then use-ssl is changed on only one of them """
+    conn = connection()
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name_1 = bucket_name + TOPIC_SUFFIX + "-without-ssl"  # topic whose use-ssl attribute is later set to "false"
+    topic_name_2 = bucket_name + TOPIC_SUFFIX + "-with-ssl"  # topic that keeps use-ssl=true for the whole test
+
+    # start kafka receiver
+    task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
+    task_1.start()
+    task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
+    task_2.start()
+    endpoint_address = 'kafka://' + kafka_server
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'  # same args (same endpoint, ssl on, persistent) are used for both topics
+
+    # initially create both s3 topics with `use-ssl=true`
+    zonegroup = get_config_zonegroup()
+    topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_1 = topic_conf_1.set_config()
+    topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_2 = topic_conf_2.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1,
+                        'Events': []
+                        },
+                       {'Id': notification_name + '_2', 'TopicArn': topic_arn_2,
+                        'Events': []}]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status / 100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024))
+        thr = threading.Thread(target=set_contents_from_string,
+                               args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]  # wait for all uploads to finish
+
+    time_diff = time.time() - start_time
+    print('average time for creation + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target=key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]  # wait for all deletions to finish
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    time.sleep(30)  # NOTE(review): fixed 30s wait; presumably lets the persistent queue settle before stats are read - confirm
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name_1],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)  # presumably one queued entry per create plus one per delete - confirm
+
+    # remove the ssl from topic1 and update the topic.
+    endpoint_address = 'kafka://' + kafka_server  # NOTE(review): same value as assigned earlier and not read again in this test
+    topic_conf_1.set_attributes(attribute_name="use-ssl",
+                                attribute_val="false")
+    keys = list(bucket.list())  # NOTE(review): taken after the objects were deleted, so this list is presumably empty - confirm the verification below still checks something
+    wait_for_queue_to_drain(topic_name_1)
+    receiver_1.verify_s3_events(keys, deletions=True)
+    # topic stats for 2nd topic will still reflect entries
+    result = admin(['topic', 'stats', '--topic', topic_name_2],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)  # topic 2 was not modified, so its entry count is expected to be unchanged
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf_1.del_config()
+    topic_conf_2.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    receiver_1.close(task_1)
+    receiver_2.close(task_2)