]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name() 59764/head
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 11 Sep 2024 15:12:22 +0000 (15:12 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 12 Sep 2024 13:13:01 +0000 (13:13 +0000)
also, use unique_ptr for safer memory management.

Fixes: https://tracker.ceph.com/issues/68033
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
(cherry picked from commit 80beab8eb2cf0c496019c9850c3b979127d24c63)

Conflicts:
src/rgw/rgw_kafka.cc

src/rgw/rgw_kafka.cc

index 5b9d2f27cba4ae16517de57b9bfa91746ea951ed..7377a3edb01586cef8ec8122560f0f171e1d2385 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw
 
-// TODO investigation, not necessarily issues:
-// (1) in case of single threaded writer context use spsc_queue
-// (2) check performance of emptying queue to local list, and go over the list and publish
-// (3) use std::shared_mutex (c++17) or equivalent for the connections lock
-
-// cmparisson operator between topic pointer and name
-bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
-    return name == std::string_view(rd_kafka_topic_name(rkt)); 
-}
-
 namespace rgw::kafka {
 
 // status codes for publishing
@@ -62,7 +52,15 @@ typedef std::vector<reply_callback_with_tag_t> CallbackList;
 struct connection_t {
   rd_kafka_t* producer = nullptr;
   rd_kafka_conf_t* temp_conf = nullptr;
-  std::vector<rd_kafka_topic_t*> topics;
+
+  struct rd_kafka_topic_deleter {
+    void operator()(rd_kafka_topic_t* topic) {
+      rd_kafka_topic_destroy(topic);
+    }
+  };
+  using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>;
+  std::map<std::string, topic_ptr> topics;
+
   uint64_t delivery_tag = 1;
   int status = STATUS_OK;
   CephContext* const cct;
@@ -93,7 +91,7 @@ struct connection_t {
     // wait for all remaining acks/nacks
     rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
     // destroy all topics
-    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
+    topics.clear();
     // destroy producer
     rd_kafka_destroy(producer);
     producer = nullptr;
@@ -365,10 +363,9 @@ private:
     }
 
     // create a new topic unless it was already created
-    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
-    rd_kafka_topic_t* topic = nullptr;
+    auto topic_it = conn->topics.find(message->topic);
     if (topic_it == conn->topics.end()) {
-      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+      connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
       if (!topic) {
         const auto err = rd_kafka_last_error();
         ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
@@ -378,17 +375,15 @@ private:
         conn->destroy(err);
         return;
       }
-      // TODO use the topics list as an LRU cache
-      conn->topics.push_back(topic);
+      topic_it = conn->topics.emplace(message->topic, std::move(topic)).first;
       ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
     } else {
-        topic = *topic_it;
         ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
     }
 
     const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
     const auto rc = rd_kafka_produce(
-            topic,
+            topic_it->second.get(),
             // TODO: non builtin partitioning
             RD_KAFKA_PARTITION_UA,
             // make a copy of the payload