From: sujay-d07 Date: Sat, 21 Feb 2026 11:16:07 +0000 (+0530) Subject: qa/tasks: add Kafka SSL and SASL security testing support X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=06fda9d94549fe8bee1b500d3054754f6d307fdc;p=ceph.git qa/tasks: add Kafka SSL and SASL security testing support Add comprehensive SSL and SASL authentication support for Kafka bucket notification tests, including SCRAM-SHA-256/512 mechanisms and multiple listener protocols. Signed-off-by: sujay-d07 --- diff --git a/qa/suites/rgw/notifications/overrides.yaml b/qa/suites/rgw/notifications/overrides.yaml index ee16ab94573..b5d6e94ba6c 100644 --- a/qa/suites/rgw/notifications/overrides.yaml +++ b/qa/suites/rgw/notifications/overrides.yaml @@ -10,6 +10,7 @@ overrides: rgw crypt s3 kms backend: testing rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo= rgw crypt require ssl: false + rgw allow notification secrets in cleartext: true rgw: realm: MyRealm zonegroup: MyZoneGroup diff --git a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml index 4407cd3eacc..d477f095450 100644 --- a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml +++ b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml @@ -4,5 +4,5 @@ tasks: kafka_version: 3.8.1 - notification-tests: client.0: - extra_attr: ["kafka_test"] + extra_attr: ["kafka_test", "kafka_security_test"] rgw_server: client.0 diff --git a/qa/tasks/kafka.py b/qa/tasks/kafka.py index 833f03babf6..c87cd06c1c8 100644 --- a/qa/tasks/kafka.py +++ b/qa/tasks/kafka.py @@ -58,6 +58,38 @@ def install_kafka(ctx, config): args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file], ) + kafka_dir = get_kafka_dir(ctx, config) + + script_src = os.path.join( + os.path.dirname(__file__), + '..', '..', 'src', 'test', 'rgw', 'bucket_notification', 'kafka-security.sh', + ) + with open(script_src, 'r') as f: + kafka_security_script = f.read() + + script_path = '{tdir}/kafka-security.sh'.format(tdir=kafka_dir) + remote.write_file( + path=script_path, + data=kafka_security_script.encode(), + ) + + # running kafka-security.sh from the kafka dir so certs are generated there + ctx.cluster.only(client).run( + args=[ + 'cd', kafka_dir, run.Raw('&&'), + 'env', + 'KAFKA_CERT_HOSTNAME={ip}'.format(ip=remote.ip_address), + 'KAFKA_CERT_IP={ip}'.format(ip=remote.ip_address), + 'bash', script_path, + ], + ) + + ctx.cluster.only(client).run( + args=['sudo', 'chmod', 'o+rx', '/home/ubuntu'], + ) + + broker_conf(ctx, client, kafka_dir) + try: yield finally: @@ -78,6 +110,77 @@ def install_kafka(ctx, config): ) +def broker_conf(ctx, client, kafka_dir): + """writing a custom server.properties config file""" + (remote,) = ctx.cluster.only(client).remotes.keys() + ip = remote.ip_address + conf = ( + "broker.id=0\n" + "listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9095\n" + "advertised.listeners=PLAINTEXT://{ip}:9092,SSL://{ip}:9093,SASL_SSL://{ip}:9094,SASL_PLAINTEXT://{ip}:9095\n" + "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT\n" + "log.dirs={tdir}/data/kafka-logs\n" + "num.network.threads=3\n" + "num.io.threads=8\n" + "socket.send.buffer.bytes=102400\n" + "socket.receive.buffer.bytes=102400\n" + "socket.request.max.bytes=104857600\n" + "num.partitions=1\n" + "num.recovery.threads.per.data.dir=1\n" + "offsets.topic.replication.factor=1\n" + "transaction.state.log.replication.factor=1\n" + "transaction.state.log.min.isr=1\n" + "log.retention.hours=168\n" + "log.segment.bytes=1073741824\n" + "log.retention.check.interval.ms=300000\n" + "zookeeper.connect=localhost:2181\n" + "zookeeper.connection.timeout.ms=18000\n" + "group.initial.rebalance.delay.ms=0\n" + "ssl.keystore.location={tdir}/server.keystore.jks\n" + "ssl.keystore.password=mypassword\n" + "ssl.key.password=mypassword\n" + "ssl.truststore.location={tdir}/server.truststore.jks\n" + "ssl.truststore.password=mypassword\n" + "sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512\n" + "sasl.mechanism.inter.broker.protocol=PLAIN\n" + "inter.broker.listener.name=PLAINTEXT\n" + 'listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret" \\\n' + ' user_alice="alice-secret";\n' + 'listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret";\n' + 'listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret";\n' + 'listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret" \\\n' + ' user_alice="alice-secret";\n' + 'listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret";\n' + 'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n' + ' username="admin" \\\n' + ' password="admin-secret";\n' + ).format(tdir=kafka_dir, ip=ip) + file_name = 'server.properties' + log.info("kafka conf file: %s", file_name) + log.info(conf) + ctx.cluster.only(client).run( + args=[ + 'cd', kafka_dir, run.Raw('&&'), + 'mkdir', '-p', 'config', run.Raw('&&'), + 'mkdir', '-p', 'data', + ], + ) + remote.write_file( + path='{tdir}/config/{file_name}'.format(tdir=kafka_dir, file_name=file_name), + data=conf.encode(), + ) + + @contextlib.contextmanager def run_kafka(ctx,config): """ @@ -117,7 +220,7 @@ def run_kafka(ctx,config): ctx.cluster.only(client).run( args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './kafka-server-stop.sh', - '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)), + '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)), ], ) @@ -149,7 +252,7 @@ def run_admin_cmds(ctx,config): args=[ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './kafka-topics.sh', '--create', '--topic', 'quickstart-events', - '--bootstrap-server', 'localhost:9092' + '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address) ], ) @@ -158,7 +261,7 @@ def run_admin_cmds(ctx,config): 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 'echo', "First", run.Raw('|'), './kafka-console-producer.sh', '--topic', 'quickstart-events', - '--bootstrap-server', 'localhost:9092' + '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address) ], ) @@ -167,7 +270,7 @@ def run_admin_cmds(ctx,config): 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './kafka-console-consumer.sh', '--topic', 'quickstart-events', '--from-beginning', - '--bootstrap-server', 'localhost:9092', + '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address), run.Raw('&'), 'exit' ], ) @@ -198,6 +301,8 @@ def task(ctx,config): if isinstance(config, list): config = dict.fromkeys(config) + ctx.kafka_dir = get_kafka_dir(ctx, config) + log.debug('Kafka config is %s', config) with contextutil.nested( diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index a4b0fab97ad..cd1b4752c3b 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -227,13 +227,20 @@ def run_tests(ctx, config): args = [ 'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client), + ] + + kafka_dir = getattr(ctx, 'kafka_dir', None) + if kafka_dir: + args.append('KAFKA_DIR={kafka_dir}'.format(kafka_dir=kafka_dir)) + + args.extend([ '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir), '-m', 'pytest', '-s', '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir), '-v', '-m', ' or '.join(markers), - ] + ]) remote.run( args=args,