From: Yuval Lifshitz Date: Tue, 15 Nov 2022 18:18:12 +0000 (+0200) Subject: rgw/kafka: route librdkafka log messages to rgw log X-Git-Tag: v18.1.0~839^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=158a1b009c55b91b2a9800420ad9d80118a5e59e;p=ceph.git rgw/kafka: route librdkafka log messages to rgw log Fixes: https://tracker.ceph.com/issues/50076 Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 9b75c9d088574..954f8b19e0ebf 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -184,6 +184,20 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* // rkmessage is destroyed automatically by librdkafka } +void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *buf) { + ceph_assert(rd_kafka_opaque(rk)); + + const auto conn = reinterpret_cast(rd_kafka_opaque(rk)); + if (level <= 3) + ldout(conn->cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; + else if (level <= 5) + ldout(conn->cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; + else if (level <= 6) + ldout(conn->cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; + else + ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl; +} + // utility function to create a connection, when the connection object already exists connection_ptr_t& create_connection(connection_ptr_t& conn) { // pointer must be valid and not marked for deletion @@ -233,6 +247,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { // set the global opaque pointer to be the connection itself rd_kafka_conf_set_opaque(conn->temp_conf, conn.get()); + // redirect kafka logs to RGW + rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback); // create the producer conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); if (!conn->producer) { @@ -241,6 +257,18 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { return conn; } ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl; + { + // set log level of producer + const auto log_level = conn->cct->_conf->subsys.get_log_level(ceph_subsys_rgw); + if (log_level <= 1) + rd_kafka_set_log_level(conn->producer, 3); + else if (log_level <= 2) + rd_kafka_set_log_level(conn->producer, 5); + else if (log_level <= 10) + rd_kafka_set_log_level(conn->producer, 6); + else + rd_kafka_set_log_level(conn->producer, 7); + } // conf ownership passed to producer conn->temp_conf = nullptr;