From: Yuval Lifshitz Date: Wed, 11 Sep 2024 15:12:22 +0000 (+0000) Subject: rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name() X-Git-Tag: v18.2.5~332^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b2a99dfadc23aeaaaac650d32592d4f39d222a49;p=ceph.git rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name() also, use unique_ptr for safer memory management. Fixes: https://tracker.ceph.com/issues/68033 Signed-off-by: Yuval Lifshitz (cherry picked from commit 80beab8eb2cf0c496019c9850c3b979127d24c63) Conflicts: src/rgw/rgw_kafka.cc --- diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 5b9d2f27cba4..7377a3edb015 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -18,16 +18,6 @@ #define dout_subsys ceph_subsys_rgw -// TODO investigation, not necessarily issues: -// (1) in case of single threaded writer context use spsc_queue -// (2) check performance of emptying queue to local list, and go over the list and publish -// (3) use std::shared_mutex (c++17) or equivalent for the connections lock - -// cmparisson operator between topic pointer and name -bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { - return name == std::string_view(rd_kafka_topic_name(rkt)); -} - namespace rgw::kafka { // status codes for publishing @@ -62,7 +52,15 @@ typedef std::vector CallbackList; struct connection_t { rd_kafka_t* producer = nullptr; rd_kafka_conf_t* temp_conf = nullptr; - std::vector topics; + + struct rd_kafka_topic_deleter { + void operator()(rd_kafka_topic_t* topic) { + rd_kafka_topic_destroy(topic); + } + }; + using topic_ptr = std::unique_ptr; + std::map topics; + uint64_t delivery_tag = 1; int status = STATUS_OK; CephContext* const cct; @@ -93,7 +91,7 @@ struct connection_t { // wait for all remaining acks/nacks rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */); // destroy all topics - std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); + topics.clear(); // destroy producer rd_kafka_destroy(producer); producer = nullptr; @@ -365,10 +363,9 @@ private: } // create a new topic unless it was already created - auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic); - rd_kafka_topic_t* topic = nullptr; + auto topic_it = conn->topics.find(message->topic); if (topic_it == conn->topics.end()) { - topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); + connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr)); if (!topic) { const auto err = rd_kafka_last_error(); ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl; @@ -378,17 +375,15 @@ private: conn->destroy(err); return; } - // TODO use the topics list as an LRU cache - conn->topics.push_back(topic); + topic_it = conn->topics.emplace(message->topic, std::move(topic)).first; ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl; } else { - topic = *topic_it; ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl; } const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++)); const auto rc = rd_kafka_produce( - topic, + topic_it->second.get(), // TODO: non builtin partitioning RD_KAFKA_PARTITION_UA, // make a copy of the payload