git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: honor verify-ssl and separate cached connections 67978/head
author: 邓伟键 <emmmvkdeng@gmail.com>
Fri, 27 Mar 2026 02:31:04 +0000 (10:31 +0800)
committer: 邓伟键 <emmmvkdeng@gmail.com>
Fri, 27 Mar 2026 02:31:04 +0000 (10:31 +0800)
RGW's Kafka notification path accepts and propagates the verify-ssl
topic attribute, but rgw_kafka.cc never applied it to librdkafka.

This patch configures enable.ssl.certificate.verification from
verify-ssl and separates cached connections by verify-ssl setting.

Fixes: https://tracker.ceph.com/issues/55790
Signed-off-by: 邓伟键 <emmmvkdeng@gmail.com>
ceph.spec.in
debian/control
src/rgw/CMakeLists.txt
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/test/rgw/bucket_notification/test_bn.py

index d4cabb21cfb9100685c25f04d2d101b6b79f755c..db2cd71780b8101585c8f4064d9ca46892ecfc94 100644 (file)
@@ -311,7 +311,7 @@ BuildRequires:  yaml-cpp-devel >= 0.6
 BuildRequires:  librabbitmq-devel
 %endif
 %if 0%{with kafka_endpoint}
-BuildRequires:  librdkafka-devel
+BuildRequires:  librdkafka-devel >= 1.1.0
 %endif
 %if 0%{with lua_packages}
 Requires:  lua-devel
index 3d4c84984dd0d66c9cca1014fea162021f4ccb98..0f335519d02491db5113ff5902e750d47ad6aa62 100644 (file)
@@ -73,7 +73,7 @@ Build-Depends: automake,
                librabbitmq-dev,
                libre2-dev,
                libutf8proc-dev (>= 2.2.0),
-               librdkafka-dev,
+               librdkafka-dev (>= 1.1.0),
                libthrift-dev (>= 0.13.0),
                libyaml-cpp-dev (>= 0.6),
                libzstd-dev <pkg.ceph.check>,
index d42e2b099be5a1723d2888a9d606cfa9ce17bc38..340672d85f37904418bb36da8d3eae32ebb97ae9 100644 (file)
@@ -460,7 +460,7 @@ if(WITH_RADOSGW_AMQP_ENDPOINT)
   find_package(RabbitMQ REQUIRED)
 endif()
 if(WITH_RADOSGW_KAFKA_ENDPOINT)
-  find_package(RDKafka 0.9.2 REQUIRED)
+  find_package(RDKafka 1.1.0 REQUIRED)
 endif()
 
 target_link_libraries(rgw_a
index 658f2ed2c809941ccc521415795b3ae407050ccf..0d12083dc31306595e03a6f70aaa273f9d8755b4 100644 (file)
@@ -76,8 +76,10 @@ connection_id_t::connection_id_t(
     const std::string& _password,
     const boost::optional<const std::string&>& _ca_location,
     const boost::optional<const std::string&>& _mechanism,
-    bool _ssl)
-    : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+    bool _ssl,
+    bool _verify_ssl)
+    : broker(_broker), user(_user), password(_password), ssl(_ssl),
+      verify_ssl(_verify_ssl) {
   if (_ca_location.has_value()) {
     ca_location = _ca_location.get();
   }
@@ -91,7 +93,8 @@ connection_id_t::connection_id_t(
 bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
   return lhs.broker == rhs.broker && lhs.user == rhs.user &&
          lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
-         lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;
+         lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl &&
+         lhs.verify_ssl == rhs.verify_ssl;
 }
 
 struct connection_id_hasher {
@@ -103,6 +106,7 @@ struct connection_id_hasher {
     boost::hash_combine(h, k.ca_location);
     boost::hash_combine(h, k.mechanism);
     boost::hash_combine(h, k.ssl);
+    boost::hash_combine(h, k.verify_ssl);
     return h;
   }
 };
@@ -159,7 +163,7 @@ struct connection_t {
   CallbackList callbacks;
   const std::string broker;
   const bool use_ssl;
-  const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
+  const bool verify_ssl;
   const boost::optional<std::string> ca_location;
   const std::string user;
   const std::string password;
@@ -319,8 +323,11 @@ bool new_producer(connection_t* conn) {
     } else {
       ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
     }
-    // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
-    // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+    if (rd_kafka_conf_set(conf.get(), "enable.ssl.certificate.verification",
+                          conn->verify_ssl ? "true" : "false",
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+      goto conf_error;
+    }
 
     ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
   } else if (!conn->user.empty()) {
@@ -644,7 +651,7 @@ public:
     }
 
     connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
-                           use_ssl);
+                           use_ssl, verify_ssl);
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(tmp_id);
     // note that ssl vs. non-ssl connection to the same host are two separate connections
index a46594bfe1801558af53a06927f03c333f2fd4d9..acd0846bc1b11511a7358506f5ed250f9ec167b7 100644 (file)
@@ -29,13 +29,15 @@ struct connection_id_t {
   std::string ca_location;
   std::string mechanism;
   bool ssl = false;
+  bool verify_ssl = true;
   connection_id_t() = default;
   connection_id_t(const std::string& _broker,
                   const std::string& _user,
                   const std::string& _password,
                   const boost::optional<const std::string&>& _ca_location,
                   const boost::optional<const std::string&>& _mechanism,
-                  bool _ssl);
+                  bool _ssl,
+                  bool _verify_ssl);
 };
 
 std::string to_string(const connection_id_t& id);
index 728a1072dd4146e280a73b9795267100608358e7..903948df78b3081545e47d15ac0e3d5400db0273 100644 (file)
@@ -4599,7 +4599,8 @@ def test_topic_no_permissions():
     conn2.delete_bucket(bucket_name)
 
 
-def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False,
+                   verify_ssl=True, include_ca_location=True):
     """ test pushing kafka notification securly to master """
     # Setup SCRAM users if needed
     if mechanism.startswith('SCRAM'):
@@ -4630,12 +4631,19 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=false&mechanism='+mechanism
     elif security_type == 'SASL_SSL':
         KAFKA_DIR = os.environ['KAFKA_DIR']
-        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&mechanism='+mechanism
+        if include_ca_location:
+            endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
         if use_topic_attrs_for_creds:
             endpoint_args += '&user-name=alice&password=alice-secret'
     else:
         KAFKA_DIR = os.environ['KAFKA_DIR']
-        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true'
+        if include_ca_location:
+            endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
+
+    if security_type in ('SSL', 'SASL_SSL') and not verify_ssl:
+        endpoint_args += '&verify-ssl=false'
 
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
 
@@ -4701,6 +4709,11 @@ def test_notification_kafka_security_ssl():
     kafka_security('SSL')
 
 
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_skip_verification_without_ca():
+    kafka_security('SSL', verify_ssl=False, include_ca_location=False)
+
+
 @pytest.mark.kafka_security_test
 def test_notification_kafka_security_ssl_sasl():
     kafka_security('SASL_SSL')