}
// create a new topic unless it was already created
- auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
- rd_kafka_topic_t* topic = nullptr;
+ auto topic_it = conn->topics.find(message->topic);
if (topic_it == conn->topics.end()) {
- topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+ connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
if (!topic) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
<< rd_kafka_err2str(err) << "(" << err << ")" << dendl;
if (message->cb) {
message->cb(-rd_kafka_err2errno(err));