]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
feat(rgw/kafka): add mTLS client certificate authentication for Kafka notifications 68771/head
authorJan Radon <jan.fabian.radon@sap.com>
Fri, 15 May 2026 13:42:08 +0000 (15:42 +0200)
committerJan Radon <jan.fabian.radon@sap.com>
Sat, 23 May 2026 07:05:37 +0000 (09:05 +0200)
Add support for mutual TLS (mTLS) client certificate authentication
when publishing bucket notifications to Kafka brokers. RGW can now
present a client certificate and private key to authenticate with
brokers that require ssl.client.auth=required.
Changes:
- Add ssl-certificate-location, ssl-key-location, and ssl-key-password
  topic attributes for configuring client certificates
- Validate that ssl_certificate and ssl_key are provided together
- Include ssl_key_password in connection identity (hash/equality)
- Add kafka-security.sh script for generating broker and client TLS certs
- Add mTLS test (test_notification_kafka_security_ssl_mtls) using
  use_mtls=True flag on the existing SSL security path
- Update RGW notifications documentation with mTLS parameters

Fixes: http://tracker.ceph.com/issues/67427
Signed-off-by: Jan Radon <jan.fabian.radon@sap.com>
doc/radosgw/notifications.rst
qa/tasks/kafka.py
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/kafka-security.sh
src/test/rgw/bucket_notification/test_bn.py

index 56b46b0087d73a7b5214826ed15b4a1f8b99d723..de07940ef619fd1c0f93380b6fdcc111db58b66e 100644 (file)
@@ -236,6 +236,9 @@ and must be between 1 and 256 characters long. To relax these requirements, use:
    [&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
    [&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
    [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
+   [&Attributes.entry.19.key=ssl-certificate-location&Attributes.entry.19.value=<file path>]
+   [&Attributes.entry.20.key=ssl-key-location&Attributes.entry.20.value=<file path>]
+   [&Attributes.entry.21.key=ssl-key-password&Attributes.entry.21.value=<password-string>]
 
 Request parameters:
 
@@ -362,6 +365,22 @@ Request parameters:
  - ``kafka-brokers``: A comma-separated list of ``host:port`` of Kafka brokers:
    these brokers (may contain a broker which is defined in Kafka URI) will be
    added to Kafka URI to support sending notifications to a Kafka cluster.
+ - ``ssl-certificate-location``: The path to a PEM-encoded client certificate
+   file to present to the Kafka broker for mutual TLS (mTLS) authentication.
+   This enables certificate-based client identity and must be used together
+   with ``ssl-key-location`` and ``use-ssl=true``. Specifying only one of
+   ``ssl-certificate-location`` or ``ssl-key-location`` will cause the
+   connection to fail.
+ - ``ssl-key-location``: The path to a PEM-encoded private key file
+   corresponding to the client certificate specified in
+   ``ssl-certificate-location``.
+ - ``ssl-key-password``: The password for the client private key, if the key
+   file is encrypted. This is optional and only required when the private key
+   is password-protected.
+
+   The same security considerations in place for this parameter as
+   for ``user``/``password``: it should be provided over HTTPS or
+   ``rgw_allow_notification_secrets_in_cleartext`` must be set to "true".
 
 .. note::
 
@@ -639,6 +658,12 @@ Valid ``AttributeName`` that can be passed:
   broker before being delivered to their final destinations.
 - ``kafka-brokers``: Set endpoint with broker(s) as a comma-separated list of
   ``host`` or ``host:port`` (default port 9092).
+- ``ssl-certificate-location``: Path to a PEM-encoded client certificate for mTLS
+  authentication to the Kafka broker. Must be provided together with
+  ``ssl-key-location``; specifying only one will cause the connection to fail.
+- ``ssl-key-location``: Path to a PEM-encoded private key corresponding to the
+  client certificate. Must be provided together with ``ssl-certificate-location``.
+- ``ssl-key-password``: Password for an encrypted private key (optional).
 
 Notifications
 ~~~~~~~~~~~~~
index c87cd06c1c87f4cb4d5143d578f54ac064642c54..0337bbf740d3972799b42322da8bb44e1c9efc55 100644 (file)
@@ -12,6 +12,14 @@ from teuthology.orchestra import run
 
 log = logging.getLogger(__name__)
 
+KAFKA_PORTS = {
+    'PLAINTEXT': 9092,
+    'SSL': 9093,
+    'SASL_SSL': 9094,
+    'SASL_PLAINTEXT': 9095,
+    'MTLS': 9096,
+}
+
 def get_kafka_version(config):
     for client, client_config in config.items():
         if 'kafka_version' in client_config:
@@ -116,9 +124,15 @@ def broker_conf(ctx, client, kafka_dir):
     ip = remote.ip_address
     conf = (
         "broker.id=0\n"
-        "listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9095\n"
-        "advertised.listeners=PLAINTEXT://{ip}:9092,SSL://{ip}:9093,SASL_SSL://{ip}:9094,SASL_PLAINTEXT://{ip}:9095\n"
-        "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT\n"
+        "listeners=PLAINTEXT://0.0.0.0:{plaintext},SSL://0.0.0.0:{ssl},"
+        "SASL_SSL://0.0.0.0:{sasl_ssl},SASL_PLAINTEXT://0.0.0.0:{sasl_plaintext},"
+        "MTLS://0.0.0.0:{mtls}\n"
+        "advertised.listeners=PLAINTEXT://{ip}:{plaintext},SSL://{ip}:{ssl},"
+        "SASL_SSL://{ip}:{sasl_ssl},SASL_PLAINTEXT://{ip}:{sasl_plaintext},"
+        "MTLS://{ip}:{mtls}\n"
+        "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,"
+        "SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,MTLS:SSL\n"
+        "inter.broker.listener.name=PLAINTEXT\n"
         "log.dirs={tdir}/data/kafka-logs\n"
         "num.network.threads=3\n"
         "num.io.threads=8\n"
@@ -136,14 +150,23 @@ def broker_conf(ctx, client, kafka_dir):
         "zookeeper.connect=localhost:2181\n"
         "zookeeper.connection.timeout.ms=18000\n"
         "group.initial.rebalance.delay.ms=0\n"
+        # SSL configuration
         "ssl.keystore.location={tdir}/server.keystore.jks\n"
         "ssl.keystore.password=mypassword\n"
         "ssl.key.password=mypassword\n"
         "ssl.truststore.location={tdir}/server.truststore.jks\n"
         "ssl.truststore.password=mypassword\n"
+        "ssl.client.auth=requested\n"
+        # mTLS listener (port 9096) requires client certificate
+        "listener.name.mtls.ssl.client.auth=required\n"
+        "listener.name.mtls.ssl.keystore.location={tdir}/server.keystore.jks\n"
+        "listener.name.mtls.ssl.keystore.password=mypassword\n"
+        "listener.name.mtls.ssl.key.password=mypassword\n"
+        "listener.name.mtls.ssl.truststore.location={tdir}/server.truststore.jks\n"
+        "listener.name.mtls.ssl.truststore.password=mypassword\n"
+        # SASL mechanisms
         "sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512\n"
         "sasl.mechanism.inter.broker.protocol=PLAIN\n"
-        "inter.broker.listener.name=PLAINTEXT\n"
         'listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n'
         '  username="admin" \\\n'
         '  password="admin-secret" \\\n'
@@ -164,7 +187,15 @@ def broker_conf(ctx, client, kafka_dir):
         'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
         '  username="admin" \\\n'
         '  password="admin-secret";\n'
-    ).format(tdir=kafka_dir, ip=ip)
+    ).format(
+        tdir=kafka_dir,
+        ip=ip,
+        plaintext=KAFKA_PORTS['PLAINTEXT'],
+        ssl=KAFKA_PORTS['SSL'],
+        sasl_ssl=KAFKA_PORTS['SASL_SSL'],
+        sasl_plaintext=KAFKA_PORTS['SASL_PLAINTEXT'],
+        mtls=KAFKA_PORTS['MTLS'],
+    )
     file_name = 'server.properties'
     log.info("kafka conf file: %s", file_name)
     log.info(conf)
@@ -281,6 +312,7 @@ def run_admin_cmds(ctx,config):
         pass
 
 
+
 @contextlib.contextmanager
 def task(ctx,config):
     """
index 5e237507d6a7ddfed42c50a86d1d63c3168334b5..3fdf2e9c5023cd1921215e70fdd9432c7e14d2d9 100644 (file)
@@ -308,7 +308,10 @@ public:
            conn_id, _endpoint, get_bool(args, "use-ssl", false),
            get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
            args.get_optional("mechanism"), args.get_optional("user-name"),
-           args.get_optional("password"), args.get_optional("kafka-brokers"))) {
+           args.get_optional("password"), args.get_optional("kafka-brokers"),
+           args.get_optional("ssl-certificate-location"),
+           args.get_optional("ssl-key-location"),
+           args.get_optional("ssl-key-password"))) {
      throw configuration_error("Kafka: failed to create connection to: " +
                                _endpoint);
    }
index 3fa6f49b93fa8bc85e6b756ca35ccd0b36635fda..c86b8445723a9e0e54e746107102b26a8a413ef9 100644 (file)
@@ -77,15 +77,29 @@ connection_id_t::connection_id_t(
     const boost::optional<const std::string&>& _ca_location,
     const boost::optional<const std::string&>& _mechanism,
     bool _ssl,
-    bool _verify_ssl)
+    bool _verify_ssl,
+    const boost::optional<const std::string&>& _ssl_certificate,
+    const boost::optional<const std::string&>& _ssl_key,
+    const boost::optional<const std::string&>& _ssl_key_password
+  )
     : broker(_broker), user(_user), password(_password), ssl(_ssl),
       verify_ssl(_verify_ssl) {
+
   if (_ca_location.has_value()) {
     ca_location = _ca_location.get();
   }
   if (_mechanism.has_value()) {
     mechanism = _mechanism.get();
   }
+  if (_ssl_certificate.has_value()) {
+    ssl_certificate = _ssl_certificate.get();
+  }
+  if (_ssl_key.has_value()) {
+    ssl_key = _ssl_key.get();
+  }
+  if (_ssl_key_password.has_value()) {
+    ssl_key_password = _ssl_key_password.get();
+  }
 }
 
 // equality operator and hasher functor are needed
@@ -94,7 +108,10 @@ bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
   return lhs.broker == rhs.broker && lhs.user == rhs.user &&
          lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
          lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl &&
-         lhs.verify_ssl == rhs.verify_ssl;
+         lhs.verify_ssl == rhs.verify_ssl &&
+         lhs.ssl_certificate == rhs.ssl_certificate &&
+         lhs.ssl_key == rhs.ssl_key &&
+         lhs.ssl_key_password == rhs.ssl_key_password;
 }
 
 struct connection_id_hasher {
@@ -107,6 +124,9 @@ struct connection_id_hasher {
     boost::hash_combine(h, k.mechanism);
     boost::hash_combine(h, k.ssl);
     boost::hash_combine(h, k.verify_ssl);
+    boost::hash_combine(h, k.ssl_certificate);
+    boost::hash_combine(h, k.ssl_key);
+    boost::hash_combine(h, k.ssl_key_password);
     return h;
   }
 };
@@ -168,6 +188,9 @@ struct connection_t {
   const std::string user;
   const std::string password;
   const boost::optional<std::string> mechanism;
+  const boost::optional<std::string> ssl_certificate;
+  const boost::optional<std::string> ssl_key;
+  const boost::optional<std::string> ssl_key_password;
   utime_t timestamp = ceph_clock_now();
 
   // cleanup of all internal connection resource
@@ -200,8 +223,12 @@ struct connection_t {
   // ctor for setting immutable values
   connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, 
           const boost::optional<const std::string&>& _ca_location,
-          const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) :
-      cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}                                                                                                                                                        
+          const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism,
+          const boost::optional<const std::string&>& _ssl_certificate,
+          const boost::optional<const std::string&>& _ssl_key,
+          const boost::optional<const std::string&>& _ssl_key_password) :
+      cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism),
+      ssl_certificate(_ssl_certificate), ssl_key(_ssl_key), ssl_key_password(_ssl_key_password) {}                                                                                                                                                        
 
   // dtor also destroys the internals
   ~connection_t() {
@@ -341,6 +368,20 @@ bool new_producer(connection_t* conn) {
                           errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
       goto conf_error;
     }
+    if (conn->ssl_certificate) {
+      if (rd_kafka_conf_set(conf.get(), "ssl.certificate.location", conn->ssl_certificate->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured client certificate location" << dendl;
+    }
+    if (conn->ssl_key) {
+      if (rd_kafka_conf_set(conf.get(), "ssl.key.location", conn->ssl_key->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured client key location" << dendl;
+    }
+    if (conn->ssl_key_password) {
+      if (rd_kafka_conf_set(conf.get(), "ssl.key.password", conn->ssl_key_password->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      ldout(conn->cct, 20) << "Kafka connect: successfully configured client key password" << dendl;
+    }
+    // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
+    // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
     ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
   } else if (!conn->user.empty()) {
@@ -620,7 +661,10 @@ public:
                boost::optional<const std::string&> mechanism,
                boost::optional<const std::string&> topic_user_name,
                boost::optional<const std::string&> topic_password,
-               boost::optional<const std::string&> brokers) {
+               boost::optional<const std::string&> brokers,
+               boost::optional<const std::string&> ssl_certificate,
+               boost::optional<const std::string&> ssl_key,
+               boost::optional<const std::string&> ssl_key_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -658,13 +702,20 @@ public:
       return false;
     }
 
+    // ssl_certificate and ssl_key must both be provided for mTLS
+    if (ssl_certificate.has_value() != ssl_key.has_value()) {
+      ldout(cct, 1) << "Kafka connect: both ssl_certificate and ssl_key must be provided for mTLS (got only "
+                    << (ssl_certificate.has_value() ? "ssl_certificate" : "ssl_key") << ")" << dendl;
+      return false;
+    }
+
     if (brokers.has_value()) {
       broker_list.append(",");
       broker_list.append(brokers.get());
     }
 
     connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
-                           use_ssl, verify_ssl);
+                           use_ssl, verify_ssl, ssl_certificate, ssl_key, ssl_key_password);
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(tmp_id);
     // note that ssl vs. non-ssl connection to the same host are two separate connections
@@ -683,7 +734,7 @@ public:
       return false;
     }
 
-    auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+    auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism, ssl_certificate, ssl_key, ssl_key_password);
     if (!new_producer(conn.get())) {
       ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
       return false;
@@ -802,11 +853,15 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
              boost::optional<const std::string&> password,
-             boost::optional<const std::string&> brokers) {
+             boost::optional<const std::string&> brokers,
+             boost::optional<const std::string&> ssl_certificate,
+             boost::optional<const std::string&> ssl_key,
+             boost::optional<const std::string&> ssl_key_password) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
-                            mechanism, user_name, password, brokers);
+                            mechanism, user_name, password, brokers,
+                            ssl_certificate, ssl_key, ssl_key_password);
 }
 
 int publish(const connection_id_t& conn_id,
index acd0846bc1b11511a7358506f5ed250f9ec167b7..34d06b0d223102be841fa855550042c5f9c14c4e 100644 (file)
@@ -28,6 +28,9 @@ struct connection_id_t {
   std::string password;
   std::string ca_location;
   std::string mechanism;
+  std::string ssl_certificate;
+  std::string ssl_key;
+  std::string ssl_key_password;
   bool ssl = false;
   bool verify_ssl = true;
   connection_id_t() = default;
@@ -37,7 +40,10 @@ struct connection_id_t {
                   const boost::optional<const std::string&>& _ca_location,
                   const boost::optional<const std::string&>& _mechanism,
                   bool _ssl,
-                  bool _verify_ssl);
+                  bool _verify_ssl,
+                  const boost::optional<const std::string&>& _ssl_certificate,
+                  const boost::optional<const std::string&>& _ssl_key,
+                  const boost::optional<const std::string&>& _ssl_key_password);
 };
 
 std::string to_string(const connection_id_t& id);
@@ -51,7 +57,10 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
              boost::optional<const std::string&> password,
-             boost::optional<const std::string&> brokers);
+             boost::optional<const std::string&> brokers,
+             boost::optional<const std::string&> ssl_certificate,
+             boost::optional<const std::string&> ssl_key,
+             boost::optional<const std::string&> ssl_key_password);
 
 // publish a message over a connection that was already created
 int publish(const connection_id_t& conn_id,
index 4f32804a552960150698cbc9362c4929ace7209a..23d25e197536f7c2189de84e129bff0d3296209b 100644 (file)
@@ -76,6 +76,17 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct
       return false;
     }
   }
+
+  // check for mTLS key password - also a secret that requires secure transport
+  auto ssl_key_password = args.get_optional("ssl-key-password");
+  if (ssl_key_password.has_value() && !ssl_key_password->empty()) {
+    dest.stored_secret = true;
+    if (!verify_transport_security(cct, *ri.env)) {
+      message = "Topic contains secrets that must be transmitted over a secure transport";
+      return false;
+    }
+  }
+
   return true;
 }
 
@@ -813,7 +824,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
       static constexpr std::initializer_list<const char*> args = {
           "verify-ssl",    "use-ssl",         "ca-location", "amqp-ack-level",
           "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents",
-          "user-name",     "password"};
+          "user-name",     "password",
+          "ssl-certificate-location", "ssl-key-location", "ssl-key-password"};
       if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
         replace_str(attribute_name, s->info.args.get("AttributeValue"));
         return 0;
index 8d94c79df7d17f6e826ab727eaeffeb0af6fc1ed..6460e692507e1fb5de9ffc16e39aaac117130d2b 100644 (file)
@@ -77,15 +77,15 @@ Kafka Security Tests
    **Important:** If you face any initialization failures, replace ``localhost`` in both ``listeners`` and ``advertised.listeners`` with your Kafka broker's actual hostname or IP address.
    For example, if your Kafka broker runs on host ``kafka-server.example.com`` or IP ``192.168.1.100``, use::
 
-   listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
-   advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
+   listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095,MTLS://192.168.1.100:9096
+   advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095,MTLS://192.168.1.100:9096
 
    If both ``listeners`` and ``advertised.listeners`` do not match, the broker cannot connect to itself, causing initialization failures.
 
-        # All listeners
-        listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
-        advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
-        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
+        # All listeners (including MTLS on port 9096)
+        listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095,MTLS://localhost:9096
+        advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095,MTLS://localhost:9096
+        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,MTLS:SSL
 
         # SSL configuration matching the kafka-security.sh script
         ssl.keystore.location=./server.keystore.jks
@@ -94,6 +94,19 @@ Kafka Security Tests
         ssl.truststore.location=./server.truststore.jks
         ssl.truststore.password=mypassword
 
+        # Allow optional client certificates on the main SSL listener (port 9093).
+        # Use "requested" so that plain SSL tests (without client certs) still work,
+        # while mTLS tests use a separate listener with ssl.client.auth=required.
+        ssl.client.auth=requested
+
+        # mTLS listener: requires client certificate authentication
+        listener.name.mtls.ssl.client.auth=required
+        listener.name.mtls.ssl.keystore.location=./server.keystore.jks
+        listener.name.mtls.ssl.keystore.password=mypassword
+        listener.name.mtls.ssl.key.password=mypassword
+        listener.name.mtls.ssl.truststore.location=./server.truststore.jks
+        listener.name.mtls.ssl.truststore.password=mypassword
+
         # SASL mechanisms
         sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
         sasl.mechanism.inter.broker.protocol=PLAIN
index 020e784585d55f304d9215abd1aa03c2a7c25279..b3d216b0cd2618d6a1a491a6d35644270e7e307b 100755 (executable)
@@ -14,6 +14,9 @@ rm -f $TRUSTFILE
 rm -f $CAFILE
 rm -f $REQFILE
 rm -f $CERTFILE
+rm -f $CLIENT_KEYFILE
+rm -f $CLIENT_CERTFILE
+rm -f $CLIENT_REQFILE
 
 SAN_STRING="DNS:$FQDN"
 if [ -n "$IP_SAN" ]; then
@@ -32,6 +35,7 @@ echo "########## create the CA '$CAFILE'"
 openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \
   -days $VALIDITY -subj \
   '/C=US/ST=Michigan/L=Ann Arbor/O=Red Hat Inc/OU=Michigan Engineering/CN=yuval-1'
+chmod 644 $CAKEYFILE
 
 echo "########## store the CA in trust store '$TRUSTFILE'"
 keytool -keystore $TRUSTFILE -storepass $MYPW -alias CARoot \
@@ -61,3 +65,26 @@ echo "########## store certificate '$CERTFILE' in key store '$KEYFILE'"
 keytool -storepass $MYPW -keystore $KEYFILE -alias localhost \
   -import -file $CERTFILE
 
+echo "########## generate client certificate for mTLS testing"
+CLIENT_KEYFILE=client.key
+CLIENT_CERTFILE=client.crt
+CLIENT_REQFILE=client.req
+
+# generate client private key (PKCS#8 for compatibility)
+openssl genpkey -algorithm RSA -out $CLIENT_KEYFILE -pkeyopt rsa_keygen_bits:2048
+chmod 644 $CLIENT_KEYFILE
+
+# generate client CSR
+openssl req -new -key $CLIENT_KEYFILE -out $CLIENT_REQFILE \
+  -subj '/CN=rgw-client/OU=Testing/O=Ceph/C=US'
+
+# sign client cert with our CA
+openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \
+  -days $VALIDITY -in $CLIENT_REQFILE -out $CLIENT_CERTFILE -sha256
+
+rm -f $CLIENT_REQFILE
+
+echo "########## import client CA into truststore (so broker trusts client certs)"
+keytool -keystore $TRUSTFILE -storepass $MYPW -alias ClientCA \
+  -noprompt -importcert -file $CAFILE 2>/dev/null || true
+
index 459211af2c38e71ed20d577627f3f234a9a150cd..c42c9ff07da2a762c60b5fc582c1a06ca88c98a6 100644 (file)
@@ -439,59 +439,75 @@ def setup_scram_users_via_kafka_configs(mechanism: str) -> None:
         return
     
     kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+    kafka_configs_no_ext = os.path.join(kafka_dir, 'bin/kafka-configs')
     if not os.path.exists(kafka_configs):
-        log.warning(f"kafka-configs.sh not found at {kafka_configs}")
-        return
-    
-    scram_mechanism = 'SCRAM-SHA-512' if 'SHA-512' in mechanism else 'SCRAM-SHA-256'
-    zk_connect = 'localhost:2181'
-    
-    try:
-        # delete existing SCRAM credentials first
-        subprocess.run(
-            [kafka_configs,
-             '--zookeeper', zk_connect,
-             '--alter',
-             '--entity-type', 'users',
-             '--entity-name', KAFKA_TEST_USER,
-             '--delete-config', 'scram-sha-256,scram-sha-512'],
-            capture_output=True,
-            timeout=15,
-            check=False
-        )
-        time.sleep(1)
-        
-        # adding SCRAM credentials
-        add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
-        result = subprocess.run(
-            [kafka_configs,
-             '--zookeeper', zk_connect,
-             '--alter',
-             '--entity-type', 'users',
-             '--entity-name', KAFKA_TEST_USER,
-             '--add-config', add_config_value],
+        if os.path.exists(kafka_configs_no_ext):
+            kafka_configs = kafka_configs_no_ext
+        else:
+            raise RuntimeError(
+                f"kafka-configs not found under KAFKA_DIR={kafka_dir}. "
+                "Expected bin/kafka-configs.sh or bin/kafka-configs"
+            )
+
+    base_cmd = [kafka_configs]
+
+    def run_kafka_configs(args):
+        return subprocess.run(
+            base_cmd + args,
             capture_output=True,
             text=True,
-            timeout=15,
-            check=False
+            timeout=30,
+            check=False,
         )
-        
-        if result.returncode == 0:
-            log.info(f"SCRAM user configured: {KAFKA_TEST_USER} ({scram_mechanism})")
-        else:
-            raise RuntimeError(f"Failed to create SCRAM user {KAFKA_TEST_USER} with {scram_mechanism}")
+    
+    try:
+        # Idempotently ensure both SCRAM mechanisms exist for the test user.
+        # This avoids cross-test credential drift between SHA-256 and SHA-512 tests.
+        bootstrap_server = f'{default_kafka_server}:9092'
+        for scram_mechanism in ('SCRAM-SHA-256', 'SCRAM-SHA-512'):
+            add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
+            result = run_kafka_configs([
+                '--bootstrap-server', bootstrap_server,
+                '--alter',
+                '--entity-type', 'users',
+                '--entity-name', KAFKA_TEST_USER,
+                '--add-config', add_config_value,
+            ])
+            if result.returncode != 0:
+                raise RuntimeError(
+                    f"Failed to create/update SCRAM credentials for {KAFKA_TEST_USER} ({scram_mechanism}): {result.stderr.strip()}"
+                )
+
+        describe = run_kafka_configs([
+            '--bootstrap-server', bootstrap_server,
+            '--describe',
+            '--entity-type', 'users',
+            '--entity-name', KAFKA_TEST_USER,
+        ])
+        if describe.returncode != 0:
+            raise RuntimeError(f"Failed to verify SCRAM user {KAFKA_TEST_USER}: {describe.stderr.strip()}")
+        if ('SCRAM-SHA-256' not in describe.stdout) or ('SCRAM-SHA-512' not in describe.stdout):
+            raise RuntimeError(
+                f"SCRAM verification missing mechanisms for {KAFKA_TEST_USER}. output: {describe.stdout.strip()}"
+            )
+
+        log.info(f"SCRAM user configured idempotently: {KAFKA_TEST_USER} (SCRAM-SHA-256,SCRAM-SHA-512)")
     except Exception as e:
         log.error(f"Failed to setup SCRAM users via kafka-configs: {e}")
         raise
 
 def _kafka_ca_cert_path():
-    kafka_dir = os.environ.get('KAFKA_DIR')
+    kafka_dir = os.environ.get('KAFKA_CERT_DIR') or os.environ.get('KAFKA_DIR')
     if kafka_dir:
         ca_path = os.path.join(kafka_dir, 'y-ca.crt')
         if os.path.exists(ca_path):
             return ca_path
     return None
 
+
+def _kafka_cert_dir():
+    return os.environ.get('KAFKA_CERT_DIR') or os.environ.get('KAFKA_DIR', '/opt/kafka')
+
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
     def __init__(self, topic_name, security_type, kafka_server, mechanism='PLAIN'):
@@ -525,9 +541,9 @@ class KafkaReceiver(object):
             base_config['security_protocol'] = 'SSL'
             if ca_cert:
                 base_config['ssl_cafile'] = ca_cert
-            kafka_dir = os.environ.get('KAFKA_DIR', '/opt/kafka')
-            client_cert = os.path.join(kafka_dir, 'config/client.crt')
-            client_key = os.path.join(kafka_dir, 'config/client.key')
+            kafka_dir = _kafka_cert_dir()
+            client_cert = os.path.join(kafka_dir, 'client.crt')
+            client_key = os.path.join(kafka_dir, 'client.key')
             if os.path.exists(client_cert) and os.path.exists(client_key):
                 base_config['ssl_certfile'] = client_cert
                 base_config['ssl_keyfile'] = client_key
@@ -2641,13 +2657,6 @@ def metadata_filter(endpoint_type, conn):
         task.start()
         endpoint_address = 'amqp://' + host
         endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable&persistent=true'
-    elif endpoint_type == 'kafka':
-        # start kafka receiver
-        task, receiver = create_kafka_receiver_thread(topic_name)
-        task.start()
-        verify_kafka_receiver(receiver)
-        endpoint_address = 'kafka://' + host
-        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
     else:
         pytest.skip('Unknown endpoint type: ' + endpoint_type)
 
@@ -2736,12 +2745,6 @@ def metadata_filter(endpoint_type, conn):
     conn.delete_bucket(bucket_name)
 
 
-@pytest.mark.kafka_test
-def test_metadata_filter_kafka():
-    """ test notification of filtering metadata, kafka endpoint """
-    conn = connection()
-    metadata_filter('kafka', conn)
-
 
 @pytest.mark.http_test
 def test_metadata_filter_http():
@@ -4650,7 +4653,7 @@ def test_topic_no_permissions():
 
 
 def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False,
-                   verify_ssl=True, include_ca_location=True):
+                   verify_ssl=True, include_ca_location=True, use_mtls=False):
     """ test pushing kafka notification securly to master """
     # Setup SCRAM users if needed
     if mechanism.startswith('SCRAM'):
@@ -4671,7 +4674,10 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
         else:
             endpoint_address = 'kafka://' + default_kafka_server + ':9094'
     elif security_type == 'SSL':
-        endpoint_address = 'kafka://' + default_kafka_server + ':9093'
+        if use_mtls:
+            endpoint_address = 'kafka://' + default_kafka_server + ':9096'
+        else:
+            endpoint_address = 'kafka://' + default_kafka_server + ':9093'
     elif security_type == 'SASL_PLAINTEXT':
         endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9095'
     else:
@@ -4680,24 +4686,32 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
     if security_type == 'SASL_PLAINTEXT':
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=false&mechanism='+mechanism
     elif security_type == 'SASL_SSL':
-        KAFKA_DIR = os.environ['KAFKA_DIR']
-        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&mechanism='+mechanism
-        if include_ca_location:
-            endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
+        kafka_cert_dir = _kafka_cert_dir()
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+kafka_cert_dir+'/y-ca.crt&mechanism='+mechanism
         if use_topic_attrs_for_creds:
             endpoint_args += '&user-name=alice&password=alice-secret'
     else:
-        KAFKA_DIR = os.environ['KAFKA_DIR']
+        kafka_cert_dir = _kafka_cert_dir()
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true'
         if include_ca_location:
-            endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
-
-    if security_type in ('SSL', 'SASL_SSL') and not verify_ssl:
-        endpoint_args += '&verify-ssl=false'
+            endpoint_args += '&ca-location='+kafka_cert_dir+'/y-ca.crt'
+        if not verify_ssl:
+            endpoint_args += '&verify-ssl=false'
+        if use_mtls:
+            ssl_cert_path = os.path.join(kafka_cert_dir, 'client.crt')
+            ssl_key_path = os.path.join(kafka_cert_dir, 'client.key')
+            assert os.path.isfile(ssl_cert_path), \
+                f'mTLS client certificate not found: {ssl_cert_path}'
+            assert os.path.isfile(ssl_key_path), \
+                f'mTLS client key not found: {ssl_key_path}'
+            endpoint_args += '&ssl-certificate-location=' + ssl_cert_path
+            endpoint_args += '&ssl-key-location=' + ssl_key_path
 
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
 
     # create consumer on the topic
+    # When use_mtls=True, RGW produces to port 9096 (ssl.client.auth=required) to enforce mTLS.
+    # The test consumer connects to port 9093 (SSL) which is sufficient for reading.
     task, receiver = create_kafka_receiver_thread(topic_name, security_type=security_type, mechanism=mechanism)
     task.start()
     verify_kafka_receiver(receiver)
@@ -4799,6 +4813,12 @@ def test_notification_kafka_security_ssl_sasl_scram_512():
     kafka_security('SASL_SSL', mechanism='SCRAM-SHA-512')
 
 
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_mtls():
+    """test mTLS client certificate authentication to Kafka"""
+    kafka_security('SSL', use_mtls=True)
+
+
 @pytest.mark.http_test
 def test_persistent_reload():
     """ do a realm reload while we send notifications """
@@ -6129,4 +6149,3 @@ def test_kafka_batch_size():
 def test_kafka_batch_size_mismatch():
     """ test that without rgw_kafka_max_batch_size, batched messages exceed the broker limit """
     kafka_batch_size(match_batch_size=False)
-