From: 邓伟键 Date: Fri, 27 Mar 2026 02:31:04 +0000 (+0800) Subject: rgw/kafka: honor verify-ssl and separate cached connections X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=595a7537717744a8fd35ced5c4614d34a29495f5;p=ceph.git rgw/kafka: honor verify-ssl and separate cached connections RGW's Kafka notification path accepts and propagates the verify-ssl topic attribute, but rgw_kafka.cc never applied it to librdkafka. This patch configures enable.ssl.certificate.verification from verify-ssl and separates cached connections by verify-ssl setting. Fixes: https://tracker.ceph.com/issues/55790 Signed-off-by: 邓伟键 --- diff --git a/ceph.spec.in b/ceph.spec.in index d4cabb21cfb9..db2cd71780b8 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -311,7 +311,7 @@ BuildRequires: yaml-cpp-devel >= 0.6 BuildRequires: librabbitmq-devel %endif %if 0%{with kafka_endpoint} -BuildRequires: librdkafka-devel +BuildRequires: librdkafka-devel >= 1.1.0 %endif %if 0%{with lua_packages} Requires: lua-devel diff --git a/debian/control b/debian/control index 3d4c84984dd0..0f335519d024 100644 --- a/debian/control +++ b/debian/control @@ -73,7 +73,7 @@ Build-Depends: automake, librabbitmq-dev, libre2-dev, libutf8proc-dev (>= 2.2.0), - librdkafka-dev, + librdkafka-dev (>= 1.1.0), libthrift-dev (>= 0.13.0), libyaml-cpp-dev (>= 0.6), libzstd-dev , diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index d42e2b099be5..340672d85f37 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -460,7 +460,7 @@ if(WITH_RADOSGW_AMQP_ENDPOINT) find_package(RabbitMQ REQUIRED) endif() if(WITH_RADOSGW_KAFKA_ENDPOINT) - find_package(RDKafka 0.9.2 REQUIRED) + find_package(RDKafka 1.1.0 REQUIRED) endif() target_link_libraries(rgw_a diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 658f2ed2c809..0d12083dc313 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -76,8 +76,10 @@ connection_id_t::connection_id_t( const std::string& _password, const 
boost::optional<const std::string&>& _ca_location, const boost::optional<const std::string&>& _mechanism, - bool _ssl) - : broker(_broker), user(_user), password(_password), ssl(_ssl) { + bool _ssl, + bool _verify_ssl) + : broker(_broker), user(_user), password(_password), ssl(_ssl), + verify_ssl(_verify_ssl) { if (_ca_location.has_value()) { ca_location = _ca_location.get(); } @@ -91,7 +93,8 @@ connection_id_t::connection_id_t( bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) { return lhs.broker == rhs.broker && lhs.user == rhs.user && lhs.password == rhs.password && lhs.ca_location == rhs.ca_location && - lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl; + lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl && + lhs.verify_ssl == rhs.verify_ssl; } struct connection_id_hasher { @@ -103,6 +106,7 @@ struct connection_id_hasher { boost::hash_combine(h, k.ca_location); boost::hash_combine(h, k.mechanism); boost::hash_combine(h, k.ssl); + boost::hash_combine(h, k.verify_ssl); return h; } }; @@ -159,7 +163,7 @@ struct connection_t { CallbackList callbacks; const std::string broker; const bool use_ssl; - const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6 + const bool verify_ssl; const boost::optional<std::string> ca_location; const std::string user; const std::string password; @@ -319,8 +323,11 @@ bool new_producer(connection_t* conn) { } else { ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl; } - // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call - // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "enable.ssl.certificate.verification", + conn->verify_ssl ? 
"true" : "false", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl; } else if (!conn->user.empty()) { @@ -644,7 +651,7 @@ public: } connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism, - use_ssl); + use_ssl, verify_ssl); std::lock_guard lock(connections_lock); const auto it = connections.find(tmp_id); // note that ssl vs. non-ssl connection to the same host are two separate connections diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index a46594bfe180..acd0846bc1b1 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -29,13 +29,15 @@ struct connection_id_t { std::string ca_location; std::string mechanism; bool ssl = false; + bool verify_ssl = true; connection_id_t() = default; connection_id_t(const std::string& _broker, const std::string& _user, const std::string& _password, const boost::optional& _ca_location, const boost::optional& _mechanism, - bool _ssl); + bool _ssl, + bool _verify_ssl); }; std::string to_string(const connection_id_t& id); diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 728a1072dd41..903948df78b3 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4599,7 +4599,8 @@ def test_topic_no_permissions(): conn2.delete_bucket(bucket_name) -def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False): +def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False, + verify_ssl=True, include_ca_location=True): """ test pushing kafka notification securly to master """ # Setup SCRAM users if needed if mechanism.startswith('SCRAM'): @@ -4630,12 +4631,19 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=false&mechanism='+mechanism elif 
security_type == 'SASL_SSL': KAFKA_DIR = os.environ['KAFKA_DIR'] - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&mechanism='+mechanism + if include_ca_location: + endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt' if use_topic_attrs_for_creds: endpoint_args += '&user-name=alice&password=alice-secret' else: KAFKA_DIR = os.environ['KAFKA_DIR'] - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt' + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true' + if include_ca_location: + endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt' + + if security_type in ('SSL', 'SASL_SSL') and not verify_ssl: + endpoint_args += '&verify-ssl=false' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) @@ -4701,6 +4709,11 @@ def test_notification_kafka_security_ssl(): kafka_security('SSL') +@pytest.mark.kafka_security_test +def test_notification_kafka_security_ssl_skip_verification_without_ca(): + kafka_security('SSL', verify_ssl=False, include_ca_location=False) + + @pytest.mark.kafka_security_test def test_notification_kafka_security_ssl_sasl(): kafka_security('SASL_SSL')