Kafka Security Tests
--------------------
-First, make sure that vstart was initiated with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``::
+1. Generate SSL certificates::
- MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
+ cd /path/to/kafka/
+ KAFKA_CERT_HOSTNAME=192.168.1.100 KAFKA_CERT_IP=192.168.1.100 bash /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh
+
+ Replace ``192.168.1.100`` with your actual Kafka broker's IP address or hostname.
-Then you should run the ``kafka-security.sh`` script inside the Kafka directory::
+2. Configure Kafka.
- cd /path/to/kafka/
- /path/to/ceph/src/test/rgw/bucket_notification/kafka-security.sh
+ Edit ``/path/to/kafka/config/server.properties`` as described below.
-Then make sure the Kafka server properties file (``/path/to/kafka/config/server.properties``) has the following lines::
+ **Important:** If you encounter initialization failures, replace ``localhost`` in both ``listeners`` and ``advertised.listeners`` with your Kafka broker's actual hostname or IP address.
+ For example, if your Kafka broker runs on host ``kafka-server.example.com`` or IP ``192.168.1.100``, use::
+ listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
+ advertised.listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093,SASL_SSL://192.168.1.100:9094,SASL_PLAINTEXT://192.168.1.100:9095
- # all listeners
+ If ``listeners`` and ``advertised.listeners`` do not match, the broker cannot connect to itself and initialization fails.
+
+ The complete configuration is::
+
+ # All listeners
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+ advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+ listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
# SSL configuration matching the kafka-security.sh script
ssl.keystore.location=./server.keystore.jks
ssl.truststore.password=mypassword
# SASL mechanisms
- sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
-
- # SASL over SSL with SCRAM-SHA-256 mechanism
- listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
- username="alice" \
- password="alice-secret" \
- user_alice="alice-secret";
+ sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+ sasl.mechanism.inter.broker.protocol=PLAIN
+ inter.broker.listener.name=PLAINTEXT
# SASL over SSL with PLAIN mechanism
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
- username="alice" \
- password="alice-secret" \
- user_alice="alice-secret";
+ username="admin" \
+ password="admin-secret" \
+ user_alice="alice-secret";
- # PLAINTEXT SASL with SCRAM-SHA-256 mechanism
- listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
- username="alice" \
- password="alice-secret" \
- user_alice="alice-secret";
+ # SASL over SSL with SCRAM-SHA-256 mechanism
+ listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
+
+ # SASL over SSL with SCRAM-SHA-512 mechanism
+ listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
# PLAINTEXT SASL with PLAIN mechanism
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
- username="alice" \
- password="alice-secret" \
- user_alice="alice-secret";
+ username="admin" \
+ password="admin-secret" \
+ user_alice="alice-secret";
+
+ # PLAINTEXT SASL with SCRAM-SHA-256 mechanism
+ listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
+ # PLAINTEXT SASL with SCRAM-SHA-512 mechanism
+ listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
-And restart the Kafka server. Once both Zookeeper and Kafka are up, run the following command (for the SASL SCRAM test) from the Kafka directory::
+3. Start Zookeeper and Kafka. A short connectivity check for the secured listeners is sketched after step 5.
- bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
+4. Start a vstart cluster with ``rgw_allow_notification_secrets_in_cleartext`` set to ``true``::
+ cd /path/to/ceph/build
+ MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
-To run the Kafka security test, you also need to provide the test with the location of the Kafka directory::
+5. Run the tests::
- KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test'
+ cd /path/to/ceph
+ KAFKA_DIR=/path/to/kafka BNTESTS_CONF=/path/to/bntests.conf python -m pytest -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -m 'kafka_security_test'
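+
+If the tests cannot connect, the secured listeners can be checked directly with a short
+``kafka-python`` snippet. This is only a sketch: it assumes the broker address used above,
+the CA certificate ``y-ca.crt`` that the tests look for in the Kafka directory, and that
+the SCRAM user ``alice`` already exists (the tests create it automatically via
+``kafka-configs.sh``)::
+
+    from kafka import KafkaConsumer
+
+    BROKER = '192.168.1.100'               # adjust to the broker address in server.properties
+    CA_CERT = '/path/to/kafka/y-ca.crt'    # CA certificate generated by kafka-security.sh
+
+    # SASL over plaintext (port 9095) with SCRAM-SHA-256
+    KafkaConsumer(bootstrap_servers=BROKER + ':9095',
+                  security_protocol='SASL_PLAINTEXT',
+                  sasl_mechanism='SCRAM-SHA-256',
+                  sasl_plain_username='alice',
+                  sasl_plain_password='alice-secret').topics()
+
+    # SASL over SSL (port 9094) with SCRAM-SHA-256
+    KafkaConsumer(bootstrap_servers=BROKER + ':9094',
+                  security_protocol='SASL_SSL',
+                  ssl_cafile=CA_CERT,
+                  sasl_mechanism='SCRAM-SHA-256',
+                  sasl_plain_username='alice',
+                  sasl_plain_password='alice-secret').topics()
+
+    print('secured listeners are reachable')
+
+If either call raises an authentication or SSL error, fix the broker configuration before
+running the test suite.
+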
==============
RabbitMQ Tests
# Kafka endpoint functions
default_kafka_server = get_ip()
+KAFKA_TEST_USER = 'alice'
+KAFKA_TEST_PASSWORD = 'alice-secret'
+
+def setup_scram_users_via_kafka_configs(mechanism: str) -> None:
+ """Set up the SCRAM test user via kafka-configs.sh after Kafka is running."""
+ if not mechanism.startswith('SCRAM'):
+ return
+
+ log.info(f"Setting up {mechanism} users via kafka-configs.sh...")
+
+ kafka_dir = os.environ.get('KAFKA_DIR')
+ if not kafka_dir:
+ log.warning("KAFKA_DIR not set, skipping SCRAM setup")
+ return
+
+ kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+ if not os.path.exists(kafka_configs):
+ log.warning(f"kafka-configs.sh not found at {kafka_configs}")
+ return
+
+ scram_mechanism = 'SCRAM-SHA-512' if 'SHA-512' in mechanism else 'SCRAM-SHA-256'
+ zk_connect = 'localhost:2181'
+
+ try:
+ # delete existing SCRAM credentials first
+ subprocess.run(
+ [kafka_configs,
+ '--zookeeper', zk_connect,
+ '--alter',
+ '--entity-type', 'users',
+ '--entity-name', KAFKA_TEST_USER,
+ '--delete-config', 'scram-sha-256,scram-sha-512'],
+ capture_output=True,
+ timeout=15,
+ check=False
+ )
+ time.sleep(1)
+
+ # adding SCRAM credentials
+ add_config_value = f'{scram_mechanism}=[password={KAFKA_TEST_PASSWORD}]'
+ result = subprocess.run(
+ [kafka_configs,
+ '--zookeeper', zk_connect,
+ '--alter',
+ '--entity-type', 'users',
+ '--entity-name', KAFKA_TEST_USER,
+ '--add-config', add_config_value],
+ capture_output=True,
+ text=True,
+ timeout=15,
+ check=False
+ )
+
+ if result.returncode == 0:
+ log.info(f"SCRAM user configured: {KAFKA_TEST_USER} ({scram_mechanism})")
+ else:
+ raise RuntimeError(f"Failed to create SCRAM user {KAFKA_TEST_USER} with {scram_mechanism}")
+ except Exception as e:
+ log.error(f"Failed to setup SCRAM users via kafka-configs: {e}")
+ raise
+
+def _kafka_ca_cert_path():
+ """Return the path of the CA certificate generated by kafka-security.sh in KAFKA_DIR, if present."""
+ kafka_dir = os.environ.get('KAFKA_DIR')
+ if kafka_dir:
+ ca_path = os.path.join(kafka_dir, 'y-ca.crt')
+ if os.path.exists(ca_path):
+ return ca_path
+ return None
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic_name, security_type, kafka_server):
+ def __init__(self, topic_name, security_type, kafka_server, mechanism='PLAIN'):
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
self.status = 'init'
+ ca_cert = _kafka_ca_cert_path()
+ base_config = {}
port = 9092
- if security_type != 'PLAINTEXT':
- security_type = 'SSL'
+ effective_protocol = 'PLAINTEXT'
+ if security_type == 'SSL':
port = 9093
+ effective_protocol = 'SSL'
+ elif security_type == 'SASL_SSL':
+ port = 9094
+ effective_protocol = 'SASL_SSL'
+ elif security_type == 'SASL_PLAINTEXT':
+ port = 9095
+ effective_protocol = 'SASL_PLAINTEXT'
if kafka_server is None:
endpoint = default_kafka_server + ":" + str(port)
else:
endpoint = kafka_server
+ base_config['bootstrap_servers'] = endpoint
+ if effective_protocol == 'SSL':
+ base_config['security_protocol'] = 'SSL'
+ if ca_cert:
+ base_config['ssl_cafile'] = ca_cert
+ kafka_dir = os.environ.get('KAFKA_DIR', '/opt/kafka')
+ client_cert = os.path.join(kafka_dir, 'config/client.crt')
+ client_key = os.path.join(kafka_dir, 'config/client.key')
+ if os.path.exists(client_cert) and os.path.exists(client_key):
+ base_config['ssl_certfile'] = client_cert
+ base_config['ssl_keyfile'] = client_key
+ elif effective_protocol == 'SASL_SSL':
+ base_config['security_protocol'] = 'SASL_SSL'
+ if ca_cert:
+ base_config['ssl_cafile'] = ca_cert
+ base_config['sasl_mechanism'] = mechanism
+ base_config.update({
+ 'sasl_plain_username': KAFKA_TEST_USER,
+ 'sasl_plain_password': KAFKA_TEST_PASSWORD,
+ })
+ elif effective_protocol == 'SASL_PLAINTEXT':
+ base_config['security_protocol'] = 'SASL_PLAINTEXT'
+ base_config['sasl_mechanism'] = mechanism
+ base_config.update({
+ 'sasl_plain_username': KAFKA_TEST_USER,
+ 'sasl_plain_password': KAFKA_TEST_PASSWORD,
+ })
+
remaining_retries = 10
while remaining_retries > 0:
try:
admin_client = KafkaAdminClient(
- bootstrap_servers=endpoint,
- request_timeout_ms=16000)
+ request_timeout_ms=16000,
+ **base_config)
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
admin_client.create_topics([topic])
log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint)
raise Exception('Kafka admin failed to create topic: %s. no retries left', topic_name)
remaining_retries = 10
+ consumer_config = dict(base_config)
+ consumer_config.update({
+ 'metadata_max_age_ms': 5000,
+ 'consumer_timeout_ms': 5000,
+ 'auto_offset_reset': 'earliest'
+ })
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic_name,
- bootstrap_servers=endpoint,
- security_protocol=security_type,
- metadata_max_age_ms=5000,
- consumer_timeout_ms=5000,
- auto_offset_reset='earliest')
+ self.consumer = KafkaConsumer(topic_name, **consumer_config)
log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint , topic_name)
# This forces the consumer to fetch metadata immediately
partitions = self.consumer.partitions_for_topic(topic)
self.events = []
self.topic = topic_name
self.stop = False
+ self.producer_config = base_config
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
"""verify stored records agains a list of keys"""
receiver.status = 'ended'
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None, mechanism='PLAIN'):
"""create kafka receiver and thread"""
- receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
+ receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers, mechanism=mechanism)
task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
def verify_kafka_receiver(receiver):
"""test the kafka receiver"""
from kafka import KafkaProducer
- producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'],
- security_protocol=receiver.consumer.config['security_protocol'])
+ producer = KafkaProducer(**receiver.producer_config)
producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8'))
producer.flush()
events = []
def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
""" test pushing kafka notification securly to master """
+ # Setup SCRAM users if needed
+ if mechanism.startswith('SCRAM'):
+ setup_scram_users_via_kafka_configs(mechanism)
+ time.sleep(2) # Allow time for SCRAM config to propagate
+
conn = connection()
zonegroup = get_config_zonegroup()
# create bucket
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
# create consumer on the topic
- task, receiver = create_kafka_receiver_thread(topic_name)
+ task, receiver = create_kafka_receiver_thread(topic_name, security_type=security_type, mechanism=mechanism)
task.start()
verify_kafka_receiver(receiver)
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_sasl_scram_512():
+ kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-512')
+
+
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_sasl_scram_512():
+ kafka_security('SASL_SSL', mechanism='SCRAM-SHA-512')
+
+
@pytest.mark.http_test
def test_persistent_reload():
""" do a realm reload while we send notifications """