From bb3abaf6b029454781bac50928359bbfdf56984b Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 22 May 2023 09:38:14 +0000 Subject: [PATCH] rgw/notification/kafka: simplify kafka connection memory management use a bare pointer and a unique_ptr instead of a shared pointer (boost::intrusive_ptr). and use the broker name as the handle exposed outside of the kafka module. Fixes: https://tracker.ceph.com/issues/61328 Signed-off-by: Yuval Lifshitz (cherry picked from commit fbd287602742ecbe6a0d383ce9db1b6a65c348cb) Conflicts: src/rgw/rgw_kafka.cc --- src/rgw/driver/rados/rgw_pubsub_push.cc | 13 +- src/rgw/rgw_kafka.cc | 156 ++++++++++-------------- src/rgw/rgw_kafka.h | 21 +--- 3 files changed, 76 insertions(+), 114 deletions(-) diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 9f1d7c57553b4..f15aa3bcc693d 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -285,8 +285,8 @@ private: }; CephContext* const cct; const std::string topic; - kafka::connection_ptr_t conn; const ack_level_t ack_level; + std::string conn_name; ack_level_t get_ack_level(const RGWHTTPArgs& args) { @@ -309,9 +309,9 @@ public: CephContext* _cct) : cct(_cct), topic(_topic), - conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"), args.get_optional("mechanism"))) , ack_level(get_ack_level(args)) { - if (!conn) { + if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), + args.get_optional("ca-location"), args.get_optional("mechanism"))) { throw configuration_error("Kafka: failed to create connection to: " + _endpoint); } } @@ -371,13 +371,12 @@ public: }; int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { - ceph_assert(conn); if (ack_level == ack_level_t::None) { - return kafka::publish(conn, topic, json_format_pubsub_event(event)); + return kafka::publish(conn_name, topic, json_format_pubsub_event(event)); } else { // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine auto w = std::unique_ptr(new Waiter); - const auto rc = kafka::publish_with_confirm(conn, + const auto rc = kafka::publish_with_confirm(conn_name, topic, json_format_pubsub_event(event), std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); @@ -391,7 +390,7 @@ public: std::string to_str() const override { std::string str("Kafka Endpoint"); - str += kafka::to_string(conn); + str += "\nBroker: " + conn_name; str += "\nTopic: " + topic; return str; } diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index cde0ea6200ec8..651d7099ebc58 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -31,13 +31,13 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { namespace rgw::kafka { // status codes for publishing -// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client static const int STATUS_CONNECTION_CLOSED = -0x1002; static const int STATUS_QUEUE_FULL = -0x1003; static const int STATUS_MAX_INFLIGHT = -0x1004; static const int STATUS_MANAGER_STOPPED = -0x1005; // status code for connection opening static const int STATUS_CONF_ALLOC_FAILED = -0x2001; +static const int STATUS_CONF_REPLCACE = -0x2002; static const int STATUS_OK = 0x0; @@ -64,7 +64,6 @@ struct connection_t { std::vector topics; uint64_t delivery_tag = 1; int status = STATUS_OK; - mutable std::atomic ref_count = 0; CephContext* const cct; CallbackList callbacks; const std::string broker; @@ -115,29 +114,8 @@ struct connection_t { ~connection_t() { destroy(STATUS_CONNECTION_CLOSED); } - - friend void intrusive_ptr_add_ref(const connection_t* p); - friend void intrusive_ptr_release(const connection_t* p); }; -std::string to_string(const connection_ptr_t& conn) { - std::string str; - str += "\nBroker: " + conn->broker; - str += conn->use_ssl ? "\nUse SSL" : ""; - str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : ""; - str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : ""; - return str; -} -// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr -void intrusive_ptr_add_ref(const connection_t* p) { - ++p->ref_count; -} -void intrusive_ptr_release(const connection_t* p) { - if (--p->ref_count == 0) { - delete p; - } -} - // convert int status to string - including RGW specific values std::string status_to_string(int s) { switch (s) { @@ -153,6 +131,8 @@ std::string status_to_string(int s) { return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; case STATUS_CONF_ALLOC_FAILED: return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; + case STATUS_CONF_REPLCACE: + return "RGW_KAFKA_STATUS_CONF_REPLCACE"; } return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); } @@ -200,11 +180,15 @@ void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char * ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; } -// utility function to create a connection, when the connection object already exists -connection_ptr_t& create_connection(connection_ptr_t& conn) { - // pointer must be valid and not marked for deletion - ceph_assert(conn); - +void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) { + const auto conn = reinterpret_cast(rd_kafka_opaque(rk)); + ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl; +} + +using connection_t_ptr = std::unique_ptr; + +// utility function to create a producer, when the connection object already exists +bool new_producer(connection_t* conn) { // reset all status codes conn->status = STATUS_OK; char errstr[512] = {0}; @@ -212,7 +196,7 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { conn->temp_conf = rd_kafka_conf_new(); if (!conn->temp_conf) { conn->status = STATUS_CONF_ALLOC_FAILED; - return conn; + return false; } // get list of brokers based on the bootsrap broker @@ -269,16 +253,20 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback); // set the global opaque pointer to be the connection itself - rd_kafka_conf_set_opaque(conn->temp_conf, conn.get()); + rd_kafka_conf_set_opaque(conn->temp_conf, conn); // redirect kafka logs to RGW rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback); // create the producer + if (conn->producer) { + ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl; + conn->destroy(STATUS_CONF_REPLCACE); + } conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); if (!conn->producer) { conn->status = rd_kafka_last_error(); ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl; - return conn; + return false; } ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl; { @@ -296,41 +284,28 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { // conf ownership passed to producer conn->temp_conf = nullptr; - return conn; + return true; conf_error: conn->status = rd_kafka_last_error(); ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl; - return conn; -} - -// utility function to create a new connection -connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct, - bool use_ssl, - bool verify_ssl, - boost::optional ca_location, - const std::string& user, - const std::string& password, - boost::optional mechanism) { - // create connection state - connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)); - return create_connection(conn); + return false; } -/// struct used for holding messages in the message queue +// struct used for holding messages in the message queue struct message_wrapper_t { - connection_ptr_t conn; + std::string conn_name; std::string topic; std::string message; - reply_callback_t cb; + const reply_callback_t cb; - message_wrapper_t(connection_ptr_t& _conn, + message_wrapper_t(const std::string& _conn_name, const std::string& _topic, const std::string& _message, - reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {} + reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {} }; -typedef std::unordered_map ConnectionList; +typedef std::unordered_map ConnectionList; typedef boost::lockfree::queue> MessageQueue; // macros used inside a loop where an iterator is either incremented or erased @@ -363,15 +338,23 @@ private: // TODO use rd_kafka_produce_batch for better performance void publish_internal(message_wrapper_t* message) { - const std::unique_ptr msg_owner(message); - auto& conn = message->conn; + const std::unique_ptr msg_deleter(message); + const auto conn_it = connections.find(message->conn_name); + if (conn_it == connections.end()) { + ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl; + if (message->cb) { + message->cb(STATUS_CONNECTION_CLOSED); + } + return; + } + auto& conn = conn_it->second; conn->timestamp = ceph_clock_now(); if (!conn->is_ok()) { // connection had an issue while message was in the queue // TODO add error stats - ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl; + ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl; if (message->cb) { message->cb(conn->status); } @@ -485,7 +468,7 @@ private: ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl; const auto& broker = conn_it->first; ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl; - if (create_connection(conn)->is_ok() == false) { + if (new_producer(conn.get()) == false) { ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl; // TODO: add error counter for failed retries // TODO: add exponential backoff for retries @@ -552,33 +535,32 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, + bool connect(std::string& broker, + const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism) { if (stopped) { - // TODO: increment counter ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; - return nullptr; + return false; } - std::string broker; - std::string user; - std::string password; + std::string user; + std::string password; if (!parse_url_authority(url, broker, user, password)) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; - return nullptr; + return false; } // this should be validated by the regex in parse_url() ceph_assert(user.empty() == password.empty()); - if (!user.empty() && !use_ssl && !g_conf().get_val("rgw_allow_notification_secrets_in_cleartext")) { + if (!user.empty() && !use_ssl && !g_conf().get_val("rgw_allow_notification_secrets_in_cleartext")) { ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; - return nullptr; - } + return false; + } std::lock_guard lock(connections_lock); const auto it = connections.find(broker); @@ -586,53 +568,49 @@ public: if (it != connections.end()) { // connection found - return even if non-ok ldout(cct, 20) << "Kafka connect: connection found" << dendl; - return it->second; + return it->second.get(); } // connection not found, creating a new one if (connection_count >= max_connections) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; - return nullptr; + return false; } - const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism); - // create_new_connection must always return a connection object + // create_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread - ceph_assert(conn); ++connection_count; ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl; - return connections.emplace(broker, conn).first->second; + auto conn = connections.emplace(broker, std::make_unique(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get(); + if (!new_producer(conn)) { + ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl; + } + return true; } // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack) - int publish(connection_ptr_t& conn, + int publish(const std::string& conn_name, const std::string& topic, const std::string& message) { if (stopped) { return STATUS_MANAGER_STOPPED; } - if (!conn || !conn->is_ok()) { - return STATUS_CONNECTION_CLOSED; - } - if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { + if (messages.push(new message_wrapper_t(conn_name, topic, message, nullptr))) { ++queued; return STATUS_OK; } return STATUS_QUEUE_FULL; } - int publish_with_confirm(connection_ptr_t& conn, + int publish_with_confirm(const std::string& conn_name, const std::string& topic, const std::string& message, reply_callback_t cb) { if (stopped) { return STATUS_MANAGER_STOPPED; } - if (!conn || !conn->is_ok()) { - return STATUS_CONNECTION_CLOSED; - } - if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { + if (messages.push(new message_wrapper_t(conn_name, topic, message, cb))) { ++queued; return STATUS_OK; } @@ -697,26 +675,26 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, +bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism) { - if (!s_manager) return nullptr; - return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism); + if (!s_manager) return false; + return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism); } -int publish(connection_ptr_t& conn, +int publish(const std::string& conn_name, const std::string& topic, const std::string& message) { if (!s_manager) return STATUS_MANAGER_STOPPED; - return s_manager->publish(conn, topic, message); + return s_manager->publish(conn_name, topic, message); } -int publish_with_confirm(connection_ptr_t& conn, +int publish_with_confirm(const std::string& conn_name, const std::string& topic, const std::string& message, reply_callback_t cb) { if (!s_manager) return STATUS_MANAGER_STOPPED; - return s_manager->publish_with_confirm(conn, topic, message, cb); + return s_manager->publish_with_confirm(conn_name, topic, message, cb); } size_t get_connection_count() { diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index 6d3dcad4a004c..813fda32969b9 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -5,20 +5,11 @@ #include #include -#include #include #include "include/common_fwd.h" namespace rgw::kafka { -// forward declaration of connection object -struct connection_t; - -typedef boost::intrusive_ptr connection_ptr_t; - -// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr -void intrusive_ptr_add_ref(const connection_t* p); -void intrusive_ptr_release(const connection_t* p); // the reply callback is expected to get an integer parameter // indicating the result, and not to return anything @@ -31,17 +22,17 @@ bool init(CephContext* cct); void shutdown(); // connect to a kafka endpoint -connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism); +bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism); // publish a message over a connection that was already created -int publish(connection_ptr_t& conn, +int publish(const std::string& conn_name, const std::string& topic, const std::string& message); // publish a message over a connection that was already created // and pass a callback that will be invoked (async) when broker confirms // receiving the message -int publish_with_confirm(connection_ptr_t& conn, +int publish_with_confirm(const std::string& conn_name, const std::string& topic, const std::string& message, reply_callback_t cb); @@ -71,11 +62,5 @@ size_t get_max_inflight(); // maximum number of messages in the queue size_t get_max_queue(); -// disconnect from a kafka broker -bool disconnect(connection_ptr_t& conn); - -// display connection as string -std::string to_string(const connection_ptr_t& conn); - } -- 2.39.5