git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name() 59741/head
author: Yuval Lifshitz <ylifshit@ibm.com>
Wed, 11 Sep 2024 15:12:22 +0000 (15:12 +0000)
committer: Yuval Lifshitz <ylifshit@ibm.com>
Wed, 11 Sep 2024 15:12:22 +0000 (15:12 +0000)
also, use unique_ptr for safer memory management.

Fixes: https://tracker.ceph.com/issues/68033
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/rgw_kafka.cc

index d3586e4900bde635bff28750c363bab670e9c16e..c0ec3dc2c553e3e57f4947e33c611a3f86fe5eab 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw_notification
 
-// comparison operator between topic pointer and name
-bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
-    return name == std::string_view(rd_kafka_topic_name(rkt)); 
-}
-
 // this is the inverse of rd_kafka_errno2err
 // see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
 inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
@@ -141,9 +136,19 @@ struct reply_callback_with_tag_t {
 
 typedef std::vector<reply_callback_with_tag_t> CallbackList;
 
+
+
 struct connection_t {
   rd_kafka_t* producer = nullptr;
-  std::vector<rd_kafka_topic_t*> topics;
+
+  struct rd_kafka_topic_deleter {
+    void operator()(rd_kafka_topic_t* topic) {
+      rd_kafka_topic_destroy(topic);
+    }
+  };
+  using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>;
+  std::map<std::string, topic_ptr> topics;
+
   uint64_t delivery_tag = 1;
   int status = 0;
   CephContext* const cct;
@@ -168,7 +173,6 @@ struct connection_t {
     // wait for 500ms to try and handle pending callbacks
     rd_kafka_flush(producer, 500);
     // destroy all topics
-    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
     topics.clear();
     // destroy producer
     rd_kafka_destroy(producer);
@@ -431,10 +435,9 @@ private:
     }
 
     // create a new topic unless it was already created
-    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
-    rd_kafka_topic_t* topic = nullptr;
+    auto topic_it = conn->topics.find(message->topic);
     if (topic_it == conn->topics.end()) {
-      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+      connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
       if (!topic) {
         const auto err = rd_kafka_last_error();
         ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " 
@@ -444,17 +447,15 @@ private:
         }
         return;
       }
-      // TODO use the topics list as an LRU cache
-      conn->topics.push_back(topic);
+      topic_it = conn->topics.emplace(message->topic, std::move(topic)).first;
       ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
     } else {
-        topic = *topic_it;
         ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
     }
 
     const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
     const auto rc = rd_kafka_produce(
-            topic,
+            topic_it->second.get(),
             // TODO: non builtin partitioning
             RD_KAFKA_PARTITION_UA,
             // make a copy of the payload