]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/tasks: add Kafka SSL and SASL security testing support 67158/head
authorsujay-d07 <sujaydongre07@gmail.com>
Sat, 21 Feb 2026 11:16:07 +0000 (16:46 +0530)
committersujay-d07 <sujaydongre07@gmail.com>
Wed, 25 Feb 2026 19:54:33 +0000 (01:24 +0530)
Add comprehensive SSL and SASL authentication support for Kafka bucket
notification tests, including SCRAM-SHA-256/512 mechanisms and multiple
listener protocols.

Signed-off-by: sujay-d07 <sujaydongre07@gmail.com>
qa/suites/rgw/notifications/overrides.yaml
qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml
qa/tasks/kafka.py
qa/tasks/notification_tests.py

index ee16ab9457348e06dd243f67b7b2ccd3a1b7b5e6..b5d6e94ba6cd572e13c51466cffe929ecf1becd4 100644 (file)
@@ -10,6 +10,7 @@ overrides:
         rgw crypt s3 kms backend: testing
         rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo=
         rgw crypt require ssl: false
+        rgw allow notification secrets in cleartext: true
   rgw:
     realm: MyRealm
     zonegroup: MyZoneGroup
index 4407cd3eaccc3858de213c749cd3b7579c644935..d477f0954507690e06577e6bf0f7e0533176ea6a 100644 (file)
@@ -4,5 +4,5 @@ tasks:
       kafka_version: 3.8.1
 - notification-tests:
     client.0:
-      extra_attr: ["kafka_test"]
+      extra_attr: ["kafka_test", "kafka_security_test"]
       rgw_server: client.0
index 833f03babf6907f678a14124de4ff369bf41847a..c87cd06c1c87f4cb4d5143d578f54ac064642c54 100644 (file)
@@ -58,6 +58,38 @@ def install_kafka(ctx, config):
             args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
         )
 
+        kafka_dir = get_kafka_dir(ctx, config)
+
+        script_src = os.path.join(
+            os.path.dirname(__file__),
+            '..', '..', 'src', 'test', 'rgw', 'bucket_notification', 'kafka-security.sh',
+        )
+        with open(script_src, 'r') as f:
+            kafka_security_script = f.read()
+
+        script_path = '{tdir}/kafka-security.sh'.format(tdir=kafka_dir)
+        remote.write_file(
+            path=script_path,
+            data=kafka_security_script.encode(),
+        )
+
+        # running kafka-security.sh from the kafka dir so certs are generated there
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', kafka_dir, run.Raw('&&'),
+                'env',
+                'KAFKA_CERT_HOSTNAME={ip}'.format(ip=remote.ip_address),
+                'KAFKA_CERT_IP={ip}'.format(ip=remote.ip_address),
+                'bash', script_path,
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['sudo', 'chmod', 'o+rx', '/home/ubuntu'],
+        )
+
+        broker_conf(ctx, client, kafka_dir)
+
     try:
         yield
     finally:
@@ -78,6 +110,77 @@ def install_kafka(ctx, config):
             )
 
 
+def broker_conf(ctx, client, kafka_dir):
+    """writing a custom server.properties config file"""
+    (remote,) = ctx.cluster.only(client).remotes.keys()
+    ip = remote.ip_address
+    conf = (
+        "broker.id=0\n"
+        "listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_SSL://0.0.0.0:9094,SASL_PLAINTEXT://0.0.0.0:9095\n"
+        "advertised.listeners=PLAINTEXT://{ip}:9092,SSL://{ip}:9093,SASL_SSL://{ip}:9094,SASL_PLAINTEXT://{ip}:9095\n"
+        "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT\n"
+        "log.dirs={tdir}/data/kafka-logs\n"
+        "num.network.threads=3\n"
+        "num.io.threads=8\n"
+        "socket.send.buffer.bytes=102400\n"
+        "socket.receive.buffer.bytes=102400\n"
+        "socket.request.max.bytes=104857600\n"
+        "num.partitions=1\n"
+        "num.recovery.threads.per.data.dir=1\n"
+        "offsets.topic.replication.factor=1\n"
+        "transaction.state.log.replication.factor=1\n"
+        "transaction.state.log.min.isr=1\n"
+        "log.retention.hours=168\n"
+        "log.segment.bytes=1073741824\n"
+        "log.retention.check.interval.ms=300000\n"
+        "zookeeper.connect=localhost:2181\n"
+        "zookeeper.connection.timeout.ms=18000\n"
+        "group.initial.rebalance.delay.ms=0\n"
+        "ssl.keystore.location={tdir}/server.keystore.jks\n"
+        "ssl.keystore.password=mypassword\n"
+        "ssl.key.password=mypassword\n"
+        "ssl.truststore.location={tdir}/server.truststore.jks\n"
+        "ssl.truststore.password=mypassword\n"
+        "sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512\n"
+        "sasl.mechanism.inter.broker.protocol=PLAIN\n"
+        "inter.broker.listener.name=PLAINTEXT\n"
+        'listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret" \\\n'
+        '  user_alice="alice-secret";\n'
+        'listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret";\n'
+        'listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret";\n'
+        'listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret" \\\n'
+        '  user_alice="alice-secret";\n'
+        'listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret";\n'
+        'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\\n'
+        '  username="admin" \\\n'
+        '  password="admin-secret";\n'
+    ).format(tdir=kafka_dir, ip=ip)
+    file_name = 'server.properties'
+    log.info("kafka conf file: %s", file_name)
+    log.info(conf)
+    ctx.cluster.only(client).run(
+        args=[
+            'cd', kafka_dir, run.Raw('&&'),
+            'mkdir', '-p', 'config', run.Raw('&&'),
+            'mkdir', '-p', 'data',
+        ],
+    )
+    remote.write_file(
+        path='{tdir}/config/{file_name}'.format(tdir=kafka_dir, file_name=file_name),
+        data=conf.encode(),
+    )
+
+
 @contextlib.contextmanager
 def run_kafka(ctx,config):
     """
@@ -117,7 +220,7 @@ def run_kafka(ctx,config):
             ctx.cluster.only(client).run(
                 args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                  './kafka-server-stop.sh',  
-                 '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+                 '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
                 ],
             )
 
@@ -149,7 +252,7 @@ def run_admin_cmds(ctx,config):
             args=[
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
                 './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
-                '--bootstrap-server', 'localhost:9092'
+                '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address)
             ],
         )
 
@@ -158,7 +261,7 @@ def run_admin_cmds(ctx,config):
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 'echo', "First", run.Raw('|'),
                 './kafka-console-producer.sh', '--topic', 'quickstart-events',
-                '--bootstrap-server', 'localhost:9092'
+                '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address)
             ],
         )
 
@@ -167,7 +270,7 @@ def run_admin_cmds(ctx,config):
                 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
                 './kafka-console-consumer.sh', '--topic', 'quickstart-events',
                 '--from-beginning',
-                '--bootstrap-server', 'localhost:9092',
+                '--bootstrap-server', '{ip}:9092'.format(ip=remote.ip_address),
                 run.Raw('&'), 'exit'
             ],
         )
@@ -198,6 +301,8 @@ def task(ctx,config):
     if isinstance(config, list):
         config = dict.fromkeys(config)
 
+    ctx.kafka_dir = get_kafka_dir(ctx, config)
+
     log.debug('Kafka config is %s', config)
 
     with contextutil.nested(
index a4b0fab97ad5a3386796d7c34fc403c5ef8282fd..cd1b4752c3bc956c27f2ee113081e825f4f48611 100644 (file)
@@ -227,13 +227,20 @@ def run_tests(ctx, config):
 
         args = [
             'BNTESTS_CONF={tdir}/ceph/src/test/rgw/bucket_notification/bn-tests.{client}.conf'.format(tdir=testdir, client=client),
+            ]
+
+        kafka_dir = getattr(ctx, 'kafka_dir', None)
+        if kafka_dir:
+            args.append('KAFKA_DIR={kafka_dir}'.format(kafka_dir=kafka_dir))
+
+        args.extend([
             '{tdir}/ceph/src/test/rgw/bucket_notification/virtualenv/bin/python'.format(tdir=testdir),
             '-m', 'pytest',
             '-s',
             '{tdir}/ceph/src/test/rgw/bucket_notification/test_bn.py'.format(tdir=testdir),
             '-v',
             '-m', ' or '.join(markers),
-            ]
+            ])
 
         remote.run(
             args=args,