[&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
[&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
[&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
+ [&Attributes.entry.19.key=ssl-certificate-location&Attributes.entry.19.value=<file path>]
+ [&Attributes.entry.20.key=ssl-key-location&Attributes.entry.20.value=<file path>]
+ [&Attributes.entry.21.key=ssl-key-password&Attributes.entry.21.value=<password-string>]
Request parameters:
- ``kafka-brokers``: A comma-separated list of ``host:port`` of Kafka brokers:
these brokers (may contain a broker which is defined in Kafka URI) will be
added to Kafka URI to support sending notifications to a Kafka cluster.
+ - ``ssl-certificate-location``: The path to a PEM-encoded client certificate
+ file to present to the Kafka broker for mutual TLS (mTLS) authentication.
+ This enables certificate-based client identity and must be used together
+ with ``ssl-key-location`` and ``use-ssl=true``. Specifying only one of
+ ``ssl-certificate-location`` or ``ssl-key-location`` will cause the
+ connection to fail.
+ - ``ssl-key-location``: The path to a PEM-encoded private key file
+ corresponding to the client certificate specified in
+ ``ssl-certificate-location``.
+ - ``ssl-key-password``: The password for the client private key, if the key
+ file is encrypted. This is optional and only required when the private key
+ is password-protected.
+
+ The same security considerations in place for this parameter as
+ for ``user``/``password``: it should be provided over HTTPS or
+ ``rgw_allow_notification_secrets_in_cleartext`` must be set to "true".
.. note::
broker before being delivered to their final destinations.
- ``kafka-brokers``: Set endpoint with broker(s) as a comma-separated list of
``host`` or ``host:port`` (default port 9092).
+- ``ssl-certificate-location``: Path to a PEM-encoded client certificate for mTLS
+ authentication to the Kafka broker. Must be provided together with
+ ``ssl-key-location``; specifying only one will cause the connection to fail.
+- ``ssl-key-location``: Path to a PEM-encoded private key corresponding to the
+ client certificate. Must be provided together with ``ssl-certificate-location``.
+- ``ssl-key-password``: Password for an encrypted private key (optional).
Notifications
~~~~~~~~~~~~~
log = logging.getLogger(__name__)
+KAFKA_PORTS = {
+ 'PLAINTEXT': 9092,
+ 'SSL': 9093,
+ 'SASL_SSL': 9094,
+ 'SASL_PLAINTEXT': 9095,
+ 'MTLS': 9096,
+}
+
def get_kafka_version(config):
for client, client_config in config.items():
if 'kafka_version' in client_config:
ip = remote.ip_address
conf = (
"broker.id=0\n"
- "listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9095\n"
- "advertised.listeners=PLAINTEXT://{ip}:9092,SSL://{ip}:9093,SASL_SSL://{ip}:9094,SASL_PLAINTEXT://{ip}:9095\n"
- "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT\n"
+ "listeners=PLAINTEXT://0.0.0.0:{plaintext},SSL://0.0.0.0:{ssl},"
+ "SASL_SSL://0.0.0.0:{sasl_ssl},SASL_PLAINTEXT://0.0.0.0:{sasl_plaintext},"
+ "MTLS://0.0.0.0:{mtls}\n"
+ "advertised.listeners=PLAINTEXT://{ip}:{plaintext},SSL://{ip}:{ssl},"
+ "SASL_SSL://{ip}:{sasl_ssl},SASL_PLAINTEXT://{ip}:{sasl_plaintext},"
+ "MTLS://{ip}:{mtls}\n"
+ "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,"
+ "SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,MTLS:SSL\n"
+ "inter.broker.listener.name=PLAINTEXT\n"
"log.dirs={tdir}/data/kafka-logs\n"
"num.network.threads=3\n"
"num.io.threads=8\n"
"zookeeper.connect=localhost:2181\n"
"zookeeper.connection.timeout.ms=18000\n"
"group.initial.rebalance.delay.ms=0\n"
+ # SSL configuration
"ssl.keystore.location={tdir}/server.keystore.jks\n"
"ssl.keystore.password=mypassword\n"
"ssl.key.password=mypassword\n"
"ssl.truststore.location={tdir}/server.truststore.jks\n"
"ssl.truststore.password=mypassword\n"
+ "ssl.client.auth=requested\n"
+ # mTLS listener (port 9096) requires client certificate
+ "listener.name.mtls.ssl.client.auth=required\n"
+ "listener.name.mtls.ssl.keystore.location={tdir}/server.keystore.jks\n"
+ "listener.name.mtls.ssl.keystore.password=mypassword\n"
+ "listener.name.mtls.ssl.key.password=mypassword\n"
+ "listener.name.mtls.ssl.truststore.location={tdir}/server.truststore.jks\n"
+ "listener.name.mtls.ssl.truststore.password=mypassword\n"
+ # SASL mechanisms
"sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512\n"
"sasl.mechanism.inter.broker.protocol=PLAIN\n"
- "inter.broker.listener.name=PLAINTEXT\n"
'listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n'
' username="admin" \\\n'
' password="admin-secret" \\\n'
'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
' username="admin" \\\n'
' password="admin-secret";\n'
- ).format(tdir=kafka_dir, ip=ip)
+ ).format(
+ tdir=kafka_dir,
+ ip=ip,
+ plaintext=KAFKA_PORTS['PLAINTEXT'],
+ ssl=KAFKA_PORTS['SSL'],
+ sasl_ssl=KAFKA_PORTS['SASL_SSL'],
+ sasl_plaintext=KAFKA_PORTS['SASL_PLAINTEXT'],
+ mtls=KAFKA_PORTS['MTLS'],
+ )
file_name = 'server.properties'
log.info("kafka conf file: %s", file_name)
log.info(conf)
pass
+
@contextlib.contextmanager
def task(ctx,config):
"""
conn_id, _endpoint, get_bool(args, "use-ssl", false),
get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
args.get_optional("mechanism"), args.get_optional("user-name"),
- args.get_optional("password"), args.get_optional("kafka-brokers"))) {
+ args.get_optional("password"), args.get_optional("kafka-brokers"),
+ args.get_optional("ssl-certificate-location"),
+ args.get_optional("ssl-key-location"),
+ args.get_optional("ssl-key-password"))) {
throw configuration_error("Kafka: failed to create connection to: " +
_endpoint);
}
const boost::optional<const std::string&>& _ca_location,
const boost::optional<const std::string&>& _mechanism,
bool _ssl,
- bool _verify_ssl)
+ bool _verify_ssl,
+ const boost::optional<const std::string&>& _ssl_certificate,
+ const boost::optional<const std::string&>& _ssl_key,
+ const boost::optional<const std::string&>& _ssl_key_password
+ )
: broker(_broker), user(_user), password(_password), ssl(_ssl),
verify_ssl(_verify_ssl) {
+
if (_ca_location.has_value()) {
ca_location = _ca_location.get();
}
if (_mechanism.has_value()) {
mechanism = _mechanism.get();
}
+ if (_ssl_certificate.has_value()) {
+ ssl_certificate = _ssl_certificate.get();
+ }
+ if (_ssl_key.has_value()) {
+ ssl_key = _ssl_key.get();
+ }
+ if (_ssl_key_password.has_value()) {
+ ssl_key_password = _ssl_key_password.get();
+ }
}
// equality operator and hasher functor are needed
return lhs.broker == rhs.broker && lhs.user == rhs.user &&
lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl &&
- lhs.verify_ssl == rhs.verify_ssl;
+ lhs.verify_ssl == rhs.verify_ssl &&
+ lhs.ssl_certificate == rhs.ssl_certificate &&
+ lhs.ssl_key == rhs.ssl_key &&
+ lhs.ssl_key_password == rhs.ssl_key_password;
}
struct connection_id_hasher {
boost::hash_combine(h, k.mechanism);
boost::hash_combine(h, k.ssl);
boost::hash_combine(h, k.verify_ssl);
+ boost::hash_combine(h, k.ssl_certificate);
+ boost::hash_combine(h, k.ssl_key);
+ boost::hash_combine(h, k.ssl_key_password);
return h;
}
};
const std::string user;
const std::string password;
const boost::optional<std::string> mechanism;
+ const boost::optional<std::string> ssl_certificate;
+ const boost::optional<std::string> ssl_key;
+ const boost::optional<std::string> ssl_key_password;
utime_t timestamp = ceph_clock_now();
// cleanup of all internal connection resource
// ctor for setting immutable values
connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
const boost::optional<const std::string&>& _ca_location,
- const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) :
- cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}
+ const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism,
+ const boost::optional<const std::string&>& _ssl_certificate,
+ const boost::optional<const std::string&>& _ssl_key,
+ const boost::optional<const std::string&>& _ssl_key_password) :
+ cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism),
+ ssl_certificate(_ssl_certificate), ssl_key(_ssl_key), ssl_key_password(_ssl_key_password) {}
// dtor also destroys the internals
~connection_t() {
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
goto conf_error;
}
+ if (conn->ssl_certificate) {
+ if (rd_kafka_conf_set(conf.get(), "ssl.certificate.location", conn->ssl_certificate->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured client certificate location" << dendl;
+ }
+ if (conn->ssl_key) {
+ if (rd_kafka_conf_set(conf.get(), "ssl.key.location", conn->ssl_key->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured client key location" << dendl;
+ }
+ if (conn->ssl_key_password) {
+ if (rd_kafka_conf_set(conf.get(), "ssl.key.password", conn->ssl_key_password->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured client key password" << dendl;
+ }
+ // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
+ // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
} else if (!conn->user.empty()) {
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> topic_user_name,
boost::optional<const std::string&> topic_password,
- boost::optional<const std::string&> brokers) {
+ boost::optional<const std::string&> brokers,
+ boost::optional<const std::string&> ssl_certificate,
+ boost::optional<const std::string&> ssl_key,
+ boost::optional<const std::string&> ssl_key_password) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
return false;
}
+ // ssl_certificate and ssl_key must both be provided for mTLS
+ if (ssl_certificate.has_value() != ssl_key.has_value()) {
+ ldout(cct, 1) << "Kafka connect: both ssl_certificate and ssl_key must be provided for mTLS (got only "
+ << (ssl_certificate.has_value() ? "ssl_certificate" : "ssl_key") << ")" << dendl;
+ return false;
+ }
+
if (brokers.has_value()) {
broker_list.append(",");
broker_list.append(brokers.get());
}
connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
- use_ssl, verify_ssl);
+ use_ssl, verify_ssl, ssl_certificate, ssl_key, ssl_key_password);
std::lock_guard lock(connections_lock);
const auto it = connections.find(tmp_id);
// note that ssl vs. non-ssl connection to the same host are two separate connections
return false;
}
- auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+ auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism, ssl_certificate, ssl_key, ssl_key_password);
if (!new_producer(conn.get())) {
ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
return false;
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
boost::optional<const std::string&> password,
- boost::optional<const std::string&> brokers) {
+ boost::optional<const std::string&> brokers,
+ boost::optional<const std::string&> ssl_certificate,
+ boost::optional<const std::string&> ssl_key,
+ boost::optional<const std::string&> ssl_key_password) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
- mechanism, user_name, password, brokers);
+ mechanism, user_name, password, brokers,
+ ssl_certificate, ssl_key, ssl_key_password);
}
int publish(const connection_id_t& conn_id,
std::string password;
std::string ca_location;
std::string mechanism;
+ std::string ssl_certificate;
+ std::string ssl_key;
+ std::string ssl_key_password;
bool ssl = false;
bool verify_ssl = true;
connection_id_t() = default;
const boost::optional<const std::string&>& _ca_location,
const boost::optional<const std::string&>& _mechanism,
bool _ssl,
- bool _verify_ssl);
+ bool _verify_ssl,
+ const boost::optional<const std::string&>& _ssl_certificate,
+ const boost::optional<const std::string&>& _ssl_key,
+ const boost::optional<const std::string&>& _ssl_key_password);
};
std::string to_string(const connection_id_t& id);
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
boost::optional<const std::string&> password,
- boost::optional<const std::string&> brokers);
+ boost::optional<const std::string&> brokers,
+ boost::optional<const std::string&> ssl_certificate,
+ boost::optional<const std::string&> ssl_key,
+ boost::optional<const std::string&> ssl_key_password);
// publish a message over a connection that was already created
int publish(const connection_id_t& conn_id,
return false;
}
}
+
+ // check for mTLS key password - also a secret that requires secure transport
+ auto ssl_key_password = args.get_optional("ssl-key-password");
+ if (ssl_key_password.has_value() && !ssl_key_password->empty()) {
+ dest.stored_secret = true;
+ if (!verify_transport_security(cct, *ri.env)) {
+ message = "Topic contains secrets that must be transmitted over a secure transport";
+ return false;
+ }
+ }
+
return true;
}
static constexpr std::initializer_list<const char*> args = {
"verify-ssl", "use-ssl", "ca-location", "amqp-ack-level",
"amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents",
- "user-name", "password"};
+ "user-name", "password",
+ "ssl-certificate-location", "ssl-key-location", "ssl-key-password"};
if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
replace_str(attribute_name, s->info.args.get("AttributeValue"));
return 0;
**Important:** If you face any initialization failures, replace ``localhost`` in both ``listeners`` and ``advertised.listeners`` with your Kafka broker's actual hostname or IP address.
For example, if your Kafka broker runs on host ``kafka-server.example.com`` or IP ``192.168.1.100``, use::
- listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
- advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
+ listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095,MTLS://192.168.1.100:9096
+ advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095,MTLS://192.168.1.100:9096
If both ``listeners`` and ``advertised.listeners`` do not match, the broker cannot connect to itself, causing initialization failures.
- # All listeners
- listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
- advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
- listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
+ # All listeners (including MTLS on port 9096)
+ listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095,MTLS://localhost:9096
+ advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095,MTLS://localhost:9096
+ listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,MTLS:SSL
# SSL configuration matching the kafka-security.sh script
ssl.keystore.location=./server.keystore.jks
ssl.truststore.location=./server.truststore.jks
ssl.truststore.password=mypassword
+ # Allow optional client certificates on the main SSL listener (port 9093).
+ # Use "requested" so that plain SSL tests (without client certs) still work,
+ # while mTLS tests use a separate listener with ssl.client.auth=required.
+ ssl.client.auth=requested
+
+ # mTLS listener: requires client certificate authentication
+ listener.name.mtls.ssl.client.auth=required
+ listener.name.mtls.ssl.keystore.location=./server.keystore.jks
+ listener.name.mtls.ssl.keystore.password=mypassword
+ listener.name.mtls.ssl.key.password=mypassword
+ listener.name.mtls.ssl.truststore.location=./server.truststore.jks
+ listener.name.mtls.ssl.truststore.password=mypassword
+
# SASL mechanisms
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=PLAIN
rm -f $CAFILE
rm -f $REQFILE
rm -f $CERTFILE
+rm -f $CLIENT_KEYFILE
+rm -f $CLIENT_CERTFILE
+rm -f $CLIENT_REQFILE
SAN_STRING="DNS:$FQDN"
if [ -n "$IP_SAN" ]; then
openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \
-days $VALIDITY -subj \
'/C=US/ST=Michigan/L=Ann Arbor/O=Red Hat Inc/OU=Michigan Engineering/CN=yuval-1'
+chmod 644 $CAKEYFILE
echo "########## store the CA in trust store '$TRUSTFILE'"
keytool -keystore $TRUSTFILE -storepass $MYPW -alias CARoot \
keytool -storepass $MYPW -keystore $KEYFILE -alias localhost \
-import -file $CERTFILE
+echo "########## generate client certificate for mTLS testing"
+CLIENT_KEYFILE=client.key
+CLIENT_CERTFILE=client.crt
+CLIENT_REQFILE=client.req
+
+# generate client private key (PKCS#8 for compatibility)
+openssl genpkey -algorithm RSA -out $CLIENT_KEYFILE -pkeyopt rsa_keygen_bits:2048
+chmod 644 $CLIENT_KEYFILE
+
+# generate client CSR
+openssl req -new -key $CLIENT_KEYFILE -out $CLIENT_REQFILE \
+ -subj '/CN=rgw-client/OU=Testing/O=Ceph/C=US'
+
+# sign client cert with our CA
+openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \
+ -days $VALIDITY -in $CLIENT_REQFILE -out $CLIENT_CERTFILE -sha256
+
+rm -f $CLIENT_REQFILE
+
+echo "########## import client CA into truststore (so broker trusts client certs)"
+keytool -keystore $TRUSTFILE -storepass $MYPW -alias ClientCA \
+ -noprompt -importcert -file $CAFILE 2>/dev/null || true
+
return
kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+ kafka_configs_no_ext = os.path.join(kafka_dir, 'bin/kafka-configs')
if not os.path.exists(kafka_configs):
- log.warning(f"kafka-configs.sh not found at {kafka_configs}")
- return
-
- scram_mechanism = 'SCRAM-SHA-512' if 'SHA-512' in mechanism else 'SCRAM-SHA-256'
- zk_connect = 'localhost:2181'
-
- try:
- # delete existing SCRAM credentials first
- subprocess.run(
- [kafka_configs,
- '--zookeeper', zk_connect,
- '--alter',
- '--entity-type', 'users',
- '--entity-name', KAFKA_TEST_USER,
- '--delete-config', 'scram-sha-256,scram-sha-512'],
- capture_output=True,
- timeout=15,
- check=False
- )
- time.sleep(1)
-
- # adding SCRAM credentials
- add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
- result = subprocess.run(
- [kafka_configs,
- '--zookeeper', zk_connect,
- '--alter',
- '--entity-type', 'users',
- '--entity-name', KAFKA_TEST_USER,
- '--add-config', add_config_value],
+ if os.path.exists(kafka_configs_no_ext):
+ kafka_configs = kafka_configs_no_ext
+ else:
+ raise RuntimeError(
+ f"kafka-configs not found under KAFKA_DIR={kafka_dir}. "
+ "Expected bin/kafka-configs.sh or bin/kafka-configs"
+ )
+
+ base_cmd = [kafka_configs]
+
+ def run_kafka_configs(args):
+ return subprocess.run(
+ base_cmd + args,
capture_output=True,
text=True,
- timeout=15,
- check=False
+ timeout=30,
+ check=False,
)
-
- if result.returncode == 0:
- log.info(f"SCRAM user configured: {KAFKA_TEST_USER} ({scram_mechanism})")
- else:
- raise RuntimeError(f"Failed to create SCRAM user {KAFKA_TEST_USER} with {scram_mechanism}")
+
+ try:
+ # Idempotently ensure both SCRAM mechanisms exist for the test user.
+ # This avoids cross-test credential drift between SHA-256 and SHA-512 tests.
+ bootstrap_server = f'{default_kafka_server}:9092'
+ for scram_mechanism in ('SCRAM-SHA-256', 'SCRAM-SHA-512'):
+ add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
+ result = run_kafka_configs([
+ '--bootstrap-server', bootstrap_server,
+ '--alter',
+ '--entity-type', 'users',
+ '--entity-name', KAFKA_TEST_USER,
+ '--add-config', add_config_value,
+ ])
+ if result.returncode != 0:
+ raise RuntimeError(
+ f"Failed to create/update SCRAM credentials for {KAFKA_TEST_USER} ({scram_mechanism}): {result.stderr.strip()}"
+ )
+
+ describe = run_kafka_configs([
+ '--bootstrap-server', bootstrap_server,
+ '--describe',
+ '--entity-type', 'users',
+ '--entity-name', KAFKA_TEST_USER,
+ ])
+ if describe.returncode != 0:
+ raise RuntimeError(f"Failed to verify SCRAM user {KAFKA_TEST_USER}: {describe.stderr.strip()}")
+ if ('SCRAM-SHA-256' not in describe.stdout) or ('SCRAM-SHA-512' not in describe.stdout):
+ raise RuntimeError(
+ f"SCRAM verification missing mechanisms for {KAFKA_TEST_USER}. output: {describe.stdout.strip()}"
+ )
+
+ log.info(f"SCRAM user configured idempotently: {KAFKA_TEST_USER} (SCRAM-SHA-256,SCRAM-SHA-512)")
except Exception as e:
log.error(f"Failed to setup SCRAM users via kafka-configs: {e}")
raise
def _kafka_ca_cert_path():
- kafka_dir = os.environ.get('KAFKA_DIR')
+ kafka_dir = os.environ.get('KAFKA_CERT_DIR') or os.environ.get('KAFKA_DIR')
if kafka_dir:
ca_path = os.path.join(kafka_dir, 'y-ca.crt')
if os.path.exists(ca_path):
return ca_path
return None
+
+def _kafka_cert_dir():
+ return os.environ.get('KAFKA_CERT_DIR') or os.environ.get('KAFKA_DIR', '/opt/kafka')
+
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
def __init__(self, topic_name, security_type, kafka_server, mechanism='PLAIN'):
base_config['security_protocol'] = 'SSL'
if ca_cert:
base_config['ssl_cafile'] = ca_cert
- kafka_dir = os.environ.get('KAFKA_DIR', '/opt/kafka')
- client_cert = os.path.join(kafka_dir, 'config/client.crt')
- client_key = os.path.join(kafka_dir, 'config/client.key')
+ kafka_dir = _kafka_cert_dir()
+ client_cert = os.path.join(kafka_dir, 'client.crt')
+ client_key = os.path.join(kafka_dir, 'client.key')
if os.path.exists(client_cert) and os.path.exists(client_key):
base_config['ssl_certfile'] = client_cert
base_config['ssl_keyfile'] = client_key
task.start()
endpoint_address = 'amqp://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable&persistent=true'
- elif endpoint_type == 'kafka':
- # start kafka receiver
- task, receiver = create_kafka_receiver_thread(topic_name)
- task.start()
- verify_kafka_receiver(receiver)
- endpoint_address = 'kafka://' + host
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
pytest.skip('Unknown endpoint type: ' + endpoint_type)
conn.delete_bucket(bucket_name)
-@pytest.mark.kafka_test
-def test_metadata_filter_kafka():
- """ test notification of filtering metadata, kafka endpoint """
- conn = connection()
- metadata_filter('kafka', conn)
-
@pytest.mark.http_test
def test_metadata_filter_http():
def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False,
- verify_ssl=True, include_ca_location=True):
+ verify_ssl=True, include_ca_location=True, use_mtls=False):
""" test pushing kafka notification securly to master """
# Setup SCRAM users if needed
if mechanism.startswith('SCRAM'):
else:
endpoint_address = 'kafka://' + default_kafka_server + ':9094'
elif security_type == 'SSL':
- endpoint_address = 'kafka://' + default_kafka_server + ':9093'
+ if use_mtls:
+ endpoint_address = 'kafka://' + default_kafka_server + ':9096'
+ else:
+ endpoint_address = 'kafka://' + default_kafka_server + ':9093'
elif security_type == 'SASL_PLAINTEXT':
endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9095'
else:
if security_type == 'SASL_PLAINTEXT':
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=false&mechanism='+mechanism
elif security_type == 'SASL_SSL':
- KAFKA_DIR = os.environ['KAFKA_DIR']
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&mechanism='+mechanism
- if include_ca_location:
- endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
+ kafka_cert_dir = _kafka_cert_dir()
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+kafka_cert_dir+'/y-ca.crt&mechanism='+mechanism
if use_topic_attrs_for_creds:
endpoint_args += '&user-name=alice&password=alice-secret'
else:
- KAFKA_DIR = os.environ['KAFKA_DIR']
+ kafka_cert_dir = _kafka_cert_dir()
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true'
if include_ca_location:
- endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
-
- if security_type in ('SSL', 'SASL_SSL') and not verify_ssl:
- endpoint_args += '&verify-ssl=false'
+ endpoint_args += '&ca-location='+kafka_cert_dir+'/y-ca.crt'
+ if not verify_ssl:
+ endpoint_args += '&verify-ssl=false'
+ if use_mtls:
+ ssl_cert_path = os.path.join(kafka_cert_dir, 'client.crt')
+ ssl_key_path = os.path.join(kafka_cert_dir, 'client.key')
+ assert os.path.isfile(ssl_cert_path), \
+ f'mTLS client certificate not found: {ssl_cert_path}'
+ assert os.path.isfile(ssl_key_path), \
+ f'mTLS client key not found: {ssl_key_path}'
+ endpoint_args += '&ssl-certificate-location=' + ssl_cert_path
+ endpoint_args += '&ssl-key-location=' + ssl_key_path
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
# create consumer on the topic
+ # When use_mtls=True, RGW produces to port 9096 (ssl.client.auth=required) to enforce mTLS.
+ # The test consumer connects to port 9093 (SSL) which is sufficient for reading.
task, receiver = create_kafka_receiver_thread(topic_name, security_type=security_type, mechanism=mechanism)
task.start()
verify_kafka_receiver(receiver)
kafka_security('SASL_SSL', mechanism='SCRAM-SHA-512')
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_mtls():
+ """test mTLS client certificate authentication to Kafka"""
+ kafka_security('SSL', use_mtls=True)
+
+
@pytest.mark.http_test
def test_persistent_reload():
""" do a realm reload while we send notifications """
def test_kafka_batch_size_mismatch():
""" test that without rgw_kafka_max_batch_size, batched messages exceed the broker limit """
kafka_batch_size(match_batch_size=False)
-