From: sujay-d07 Date: Sun, 1 Feb 2026 22:57:26 +0000 (+0530) Subject: rgw/bucket_notification: enhance Kafka security testing X-Git-Tag: v21.0.0~181^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d98c6cc3f9f2686c4ab4afb4dc72dbae2e7edb0c;p=ceph.git rgw/bucket_notification: enhance Kafka security testing Add support for SCRAM-SHA-512, dynamic hostname/IP configuration for SSL certificates, and improved SASL mechanism handling in Kafka security tests. Changes: - kafka-security.sh: Support KAFKA_CERT_HOSTNAME and KAFKA_CERT_IP environment variables for dynamic certificate generation with proper SAN fields - test_bn.py: Add SCRAM user setup, support SASL_SSL/SASL_PLAINTEXT protocols, add SCRAM-SHA-512 test cases - README.rst: Update with step-by-step Kafka configuration instructions - requirements.txt: Replace nose-py3 with pynose, add requests>=2.28.0 Signed-off-by: sujay-d07 --- diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst index db58e8cfcc7b..8d94c79df7d1 100644 --- a/src/test/rgw/bucket_notification/README.rst +++ b/src/test/rgw/bucket_notification/README.rst @@ -63,20 +63,29 @@ After running `vstart.sh`, Zookeeper, and Kafka services you're ready to run the Kafka Security Tests -------------------- -First, make sure that vstart was initiated with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``:: +1. First generate SSL certificates:: - MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true" + cd /path/to/kafka/ + KAFKA_CERT_HOSTNAME=192.168.1.100 KAFKA_CERT_IP=192.168.1.100 bash /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh + + Replace ``192.168.1.100`` with your actual Kafka broker's IP address or hostname. -Then you should run the ``kafka-security.sh`` script inside the Kafka directory:: +2. Configure Kafka:: - cd /path/to/kafka/ - /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh + Then edit ``/path/to/kafka/config/server.properties`` with complete configuration:: -Then make sure the Kafka server properties file (``/path/to/kafka/config/server.properties``) has the following lines:: + **Important:** If you face any initialization failures, replace ``localhost`` in both ``listeners`` and ``advertised.listeners`` with your Kafka broker's actual hostname or IP address. + For example, if your Kafka broker runs on host ``kafka-server.example.com`` or IP ``192.168.1.100``, use:: + listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095 + advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095 - # all listeners + If both ``listeners`` and ``advertised.listeners`` do not match, the broker cannot connect to itself, causing initialization failures. + + # All listeners listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 + advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 + listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT # SSL configuration matching the kafka-security.sh script ssl.keystore.location=./server.keystore.jks @@ -86,41 +95,53 @@ Then make sure the Kafka server properties file (``/path/to/kafka/config/server. ssl.truststore.password=mypassword # SASL mechanisms - sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256 - - # SASL over SSL with SCRAM-SHA-256 mechanism - listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ - username="alice" \ - password="alice-secret" \ - user_alice="alice-secret"; + sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + sasl.mechanism.inter.broker.protocol=PLAIN + inter.broker.listener.name=PLAINTEXT # SASL over SSL with PLAIN mechanism listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ - username="alice" \ - password="alice-secret" \ - user_alice="alice-secret"; + username="admin" \ + password="admin-secret" \ + user_alice="alice-secret"; - # PLAINTEXT SASL with SCRAM-SHA-256 mechanism - listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ - username="alice" \ - password="alice-secret" \ - user_alice="alice-secret"; + # SASL over SSL with SCRAM-SHA-256 mechanism + listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="admin" \ + password="admin-secret"; + + # SASL over SSL with SCRAM-SHA-512 mechanism + listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="admin" \ + password="admin-secret"; # PLAINTEXT SASL with PLAIN mechanism listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ - username="alice" \ - password="alice-secret" \ - user_alice="alice-secret"; + username="admin" \ + password="admin-secret" \ + user_alice="alice-secret"; + + # PLAINTEXT SASL with SCRAM-SHA-256 mechanism + listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="admin" \ + password="admin-secret"; + # PLAINTEXT SASL with SCRAM-SHA-512 mechanism + listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="admin" \ + password="admin-secret"; -And restart the Kafka server. Once both Zookeeper and Kafka are up, run the following command (for the SASL SCRAM test) from the Kafka directory:: +3. Start Zookeeper and Kafka. - bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice +4. Start RGW vstart cluster with cleartext parameter set to true:: + cd /path/to/ceph/build + MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true" -To run the Kafka security test, you also need to provide the test with the location of the Kafka directory:: +5. Run the tests:: - KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test' + cd /path/to/ceph + KAFKA_DIR=/path/to/kafka BNTESTS_CONF=/path/to/bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test' ============== RabbitMQ Tests diff --git a/src/test/rgw/bucket_notification/kafka-security.sh b/src/test/rgw/bucket_notification/kafka-security.sh index 6c6f3e2614e0..020e784585d5 100755 --- a/src/test/rgw/bucket_notification/kafka-security.sh +++ b/src/test/rgw/bucket_notification/kafka-security.sh @@ -1,4 +1,5 @@ -FQDN=localhost +FQDN=${KAFKA_CERT_HOSTNAME:-localhost} +IP_SAN=${KAFKA_CERT_IP:-} KEYFILE=server.keystore.jks TRUSTFILE=server.truststore.jks CAFILE=y-ca.crt @@ -14,12 +15,18 @@ rm -f $CAFILE rm -f $REQFILE rm -f $CERTFILE -echo "########## create the request in key store '$KEYFILE'" +SAN_STRING="DNS:$FQDN" +if [ -n "$IP_SAN" ]; then + SAN_STRING="$SAN_STRING,IP:$IP_SAN" +fi + +echo "########## create the request in key store '$KEYFILE' with SAN=$SAN_STRING" keytool -keystore $KEYFILE -alias localhost \ -dname "CN=$FQDN, OU=Michigan Engineering, O=Red Hat Inc, \ L=Ann Arbor, ST=Michigan, C=US" \ -storepass $MYPW -keypass $MYPW \ - -validity $VALIDITY -genkey -keyalg RSA -ext SAN=DNS:"$FQDN" + -validity $VALIDITY -genkey -keyalg RSA \ + -ext SAN=$SAN_STRING echo "########## create the CA '$CAFILE'" openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \ @@ -34,10 +41,17 @@ echo "########## create a request '$REQFILE' for signing in key store '$KEYFILE' keytool -storepass $MYPW -keystore $KEYFILE \ -alias localhost -certreq -file $REQFILE -echo "########## sign and create certificate '$CERTFILE'" +echo "########## sign and create certificate '$CERTFILE' with SAN=$SAN_STRING" +EXTFILE=$(mktemp) +cat > $EXTFILE << EOF +subjectAltName=$SAN_STRING +EOF + openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \ -days $VALIDITY \ - -in $REQFILE -out $CERTFILE + -in $REQFILE -out $CERTFILE -extfile $EXTFILE + +rm -f $EXTFILE echo "########## store CA '$CAFILE' in key store '$KEYFILE'" keytool -storepass $MYPW -keystore $KEYFILE -alias CARoot \ diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt index 14885b11f16e..8a170924a32f 100644 --- a/src/test/rgw/bucket_notification/requirements.txt +++ b/src/test/rgw/bucket_notification/requirements.txt @@ -5,3 +5,4 @@ kafka-python >=2.0.0 pika cloudevents xmltodict +requests>=2.28.0 diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 3fd354e2898c..728a1072dd41 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -422,18 +422,95 @@ META_PREFIX = 'x-amz-meta-' # Kafka endpoint functions default_kafka_server = get_ip() +KAFKA_TEST_USER = 'alice' +KAFKA_TEST_PASSWORD = 'alice-secret' + +def setup_scram_users_via_kafka_configs(mechanism: str) -> None: + """to setup SCRAM users using kafka-configs.sh after Kafka is running.""" + if not mechanism.startswith('SCRAM'): + return + + log.info(f"Setting up {mechanism} users via kafka-configs.sh...") + + kafka_dir = os.environ.get('KAFKA_DIR') + if not kafka_dir: + log.warning("KAFKA_DIR not set, skipping SCRAM setup") + return + + kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh') + if not os.path.exists(kafka_configs): + log.warning(f"kafka-configs.sh not found at {kafka_configs}") + return + + scram_mechanism = 'SCRAM-SHA-512' if 'SHA-512' in mechanism else 'SCRAM-SHA-256' + zk_connect = 'localhost:2181' + + try: + # delete existing SCRAM credentials first + subprocess.run( + [kafka_configs, + '--zookeeper', zk_connect, + '--alter', + '--entity-type', 'users', + '--entity-name', KAFKA_TEST_USER, + '--delete-config', 'scram-sha-256,scram-sha-512'], + capture_output=True, + timeout=15, + check=False + ) + time.sleep(1) + + # adding SCRAM credentials + add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]' + result = subprocess.run( + [kafka_configs, + '--zookeeper', zk_connect, + '--alter', + '--entity-type', 'users', + '--entity-name', KAFKA_TEST_USER, + '--add-config', add_config_value], + capture_output=True, + text=True, + timeout=15, + check=False + ) + + if result.returncode == 0: + log.info(f"SCRAM user configured: {KAFKA_TEST_USER} ({scram_mechanism})") + else: + raise RuntimeError(f"Failed to create SCRAM user {KAFKA_TEST_USER} with {scram_mechanism}") + except Exception as e: + log.error(f"Failed to setup SCRAM users via kafka-configs: {e}") + raise + +def _kafka_ca_cert_path(): + kafka_dir = os.environ.get('KAFKA_DIR') + if kafka_dir: + ca_path = os.path.join(kafka_dir, 'y-ca.crt') + if os.path.exists(ca_path): + return ca_path + return None class KafkaReceiver(object): """class for receiving and storing messages on a topic from the kafka broker""" - def __init__(self, topic_name, security_type, kafka_server): + def __init__(self, topic_name, security_type, kafka_server, mechanism='PLAIN'): from kafka import KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic from kafka.errors import TopicAlreadyExistsError self.status = 'init' + ca_cert = _kafka_ca_cert_path() + base_config = {} port = 9092 - if security_type != 'PLAINTEXT': - security_type = 'SSL' + effective_protocol = 'PLAINTEXT' + if security_type == 'SSL': port = 9093 + effective_protocol = 'SSL' + elif security_type == 'SASL_SSL': + port = 9094 + effective_protocol = 'SASL_SSL' + elif security_type == 'SASL_PLAINTEXT': + port = 9095 + effective_protocol = 'SASL_PLAINTEXT' if kafka_server is None: endpoint = default_kafka_server + ":" + str(port) @@ -442,12 +519,40 @@ class KafkaReceiver(object): else: endpoint = kafka_server + base_config['bootstrap_servers'] = endpoint + if effective_protocol == 'SSL': + base_config['security_protocol'] = 'SSL' + if ca_cert: + base_config['ssl_cafile'] = ca_cert + kafka_dir = os.environ.get('KAFKA_DIR', '/opt/kafka') + client_cert = os.path.join(kafka_dir, 'config/client.crt') + client_key = os.path.join(kafka_dir, 'config/client.key') + if os.path.exists(client_cert) and os.path.exists(client_key): + base_config['ssl_certfile'] = client_cert + base_config['ssl_keyfile'] = client_key + elif effective_protocol == 'SASL_SSL': + base_config['security_protocol'] = 'SASL_SSL' + if ca_cert: + base_config['ssl_cafile'] = ca_cert + base_config['sasl_mechanism'] = mechanism + base_config.update({ + 'sasl_plain_username': KAFKA_TEST_USER, + 'sasl_plain_password': KAFKA_TEST_PASSWORD, + }) + elif effective_protocol == 'SASL_PLAINTEXT': + base_config['security_protocol'] = 'SASL_PLAINTEXT' + base_config['sasl_mechanism'] = mechanism + base_config.update({ + 'sasl_plain_username': KAFKA_TEST_USER, + 'sasl_plain_password': KAFKA_TEST_PASSWORD, + }) + remaining_retries = 10 while remaining_retries > 0: try: admin_client = KafkaAdminClient( - bootstrap_servers=endpoint, - request_timeout_ms=16000) + request_timeout_ms=16000, + **base_config) topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1) admin_client.create_topics([topic]) log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint) @@ -465,14 +570,15 @@ class KafkaReceiver(object): raise Exception('Kafka admin failed to create topic: %s. no retries left', topic_name) remaining_retries = 10 + consumer_config = dict(base_config) + consumer_config.update({ + 'metadata_max_age_ms': 5000, + 'consumer_timeout_ms': 5000, + 'auto_offset_reset': 'earliest' + }) while remaining_retries > 0: try: - self.consumer = KafkaConsumer(topic_name, - bootstrap_servers=endpoint, - security_protocol=security_type, - metadata_max_age_ms=5000, - consumer_timeout_ms=5000, - auto_offset_reset='earliest') + self.consumer = KafkaConsumer(topic_name, **consumer_config) log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint , topic_name) # This forces the consumer to fetch metadata immediately partitions = self.consumer.partitions_for_topic(topic) @@ -492,6 +598,7 @@ class KafkaReceiver(object): self.events = [] self.topic = topic_name self.stop = False + self.producer_config = base_config def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): """verify stored records agains a list of keys""" @@ -521,9 +628,9 @@ def kafka_receiver_thread_runner(receiver): receiver.status = 'ended' -def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None): +def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None, mechanism='PLAIN'): """create kafka receiver and thread""" - receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers) + receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers, mechanism=mechanism) task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,)) task.daemon = True return task, receiver @@ -542,8 +649,7 @@ def stop_kafka_receiver(receiver, task): def verify_kafka_receiver(receiver): """test the kafka receiver""" from kafka import KafkaProducer - producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'], - security_protocol=receiver.consumer.config['security_protocol']) + producer = KafkaProducer(**receiver.producer_config) producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8')) producer.flush() events = [] @@ -4495,6 +4601,11 @@ def test_topic_no_permissions(): def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False): """ test pushing kafka notification securly to master """ + # Setup SCRAM users if needed + if mechanism.startswith('SCRAM'): + setup_scram_users_via_kafka_configs(mechanism) + time.sleep(2) # Allow time for SCRAM config to propagate + conn = connection() zonegroup = get_config_zonegroup() # create bucket @@ -4529,7 +4640,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) # create consumer on the topic - task, receiver = create_kafka_receiver_thread(topic_name) + task, receiver = create_kafka_receiver_thread(topic_name, security_type=security_type, mechanism=mechanism) task.start() verify_kafka_receiver(receiver) @@ -4615,6 +4726,16 @@ def test_notification_kafka_security_sasl_scram(): kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256') +@pytest.mark.kafka_security_test +def test_notification_kafka_security_sasl_scram_512(): + kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-512') + + +@pytest.mark.kafka_security_test +def test_notification_kafka_security_ssl_sasl_scram_512(): + kafka_security('SASL_SSL', mechanism='SCRAM-SHA-512') + + @pytest.mark.http_test def test_persistent_reload(): """ do a realm reload while we send notifications """