]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: route librdkafka log messages to rgw log 48896/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 15 Nov 2022 18:18:12 +0000 (20:18 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 15 Nov 2022 19:00:43 +0000 (21:00 +0200)
Fixes: https://tracker.ceph.com/issues/50076
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_kafka.cc

index 9b75c9d0885749b55d3278cb9e6a27bd7f7b22dd..954f8b19e0ebffab81d7b3b31a48bb113c82b9b5 100644 (file)
@@ -184,6 +184,20 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
   // rkmessage is destroyed automatically by librdkafka
 }
 
+void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *buf) {
+  ceph_assert(rd_kafka_opaque(rk));
+  
+  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+  if (level <= 3)
+    ldout(conn->cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+  else if (level <= 5)
+    ldout(conn->cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+  else if (level <= 6)
+    ldout(conn->cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+  else
+    ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
+}
+
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
@@ -233,6 +247,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // set the global opaque pointer to be the connection itself
   rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
 
+  // redirect kafka logs to RGW
+  rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
   // create the producer
   conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
   if (!conn->producer) {
@@ -241,6 +257,18 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
     return conn;
   }
   ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
+  {
+    // set log level of producer
+    const auto log_level = conn->cct->_conf->subsys.get_log_level(ceph_subsys_rgw);
+    if (log_level <= 1)
+      rd_kafka_set_log_level(conn->producer, 3);
+    else if (log_level <= 2)
+      rd_kafka_set_log_level(conn->producer, 5);
+    else if (log_level <= 10)
+      rd_kafka_set_log_level(conn->producer, 6);
+    else
+      rd_kafka_set_log_level(conn->producer, 7);
+  }
 
   // conf ownership passed to producer
   conn->temp_conf = nullptr;