// rkmessage is destroyed automatically by librdkafka
}
+void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *buf) {
+ ceph_assert(rd_kafka_opaque(rk));
+
+ const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+ if (level <= 3)
+ ldout(conn->cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+ else if (level <= 5)
+ ldout(conn->cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+ else if (level <= 6)
+ ldout(conn->cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+ else
+ ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+}
+
// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
// set the global opaque pointer to be the connection itself
rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
+ // redirect kafka logs to RGW
+ rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
// create the producer
conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
if (!conn->producer) {
return conn;
}
ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
+ {
+ // set log level of producer
+ const auto log_level = conn->cct->_conf->subsys.get_log_level(ceph_subsys_rgw);
+ if (log_level <= 1)
+ rd_kafka_set_log_level(conn->producer, 3);
+ else if (log_level <= 2)
+ rd_kafka_set_log_level(conn->producer, 5);
+ else if (log_level <= 10)
+ rd_kafka_set_log_level(conn->producer, 6);
+ else
+ rd_kafka_set_log_level(conn->producer, 7);
+ }
// conf ownership passed to producer
conn->temp_conf = nullptr;