]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/bucket_notification: enhance Kafka security testing
authorsujay-d07 <sujaydongre07@gmail.com>
Sun, 1 Feb 2026 22:57:26 +0000 (04:27 +0530)
committersujay-d07 <sujaydongre07@gmail.com>
Wed, 25 Feb 2026 19:54:33 +0000 (01:24 +0530)
Add support for SCRAM-SHA-512, dynamic hostname/IP configuration for SSL
certificates, and improved SASL mechanism handling in Kafka security tests.

Changes:
- kafka-security.sh: Support KAFKA_CERT_HOSTNAME and KAFKA_CERT_IP environment
  variables for dynamic certificate generation with proper SAN fields
- test_bn.py: Add SCRAM user setup, support SASL_SSL/SASL_PLAINTEXT protocols,
  add SCRAM-SHA-512 test cases
- README.rst: Update with step-by-step Kafka configuration instructions
- requirements.txt: Replace nose-py3 with pynose, add requests>=2.28.0

Signed-off-by: sujay-d07 <sujaydongre07@gmail.com>
src/test/rgw/bucket_notification/README.rst
src/test/rgw/bucket_notification/kafka-security.sh
src/test/rgw/bucket_notification/requirements.txt
src/test/rgw/bucket_notification/test_bn.py

index db58e8cfcc7b1ff8819729257cbaf4bc4dcbdf28..8d94c79df7d17f6e826ab727eaeffeb0af6fc1ed 100644 (file)
@@ -63,20 +63,29 @@ After running `vstart.sh`, Zookeeper, and Kafka services you're ready to run the
 Kafka Security Tests
 --------------------
 
-First, make sure that vstart was initiated with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``::
+1. First generate SSL certificates::
 
-        MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
+        cd /path/to/kafka/
+        KAFKA_CERT_HOSTNAME=192.168.1.100 KAFKA_CERT_IP=192.168.1.100 bash /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh
+   
+   Replace ``192.168.1.100`` with your actual Kafka broker's IP address or hostname.
 
-Then you should run the ``kafka-security.sh`` script inside the Kafka directory::
+2. Configure Kafka::
 
-        cd /path/to/kafka/
-        /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh
+   Edit ``/path/to/kafka/config/server.properties`` with the complete configuration below::
 
-Then make sure the Kafka server properties file (``/path/to/kafka/config/server.properties``) has the following lines::
+   **Important:** If you face any initialization failures, replace ``localhost`` in both ``listeners`` and ``advertised.listeners`` with your Kafka broker's actual hostname or IP address.
+   For example, if your Kafka broker runs on host ``kafka-server.example.com`` or IP ``192.168.1.100``, use::
 
+   listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
+   advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
 
-        # all listeners
+   If ``listeners`` and ``advertised.listeners`` do not match, the broker cannot connect to itself, causing initialization failures.
+
+        # All listeners
         listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+        advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
 
         # SSL configuration matching the kafka-security.sh script
         ssl.keystore.location=./server.keystore.jks
@@ -86,41 +95,53 @@ Then make sure the Kafka server properties file (``/path/to/kafka/config/server.
         ssl.truststore.password=mypassword
 
         # SASL mechanisms
-        sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
-
-        # SASL over SSL with SCRAM-SHA-256 mechanism
-        listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
-          username="alice" \
-          password="alice-secret" \
-          user_alice="alice-secret";
+        sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+        sasl.mechanism.inter.broker.protocol=PLAIN
+        inter.broker.listener.name=PLAINTEXT
 
         # SASL over SSL with PLAIN mechanism
         listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
-          username="alice" \
-          password="alice-secret" \
-          user_alice="alice-secret";
+        username="admin" \
+        password="admin-secret" \
+        user_alice="alice-secret";
 
-        # PLAINTEXT SASL with SCRAM-SHA-256 mechanism
-        listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
-          username="alice" \
-          password="alice-secret" \
-          user_alice="alice-secret";
+        # SASL over SSL with SCRAM-SHA-256 mechanism
+        listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username="admin" \
+        password="admin-secret";
+
+        # SASL over SSL with SCRAM-SHA-512 mechanism
+        listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username="admin" \
+        password="admin-secret";
 
         # PLAINTEXT SASL with PLAIN mechanism
         listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
-          username="alice" \
-          password="alice-secret" \
-          user_alice="alice-secret";
+        username="admin" \
+        password="admin-secret" \
+        user_alice="alice-secret";
+
+        # PLAINTEXT SASL with SCRAM-SHA-256 mechanism
+        listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username="admin" \
+        password="admin-secret";
 
+        # PLAINTEXT SASL with SCRAM-SHA-512 mechanism
+        listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username="admin" \
+        password="admin-secret";
 
-And restart the Kafka server. Once both Zookeeper and Kafka are up, run the following command (for the SASL SCRAM test) from the Kafka directory::
+3. Start Zookeeper and Kafka.
 
-        bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
+4. Start the RGW vstart cluster with the cleartext-secrets parameter set to ``true``::
 
+        cd /path/to/ceph/build
+        MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
 
-To run the Kafka security test, you also need to provide the test with the location of the Kafka directory::
+5. Run the tests::
 
-        KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test'
+        cd /path/to/ceph
+        KAFKA_DIR=/path/to/kafka BNTESTS_CONF=/path/to/bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test'
 
 ==============
 RabbitMQ Tests
index 6c6f3e2614e04798a5d8e435628defb5d32bf1c5..020e784585d55f304d9215abd1aa03c2a7c25279 100755 (executable)
@@ -1,4 +1,5 @@
-FQDN=localhost
+FQDN=${KAFKA_CERT_HOSTNAME:-localhost}
+IP_SAN=${KAFKA_CERT_IP:-}
 KEYFILE=server.keystore.jks
 TRUSTFILE=server.truststore.jks
 CAFILE=y-ca.crt
@@ -14,12 +15,18 @@ rm -f $CAFILE
 rm -f $REQFILE
 rm -f $CERTFILE
 
-echo "########## create the request in key store '$KEYFILE'"
+SAN_STRING="DNS:$FQDN"
+if [ -n "$IP_SAN" ]; then
+  SAN_STRING="$SAN_STRING,IP:$IP_SAN"
+fi
+
+echo "########## create the request in key store '$KEYFILE' with SAN=$SAN_STRING"
 keytool -keystore $KEYFILE -alias localhost \
   -dname "CN=$FQDN, OU=Michigan Engineering, O=Red Hat Inc, \
   L=Ann Arbor, ST=Michigan, C=US" \
   -storepass $MYPW -keypass $MYPW \
-  -validity $VALIDITY -genkey -keyalg RSA -ext SAN=DNS:"$FQDN"
+  -validity $VALIDITY -genkey -keyalg RSA \
+  -ext SAN=$SAN_STRING
 
 echo "########## create the CA '$CAFILE'"
 openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \
@@ -34,10 +41,17 @@ echo "########## create a request '$REQFILE' for signing in key store '$KEYFILE'
 keytool -storepass $MYPW -keystore $KEYFILE \
   -alias localhost -certreq -file $REQFILE
 
-echo "########## sign and create certificate '$CERTFILE'"
+echo "########## sign and create certificate '$CERTFILE' with SAN=$SAN_STRING"
+EXTFILE=$(mktemp)
+cat > $EXTFILE << EOF
+subjectAltName=$SAN_STRING
+EOF
+
 openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \
   -days $VALIDITY \
-  -in $REQFILE -out $CERTFILE
+  -in $REQFILE -out $CERTFILE -extfile $EXTFILE
+
+rm -f $EXTFILE
 
 echo "########## store CA '$CAFILE' in key store '$KEYFILE'"
 keytool -storepass $MYPW -keystore $KEYFILE -alias CARoot \
index 14885b11f16e8e3bba21535399b35647b9792520..8a170924a32f539b96c0a1addd252cfed723dba5 100644 (file)
@@ -5,3 +5,4 @@ kafka-python >=2.0.0
 pika
 cloudevents
 xmltodict
+requests>=2.28.0
index 3fd354e2898c33acf5f53f83f21bd8fddbd2a775..728a1072dd4146e280a73b9795267100608358e7 100644 (file)
@@ -422,18 +422,95 @@ META_PREFIX = 'x-amz-meta-'
 # Kafka endpoint functions
 
 default_kafka_server = get_ip()
+KAFKA_TEST_USER = 'alice'
+KAFKA_TEST_PASSWORD = 'alice-secret'
+
+def setup_scram_users_via_kafka_configs(mechanism: str) -> None:
+    """to setup SCRAM users using kafka-configs.sh after Kafka is running."""
+    if not mechanism.startswith('SCRAM'):
+        return
+
+    log.info(f"Setting up {mechanism} users via kafka-configs.sh...")
+
+    kafka_dir = os.environ.get('KAFKA_DIR')
+    if not kafka_dir:
+        log.warning("KAFKA_DIR not set, skipping SCRAM setup")
+        return
+    
+    kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+    if not os.path.exists(kafka_configs):
+        log.warning(f"kafka-configs.sh not found at {kafka_configs}")
+        return
+    
+    scram_mechanism = 'SCRAM-SHA-512' if 'SHA-512' in mechanism else 'SCRAM-SHA-256'
+    zk_connect = 'localhost:2181'
+    
+    try:
+        # delete existing SCRAM credentials first
+        subprocess.run(
+            [kafka_configs,
+             '--zookeeper', zk_connect,
+             '--alter',
+             '--entity-type', 'users',
+             '--entity-name', KAFKA_TEST_USER,
+             '--delete-config', 'scram-sha-256,scram-sha-512'],
+            capture_output=True,
+            timeout=15,
+            check=False
+        )
+        time.sleep(1)
+        
+        # adding SCRAM credentials
+        add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
+        result = subprocess.run(
+            [kafka_configs,
+             '--zookeeper', zk_connect,
+             '--alter',
+             '--entity-type', 'users',
+             '--entity-name', KAFKA_TEST_USER,
+             '--add-config', add_config_value],
+            capture_output=True,
+            text=True,
+            timeout=15,
+            check=False
+        )
+        
+        if result.returncode == 0:
+            log.info(f"SCRAM user configured: {KAFKA_TEST_USER} ({scram_mechanism})")
+        else:
+            raise RuntimeError(f"Failed to create SCRAM user {KAFKA_TEST_USER} with {scram_mechanism}")
+    except Exception as e:
+        log.error(f"Failed to setup SCRAM users via kafka-configs: {e}")
+        raise
+
+def _kafka_ca_cert_path():
+    kafka_dir = os.environ.get('KAFKA_DIR')
+    if kafka_dir:
+        ca_path = os.path.join(kafka_dir, 'y-ca.crt')
+        if os.path.exists(ca_path):
+            return ca_path
+    return None
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic_name, security_type, kafka_server):
+    def __init__(self, topic_name, security_type, kafka_server, mechanism='PLAIN'):
         from kafka import KafkaConsumer
         from kafka.admin import KafkaAdminClient, NewTopic
         from kafka.errors import TopicAlreadyExistsError
         self.status = 'init'
+        ca_cert = _kafka_ca_cert_path()
+        base_config = {}
         port = 9092
-        if security_type != 'PLAINTEXT':
-            security_type = 'SSL'
+        effective_protocol = 'PLAINTEXT'
+        if security_type == 'SSL':
             port = 9093
+            effective_protocol = 'SSL'
+        elif security_type == 'SASL_SSL':
+            port = 9094
+            effective_protocol = 'SASL_SSL'
+        elif security_type == 'SASL_PLAINTEXT':
+            port = 9095
+            effective_protocol = 'SASL_PLAINTEXT'
 
         if kafka_server is None:
             endpoint = default_kafka_server + ":" + str(port)
@@ -442,12 +519,40 @@ class KafkaReceiver(object):
         else:
             endpoint = kafka_server
 
+        base_config['bootstrap_servers'] = endpoint
+        if effective_protocol == 'SSL':
+            base_config['security_protocol'] = 'SSL'
+            if ca_cert:
+                base_config['ssl_cafile'] = ca_cert
+            kafka_dir = os.environ.get('KAFKA_DIR', '/opt/kafka')
+            client_cert = os.path.join(kafka_dir, 'config/client.crt')
+            client_key = os.path.join(kafka_dir, 'config/client.key')
+            if os.path.exists(client_cert) and os.path.exists(client_key):
+                base_config['ssl_certfile'] = client_cert
+                base_config['ssl_keyfile'] = client_key
+        elif effective_protocol == 'SASL_SSL':
+            base_config['security_protocol'] = 'SASL_SSL'
+            if ca_cert:
+                base_config['ssl_cafile'] = ca_cert
+            base_config['sasl_mechanism'] = mechanism
+            base_config.update({
+                'sasl_plain_username': KAFKA_TEST_USER,
+                'sasl_plain_password': KAFKA_TEST_PASSWORD,
+            })
+        elif effective_protocol == 'SASL_PLAINTEXT':
+            base_config['security_protocol'] = 'SASL_PLAINTEXT'
+            base_config['sasl_mechanism'] = mechanism
+            base_config.update({
+                'sasl_plain_username': KAFKA_TEST_USER,
+                'sasl_plain_password': KAFKA_TEST_PASSWORD,
+            })
+
         remaining_retries = 10
         while remaining_retries > 0:
             try:
                 admin_client = KafkaAdminClient(
-                        bootstrap_servers=endpoint,
-                        request_timeout_ms=16000)
+                        request_timeout_ms=16000,
+                        **base_config)
                 topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
                 admin_client.create_topics([topic])
                 log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint)
@@ -465,14 +570,15 @@ class KafkaReceiver(object):
             raise Exception('Kafka admin failed to create topic: %s. no retries left', topic_name)
 
         remaining_retries = 10
+        consumer_config = dict(base_config)
+        consumer_config.update({
+            'metadata_max_age_ms': 5000,
+            'consumer_timeout_ms': 5000,
+            'auto_offset_reset': 'earliest'
+        })
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic_name,
-                        bootstrap_servers=endpoint,
-                        security_protocol=security_type,
-                        metadata_max_age_ms=5000,
-                        consumer_timeout_ms=5000,
-                        auto_offset_reset='earliest')
+                self.consumer = KafkaConsumer(topic_name, **consumer_config)
                 log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint , topic_name)
                 # This forces the consumer to fetch metadata immediately
                 partitions = self.consumer.partitions_for_topic(topic)
@@ -492,6 +598,7 @@ class KafkaReceiver(object):
         self.events = []
         self.topic = topic_name
         self.stop = False
+        self.producer_config = base_config
 
     def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
         """verify stored records agains a list of keys"""
@@ -521,9 +628,9 @@ def kafka_receiver_thread_runner(receiver):
     receiver.status = 'ended'
 
 
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None, mechanism='PLAIN'):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
+    receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers, mechanism=mechanism)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -542,8 +649,7 @@ def stop_kafka_receiver(receiver, task):
 def verify_kafka_receiver(receiver):
     """test the kafka receiver"""
     from kafka import KafkaProducer
-    producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'],
-                             security_protocol=receiver.consumer.config['security_protocol'])
+    producer = KafkaProducer(**receiver.producer_config)
     producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8'))
     producer.flush()
     events = []
@@ -4495,6 +4601,11 @@ def test_topic_no_permissions():
 
 def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
     """ test pushing kafka notification securly to master """
+    # Setup SCRAM users if needed
+    if mechanism.startswith('SCRAM'):
+        setup_scram_users_via_kafka_configs(mechanism)
+        time.sleep(2)  # Allow time for SCRAM config to propagate
+    
     conn = connection()
     zonegroup = get_config_zonegroup()
     # create bucket
@@ -4529,7 +4640,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
 
     # create consumer on the topic
-    task, receiver = create_kafka_receiver_thread(topic_name)
+    task, receiver = create_kafka_receiver_thread(topic_name, security_type=security_type, mechanism=mechanism)
     task.start()
     verify_kafka_receiver(receiver)
 
@@ -4615,6 +4726,16 @@ def test_notification_kafka_security_sasl_scram():
     kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
 
 
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_sasl_scram_512():
+    kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-512')
+
+
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_sasl_scram_512():
+    kafka_security('SASL_SSL', mechanism='SCRAM-SHA-512')
+
+
 @pytest.mark.http_test
 def test_persistent_reload():
     """ do a realm reload while we send notifications """