From 82b385aef86128a0187c3b49e213a20893e8ff85 Mon Sep 17 00:00:00 2001
From: kchheda3
Date: Thu, 23 May 2024 16:20:05 -0400
Subject: [PATCH] rgw/notification: For kafka include user-id & password as
 part of the key along with endpoint for connection pooling.

For Kafka, connection pooling is currently keyed on the endpoint alone, so
all events with the same endpoint share the same connection. This breaks
when a user-id & password are created or changed for an endpoint: the old
connection stays cached in the broker manager, and when a new event with
the updated/new user-id & password is sent, the broker manager still uses
the old connection that was created with the old (or no) user-id/password,
because only the `endpoint` is the key into the connection pool.

To fix this, use all the topic attributes that take part in the connection
as the key into the connection pool, and create a new Kafka connection
whenever any of these attributes changes. The attributes are user-id,
password, ssl, and ca_location.
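As a self-contained illustration of the idea, here is a toy sketch. The
names in it (pool_key, pool_key_hasher, conn_t) are hypothetical and
simplified, not the actual RGW types from the diff below (connection_id_t,
connection_id_hasher): every attribute that shapes the connection
participates in the key's hash and equality, so a credential change misses
the cache and forces a fresh connection instead of reusing a stale one.

    // Toy model only; hypothetical names, not the RGW implementation.
    #include <boost/functional/hash.hpp>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // composite pool key: the endpoint plus everything else that
    // affects the underlying connection
    struct pool_key {
      std::string endpoint, user, password;
      bool ssl;
    };

    bool operator==(const pool_key& a, const pool_key& b) {
      return a.endpoint == b.endpoint && a.user == b.user &&
             a.password == b.password && a.ssl == b.ssl;
    }

    struct pool_key_hasher {
      std::size_t operator()(const pool_key& k) const {
        std::size_t h = 0;
        boost::hash_combine(h, k.endpoint);
        boost::hash_combine(h, k.user);
        boost::hash_combine(h, k.password);
        boost::hash_combine(h, k.ssl);
        return h;
      }
    };

    struct conn_t {};  // stand-in for the librdkafka producer handle

    int main() {
      std::unordered_map<pool_key, std::unique_ptr<conn_t>, pool_key_hasher> pool;
      // first event arrives before any credentials exist on the topic:
      // an anonymous connection is created and cached
      pool.emplace(pool_key{"kafka://localhost:9092", "", "", false},
                   std::make_unique<conn_t>());
      // credentials are added later: same endpoint, different key, so the
      // lookup misses and a fresh authenticated connection is created
      // instead of silently reusing the cached anonymous one
      pool_key authed{"kafka://localhost:9092", "alice", "s3cr3t", true};
      std::cout << (pool.count(authed) ? "reuse" : "new connection") << "\n";
    }

Note that the password participates in the key and the hash, exactly as in
connection_id_t below, while to_string() on the real key prints only
broker and user, so credentials never reach the logs.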
Signed-off-by: kchheda3
---
 src/rgw/driver/rados/rgw_pubsub_push.cc     |  34 ++---
 src/rgw/rgw_kafka.cc                        | 161 ++++++++++++++------
 src/rgw/rgw_kafka.h                         |  49 ++++--
 src/test/rgw/bucket_notification/test_bn.py | 102 +++++++++++++
 4 files changed, 262 insertions(+), 84 deletions(-)

diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index f1a1e8c9546..e0d742d6f10 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -289,8 +289,7 @@ private:
   };
   const std::string topic;
   const ack_level_t ack_level;
-  std::string conn_name;
-
+  kafka::connection_id_t conn_id;
   ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
@@ -311,27 +310,24 @@ public:
                          const RGWHTTPArgs& args) :
       topic(_topic),
       ack_level(get_ack_level(args)) {
-    if (!kafka::connect(conn_name, _endpoint,
-          get_bool(args, "use-ssl", false),
-          get_bool(args, "verify-ssl", true),
-          args.get_optional("ca-location"),
-          args.get_optional("mechanism"),
-          args.get_optional("user-name"),
-          args.get_optional("password"))) {
-      throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
-    }
-  }
+    if (!kafka::connect(
+            conn_id, _endpoint, get_bool(args, "use-ssl", false),
+            get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
+            args.get_optional("mechanism"), args.get_optional("user-name"),
+            args.get_optional("password"))) {
+      throw configuration_error("Kafka: failed to create connection to: " +
+                                _endpoint);
+    }
+  }

   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
+      return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       auto w = std::make_unique<Waiter>();
-      const auto rc = kafka::publish_with_confirm(conn_name,
-        topic,
-        json_format_pubsub_event(event),
-        [wp = w.get()](int r) { wp->finish(r); }
-      );
+      const auto rc = kafka::publish_with_confirm(
+          conn_id, topic, json_format_pubsub_event(event),
+          [wp = w.get()](int r) { wp->finish(r); });
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
@@ -342,7 +338,7 @@ public:
   std::string to_str() const override {
     std::string str("Kafka Endpoint");
-    str += "\nBroker: " + conn_name;
+    str += "\nBroker: " + to_string(conn_id);
     str += "\nTopic: " + topic;
     return str;
   }
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index dbc893d70c9..e0c8a37acc8 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,6 +13,7 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
+#include <boost/functional/hash.hpp>
 #include <boost/lockfree/queue.hpp>

 #include "common/dout.h"
@@ -65,6 +66,47 @@ inline std::string status_to_string(int s) {
   }
 }

+connection_id_t::connection_id_t(
+    const std::string& _broker,
+    const std::string& _user,
+    const std::string& _password,
+    const boost::optional<const std::string&>& _ca_location,
+    const boost::optional<const std::string&>& _mechanism,
+    bool _ssl)
+    : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+  if (_ca_location.has_value()) {
+    ca_location = _ca_location.get();
+  }
+  if (_mechanism.has_value()) {
+    mechanism = _mechanism.get();
+  }
+}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t can be used as a key in an unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+  return lhs.broker == rhs.broker && lhs.user == rhs.user &&
+         lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
+         lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;
+}
+
+struct connection_id_hasher {
+  std::size_t operator()(const connection_id_t& k) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, k.broker);
+    boost::hash_combine(h, k.user);
+    boost::hash_combine(h, k.password);
+    boost::hash_combine(h, k.ca_location);
+    boost::hash_combine(h, k.mechanism);
+    boost::hash_combine(h, k.ssl);
+    return h;
+  }
+};
+
+std::string to_string(const connection_id_t& id) {
+  return id.broker + ":" + id.user;
+}
+
 // convert int status to errno - both RGW and librdkafka values
 inline int status_to_errno(int s) {
   if (s == 0) return 0;
@@ -169,8 +211,9 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
     ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
       rd_kafka_err2str(result) << dendl;
   } else {
-    ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
-      rd_kafka_err2str(result) << dendl;
+    ldout(conn->cct, 1) << "Kafka run: nack received with result="
+                        << rd_kafka_err2str(result)
+                        << " for broker: " << conn->broker << dendl;
   }

   if (!rkmessage->_private) {
@@ -331,18 +374,21 @@ conf_error:

 // struct used for holding messages in the message queue
 struct message_wrapper_t {
-  std::string conn_name;
+  connection_id_t conn_id;
   std::string topic;
   std::string message;
   const reply_callback_t cb;
-
-  message_wrapper_t(const std::string& _conn_name,
-      const std::string& _topic,
-      const std::string& _message,
-      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
+
+  message_wrapper_t(const connection_id_t& _conn_id,
+                    const std::string& _topic,
+                    const std::string& _message,
+                    reply_callback_t _cb)
+      : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
 };

-typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
+typedef std::
+    unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher>
+        ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;

 class Manager {
@@ -365,7 +411,7 @@ private:
   // TODO use rd_kafka_produce_batch for better performance
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_deleter(message);
-    const auto conn_it = connections.find(message->conn_name);
+    const auto conn_it = connections.find(message->conn_id);
     if (conn_it == connections.end()) {
       ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
       if (message->cb) {
@@ -426,7 +472,9 @@ private:
         tag);
     if (rc == -1) {
       const auto err = rd_kafka_last_error();
-      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: failed to produce for topic: "
+                          << message->topic
+                          << ". with error: " << rd_kafka_err2str(err) << dendl;
       // immediatly invoke callback on error if needed
       if (message->cb) {
         message->cb(-rd_kafka_err2errno(err));
       }
@@ -545,14 +593,14 @@ public:
   }

   // connect to a broker, or reuse an existing connection if already connected
-  bool connect(std::string& broker,
-      const std::string& url,
-      bool use_ssl,
-      bool verify_ssl,
-      boost::optional<const std::string&> ca_location,
-      boost::optional<const std::string&> mechanism,
-      boost::optional<const std::string&> topic_user_name,
-      boost::optional<const std::string&> topic_password) {
+  bool connect(connection_id_t& conn_id,
+               const std::string& url,
+               bool use_ssl,
+               bool verify_ssl,
+               boost::optional<const std::string&> ca_location,
+               boost::optional<const std::string&> mechanism,
+               boost::optional<const std::string&> topic_user_name,
+               boost::optional<const std::string&> topic_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -560,6 +608,7 @@ public:
     }
     std::string user;
     std::string password;
+    std::string broker;
     if (!parse_url_authority(url, broker, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
@@ -588,14 +637,17 @@ public:
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
       return false;
     }
-
+    connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+                           use_ssl);
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(broker);
+    const auto it = connections.find(tmp_id);
     // note that ssl vs. non-ssl connection to the same host are two separate connections
     if (it != connections.end()) {
       // connection found - return even if non-ok
-      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
-      return it->second.get();
+      ldout(cct, 20) << "Kafka connect: connection found: " << to_string(tmp_id)
+                     << dendl;
+      conn_id = std::move(tmp_id);
+      return true;
     }
     // connection not found, creating a new one
@@ -611,20 +663,24 @@ public:
       return false;
     }
     ++connection_count;
-    connections.emplace(broker, std::move(conn));
+    connections.emplace(tmp_id, std::move(conn));

-    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "Kafka connect: new connection is created: "
+                   << to_string(tmp_id)
+                   << " . Total connections: " << connection_count << dendl;
+    conn_id = std::move(tmp_id);
     return true;
   }

   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(const std::string& conn_name,
-      const std::string& topic,
-      const std::string& message) {
+  int publish(const connection_id_t& conn_id,
+              const std::string& topic,
+              const std::string& message) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -632,15 +688,16 @@ public:
     }
     return -EBUSY;
   }
-
-  int publish_with_confirm(const std::string& conn_name,
-      const std::string& topic,
-      const std::string& message,
-      reply_callback_t cb) {
+
+  int publish_with_confirm(const connection_id_t& conn_id,
+                           const std::string& topic,
+                           const std::string& message,
+                           reply_callback_t cb) {
     if (stopped) {
       return -ESRCH;
     }
-    auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
+    auto message_wrapper =
+        std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
@@ -711,31 +768,35 @@ void shutdown() {
   s_manager = nullptr;
 }

-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
-    boost::optional<const std::string&> ca_location,
-    boost::optional<const std::string&> mechanism,
-    boost::optional<const std::string&> user_name,
-    boost::optional<const std::string&> password) {
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
+  return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
+                            mechanism, user_name, password);
 }

-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message) {
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish(conn_name, topic, message);
+  return s_manager->publish(conn_id, topic, message);
 }

-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb) {
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return -ESRCH;
-  return s_manager->publish_with_confirm(conn_name, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }

 size_t get_connection_count() {
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
index a6a38ed81ab..b7aa0d15759 100644
--- a/src/rgw/rgw_kafka.h
+++ b/src/rgw/rgw_kafka.h
@@ -21,28 +21,47 @@ bool init(CephContext* cct);
 // shutdown the kafka manager
 void shutdown();

+// key class for the connection list
+struct connection_id_t {
+  std::string broker;
+  std::string user;
+  std::string password;
+  std::string ca_location;
+  std::string mechanism;
+  bool ssl = false;
+  connection_id_t() = default;
+  connection_id_t(const std::string& _broker,
+                  const std::string& _user,
+                  const std::string& _password,
+                  const boost::optional<const std::string&>& _ca_location,
+                  const boost::optional<const std::string&>& _mechanism,
+                  bool _ssl);
+};
+
+std::string to_string(const connection_id_t& id);
+
 // connect to a kafka endpoint
-bool connect(std::string& broker,
-    const std::string& url,
-    bool use_ssl,
-    bool verify_ssl,
-    boost::optional<const std::string&> ca_location,
-    boost::optional<const std::string&> mechanism,
-    boost::optional<const std::string&> user_name,
-    boost::optional<const std::string&> password);
+bool connect(connection_id_t& conn_id,
+             const std::string& url,
+             bool use_ssl,
+             bool verify_ssl,
+             boost::optional<const std::string&> ca_location,
+             boost::optional<const std::string&> mechanism,
+             boost::optional<const std::string&> user_name,
+             boost::optional<const std::string&> password);

 // publish a message over a connection that was already created
-int publish(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message);
+int publish(const connection_id_t& conn_id,
+            const std::string& topic,
+            const std::string& message);

 // publish a message over a connection that was already created
 // and pass a callback that will be invoked (async) when broker confirms
 // receiving the message
-int publish_with_confirm(const std::string& conn_name,
-    const std::string& topic,
-    const std::string& message,
-    reply_callback_t cb);
+int publish_with_confirm(const connection_id_t& conn_id,
+                         const std::string& topic,
+                         const std::string& message,
+                         reply_callback_t cb);

 // convert the integer status returned from the "publish" function to a string
 std::string status_to_string(int s);
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 4b5a23e5822..23a2f840261 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -5546,3 +5546,105 @@ def test_notification_caching():
     conn.delete_bucket(bucket_name)
     if receiver is not None:
         stop_kafka_receiver(receiver, task)
+
+
+@attr('kafka_test')
+def test_connection_caching():
+    """ test connection caching """
+    conn = connection()
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name_1 = bucket_name + TOPIC_SUFFIX + "-without-ssl"
+    topic_name_2 = bucket_name + TOPIC_SUFFIX + "-with-ssl"
+
+    # start kafka receiver
+    task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
+    task_1.start()
+    task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
+    task_2.start()
+    endpoint_address = 'kafka://' + kafka_server
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
+
+    # initially create both s3 topics with `use-ssl=true`
+    zonegroup = get_config_zonegroup()
+    topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_1 = topic_conf_1.set_config()
+    topic_conf_2 = PSTopicS3(conn, topic_name_2, zonegroup,
+                             endpoint_args=endpoint_args)
+    topic_arn_2 = topic_conf_2.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn_1,
+                        'Events': []
+                        },
+                       {'Id': notification_name + '_2', 'TopicArn': topic_arn_2,
+                        'Events': []}]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024))
+        thr = threading.Thread(target=set_contents_from_string,
+                               args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target=key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + async notification is: ' + str(
+        time_diff * 1000 / number_of_objects) + ' milliseconds')
+
+    time.sleep(30)
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name_1],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # remove the ssl from topic1 and update the topic.
+    endpoint_address = 'kafka://' + kafka_server
+    topic_conf_1.set_attributes(attribute_name="use-ssl",
+                                attribute_val="false")
+    keys = list(bucket.list())
+    wait_for_queue_to_drain(topic_name_1)
+    receiver_1.verify_s3_events(keys, deletions=True)
+    # topic stats for 2nd topic will still reflect entries
+    result = admin(['topic', 'stats', '--topic', topic_name_2],
+                   get_config_cluster())
+    assert_equal(result[1], 0)
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf_1.del_config()
+    topic_conf_2.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    if receiver_1 is not None:
+        stop_kafka_receiver(receiver_1, task_1)
+    if receiver_2 is not None:
+        stop_kafka_receiver(receiver_2, task_2)
-- 
2.39.5