From 629b4045e0432054eb4f45618e3db9729ecd0f84 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 6 Nov 2022 20:23:48 +0200 Subject: [PATCH] rgw/notification/test: add support for testing kafka security also reorganize the bucket notification tests file to make manual testing of kafka and amqp easier. Signed-off-by: Yuval Lifshitz --- src/test/rgw/bucket_notification/README.rst | 75 +++--- .../rgw/bucket_notification/kafka-security.sh | 49 ++++ src/test/rgw/bucket_notification/test_bn.py | 250 ++++++++++-------- 3 files changed, 228 insertions(+), 146 deletions(-) create mode 100755 src/test/rgw/bucket_notification/kafka-security.sh diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst index f24e87ceee5be..9686bef71b3b5 100644 --- a/src/test/rgw/bucket_notification/README.rst +++ b/src/test/rgw/bucket_notification/README.rst @@ -1,44 +1,50 @@ -============================ - Bucket Notification tests -============================ +========================== + Bucket Notification Tests +========================== You will need to use the sample configuration file named ``bntests.conf.SAMPLE`` that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. You can also copy this file to the directory where you are running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started with vstart. +For the tests covering Kafka and RabbitMQ security, the RGW will need to accept use/password without TLS connection between the client and the RGW. +So, the cluster will have to be started with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``. +For example:: -============ -Kafka tests -============ + MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true" -You also need to install Kafka which can be done by downloading and unzipping from the following:: +=========== +Kafka Tests +=========== - https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz +You also need to install Kafka which can be downloaded from: https://kafka.apache.org/downloads -Then inside the kafka config directory (``/path/to/kafka-2.6.0-src/config/``) you need to create a file named ``kafka_server_jaas.conf`` -with the following content:: +To test Kafka security, you should first run the ``kafka-security.sh`` script inside the Kafka directory. - KafkaClient { - org.apache.kafka.common.security.plain.PlainLoginModule required - username="alice" - password="alice-secret"; - }; +Then edit the Kafka server properties file (``/path/to/kafka/config/server.properties``) +to have the following lines:: -After creating this above file run the following command in kafka directory (``/path/to/kafka-2.6.0-src/``):: + listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094 + ssl.keystore.location=/home/ylifshit/kafka-3.3.1-src/server.keystore.jks + ssl.keystore.password=mypassword + ssl.key.password=mypassword + ssl.truststore.location=/home/ylifshit/kafka-3.3.1-src/server.truststore.jks + ssl.truststore.password=mypassword + sasl.enabled.mechanisms=PLAIN + listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="alice" \ + password="alice-secret" \ + user_alice="alice-secret"; - ./gradlew jar -PscalaVersion=2.13.2 - -After following the above steps next is you need to start the Zookeeper and Kafka services. -Here's the commands which can be used to start these services. For starting -Zookeeper service run:: +After following the above steps, start the Zookeeper and Kafka services. +For starting Zookeeper service run:: bin/zookeeper-server-start.sh config/zookeeper.properties -and then run to start the Kafka service:: +and then start the Kafka service:: bin/kafka-server-start.sh config/server.properties -If you want to run Zookeeper and Kafka services in background add ``-daemon`` at the end of the command like:: +If you want to run Zookeeper and Kafka services in the background add ``-daemon`` at the end of the command like:: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties @@ -46,15 +52,17 @@ and:: bin/kafka-server-start.sh -daemon config/server.properties -After starting vstart, zookeeper and kafka services you're ready to run the Kafka tests:: +After running vstart, Zookeeper, and Kafka services you're ready to run the Kafka tests:: BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test' -After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``), zookeeper and kafka services which could be stopped by ``Ctrl+C``. +To run the Kafka security test, you also need to provide the test with the location of the Kafka directory:: + + KAFKA_DIR=/path/to/kafkaBNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_ssl_test' -=============== -RabbitMQ tests -=============== +============== +RabbitMQ Tests +============== You need to install RabbitMQ in the following way:: @@ -64,7 +72,7 @@ Then you need to run the following command:: sudo chkconfig rabbitmq-server on -Finally to start the RabbitMQ server you need to run the following command:: +Finally, to start the RabbitMQ server you need to run the following command:: sudo /sbin/service rabbitmq-server start @@ -72,10 +80,17 @@ To confirm that the RabbitMQ server is running you can run the following command sudo /sbin/service rabbitmq-server status -After starting vstart and RabbitMQ server you're ready to run the AMQP tests:: +After running vstart and RabbitMQ server you're ready to run the AMQP tests:: BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test' After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command:: sudo /sbin/service rabbitmq-server stop + +To run the RabbitMQ SSL security tests use the following:: + + BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_ssl_test' + +During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed). + diff --git a/src/test/rgw/bucket_notification/kafka-security.sh b/src/test/rgw/bucket_notification/kafka-security.sh new file mode 100755 index 0000000000000..6c6f3e2614e04 --- /dev/null +++ b/src/test/rgw/bucket_notification/kafka-security.sh @@ -0,0 +1,49 @@ +FQDN=localhost +KEYFILE=server.keystore.jks +TRUSTFILE=server.truststore.jks +CAFILE=y-ca.crt +CAKEYFILE=y-ca.key +REQFILE=$FQDN.req +CERTFILE=$FQDN.crt +MYPW=mypassword +VALIDITY=36500 + +rm -f $KEYFILE +rm -f $TRUSTFILE +rm -f $CAFILE +rm -f $REQFILE +rm -f $CERTFILE + +echo "########## create the request in key store '$KEYFILE'" +keytool -keystore $KEYFILE -alias localhost \ + -dname "CN=$FQDN, OU=Michigan Engineering, O=Red Hat Inc, \ + L=Ann Arbor, ST=Michigan, C=US" \ + -storepass $MYPW -keypass $MYPW \ + -validity $VALIDITY -genkey -keyalg RSA -ext SAN=DNS:"$FQDN" + +echo "########## create the CA '$CAFILE'" +openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \ + -days $VALIDITY -subj \ + '/C=US/ST=Michigan/L=Ann Arbor/O=Red Hat Inc/OU=Michigan Engineering/CN=yuval-1' + +echo "########## store the CA in trust store '$TRUSTFILE'" +keytool -keystore $TRUSTFILE -storepass $MYPW -alias CARoot \ + -noprompt -importcert -file $CAFILE + +echo "########## create a request '$REQFILE' for signing in key store '$KEYFILE'" +keytool -storepass $MYPW -keystore $KEYFILE \ + -alias localhost -certreq -file $REQFILE + +echo "########## sign and create certificate '$CERTFILE'" +openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \ + -days $VALIDITY \ + -in $REQFILE -out $CERTFILE + +echo "########## store CA '$CAFILE' in key store '$KEYFILE'" +keytool -storepass $MYPW -keystore $KEYFILE -alias CARoot \ + -noprompt -importcert -file $CAFILE + +echo "########## store certificate '$CERTFILE' in key store '$KEYFILE'" +keytool -storepass $MYPW -keystore $KEYFILE -alias localhost \ + -import -file $CERTFILE + diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index cb09d4aa38b74..cba673b905f8a 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -41,8 +41,6 @@ import boto.s3.tagging # configure logging for the tests module log = logging.getLogger(__name__) -skip_amqp_ssl = True - TOPIC_SUFFIX = "_topic" NOTIFICATION_SUFFIX = "_notif" @@ -196,10 +194,15 @@ class AMQPReceiver(object): ssl_options = None if external_endpoint_address: - params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) + if ssl_options: + # this is currently not working due to: https://github.com/pika/pika/issues/1192 + params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) + else: + params = pika.URLParameters(external_endpoint_address) else: hostname = get_ip() params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options) + remaining_retries = 10 while remaining_retries > 0: try: @@ -1768,7 +1771,7 @@ def test_ps_s3_lifecycle_on_master(): def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" - if not external_endpoint_address and not skip_amqp_ssl: + if not external_endpoint_address: hostname = 'localhost' proc = init_rabbitmq() if proc is None: @@ -1858,13 +1861,11 @@ def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_locatio @attr('amqp_test') def test_ps_s3_creation_triggers_on_master(): - ps_s3_creation_triggers_on_master() + ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672") @attr('amqp_ssl_test') def test_ps_s3_creation_triggers_on_master_external(): - if skip_amqp_ssl: - return SkipTest('This is an AMQP SSL test.') from distutils.util import strtobool @@ -1884,13 +1885,9 @@ def test_ps_s3_creation_triggers_on_master_external(): return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") -@attr('amqp_ssl_test') -def test_ps_s3_creation_triggers_on_master_ssl(): - if skip_amqp_ssl: - return SkipTest('This is an AMQP SSL test.') +def generate_private_key(tempdir): import datetime - import textwrap import stat from cryptography import x509 from cryptography.x509.oid import NameOID @@ -1898,84 +1895,96 @@ def test_ps_s3_creation_triggers_on_master_ssl(): from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa + + # modify permissions to ensure that the broker user can access them + os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) + CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') + CERTFILE = os.path.join(tempdir, 'server_certificate.pem') + KEYFILE = os.path.join(tempdir, 'server_key.pem') + + root_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), + ]) + root_cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + root_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=3650) + ).add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ).sign(root_key, hashes.SHA256(), default_backend()) + with open(CACERTFILE, "wb") as f: + f.write(root_cert.public_bytes(serialization.Encoding.PEM)) + + # Now we want to generate a cert from that root + cert_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend(), + ) + with open(KEYFILE, "wb") as f: + f.write(cert_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + )) + new_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + ]) + cert = x509.CertificateBuilder().subject_name( + new_subject + ).issuer_name( + root_cert.issuer + ).public_key( + cert_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=30) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), + critical=False, + ).sign(root_key, hashes.SHA256(), default_backend()) + # Write our certificate out to disk. + with open(CERTFILE, "wb") as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + print("\n\n********private key generated********") + print(CACERTFILE, CERTFILE, KEYFILE) + print("\n\n") + return CACERTFILE, CERTFILE, KEYFILE + + +@attr('amqp_ssl_test') +def test_ps_s3_creation_triggers_on_master_ssl(): + + import textwrap from tempfile import TemporaryDirectory with TemporaryDirectory() as tempdir: - # modify permissions to ensure that the rabbitmq user can access them - os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') - CERTFILE = os.path.join(tempdir, 'server_certificate.pem') - KEYFILE = os.path.join(tempdir, 'server_key.pem') + CACERTFILE, CERTFILE, KEYFILE = generate_private_key(tempdir) RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config') - - root_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), - x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), - x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), - ]) - root_cert = x509.CertificateBuilder().subject_name( - subject - ).issuer_name( - issuer - ).public_key( - root_key.public_key() - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - datetime.datetime.utcnow() - ).not_valid_after( - datetime.datetime.utcnow() + datetime.timedelta(days=3650) - ).add_extension( - x509.BasicConstraints(ca=True, path_length=None), critical=True - ).sign(root_key, hashes.SHA256(), default_backend()) - with open(CACERTFILE, "wb") as f: - f.write(root_cert.public_bytes(serialization.Encoding.PEM)) - - # Now we want to generate a cert from that root - cert_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) - with open(KEYFILE, "wb") as f: - f.write(cert_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - )) - new_subject = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), - x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), - ]) - cert = x509.CertificateBuilder().subject_name( - new_subject - ).issuer_name( - root_cert.issuer - ).public_key( - cert_key.public_key() - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - datetime.datetime.utcnow() - ).not_valid_after( - datetime.datetime.utcnow() + datetime.timedelta(days=30) - ).add_extension( - x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), - critical=False, - ).sign(root_key, hashes.SHA256(), default_backend()) - # Write our certificate out to disk. - with open(CERTFILE, "wb") as f: - f.write(cert.public_bytes(serialization.Encoding.PEM)) - with open(RABBITMQ_CONF_FILE, "w") as f: # use the old style config format to ensure it also runs on older RabbitMQ versions. f.write(textwrap.dedent(f''' @@ -2729,12 +2738,11 @@ def test_ps_s3_persistent_notification_pushback(): http_server.close() -@attr('manual_test') +@attr('kafka_test') def test_ps_s3_notification_kafka_idle_behaviour(): """ test pushing kafka s3 notification idle behaviour check """ # TODO convert this test to actual running test by changing # os.system call to verify the process idleness - return SkipTest("only used in manual testing") conn = connection() zonegroup = 'default' @@ -2803,10 +2811,19 @@ def test_ps_s3_notification_kafka_idle_behaviour(): time.sleep(5) receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) - print('waiting for 40sec for checking idleness') - time.sleep(40) + is_idle = False - os.system("netstat -nnp | grep 9092"); + while not is_idle: + print('waiting for 10sec for checking idleness') + time.sleep(10) + cmd = "netstat -nnp | grep 9092 | grep radosgw" + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) + out = proc.communicate()[0] + if len(out) == 0: + is_idle = True + else: + print("radosgw<->kafka connection is not idle") + print(out.decode('utf-8')) # do the process of uploading an object and checking for notification again number_of_objects = 10 @@ -3168,6 +3185,14 @@ def persistent_notification(endpoint_type): endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' # amqp broker guarantee ordering exact_match = True + elif endpoint_type == 'kafka': + # start amqp receiver + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + endpoint_address = 'kafka://' + host + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true' + # amqp broker guarantee ordering + exact_match = True else: return SkipTest('Unknown endpoint type: ' + endpoint_type) @@ -3245,15 +3270,14 @@ def test_ps_s3_persistent_notification_http(): @attr('amqp_test') def test_ps_s3_persistent_notification_amqp(): """ test pushing persistent notification amqp """ - persistent_notification('amqp') -''' + @attr('kafka_test') def test_ps_s3_persistent_notification_kafka(): - """ test pushing persistent notification http """ + """ test pushing persistent notification kafka """ persistent_notification('kafka') -''' + def random_string(length): import string @@ -3648,36 +3672,32 @@ def test_ps_s3_multiple_topics_notification(): http_server.close() -@attr('modification_required') def kafka_security(security_type): - """ test pushing kafka s3 notification on master """ - return SkipTest('This test is yet to be modified.') - + """ test pushing kafka s3 notification securly to master """ conn = connection() - if security_type == 'SSL_SASL' and master_zone.secure_conn is None: - return SkipTest("secure connection is needed to test SASL_SSL security") zonegroup = 'default' # create bucket bucket_name = gen_bucket_name() bucket = conn.create_bucket(bucket_name) # name is constant for manual testing topic_name = bucket_name+'_topic' - # create consumer on the topic - task, receiver = create_kafka_receiver_thread(topic_name) - task.start() # create s3 topic if security_type == 'SSL_SASL': endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' - else: - # ssl only + elif security_type == 'SSL': endpoint_address = 'kafka://' + kafka_server + ':9093' - KAFKA_DIR = os.environ['KAFKA_DIR'] - # without acks from broker, with root CA - endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt' - if security_type == 'SSL_SASL': - topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args) else: - topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + assert False, 'unknown security method '+security_type + + KAFKA_DIR = os.environ['KAFKA_DIR'] + endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+"/y-ca.crt" + + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + + # create consumer on the topic + task, receiver = create_kafka_receiver_thread(topic_name) + task.start() + topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX @@ -3730,14 +3750,12 @@ def kafka_security(security_type): stop_kafka_receiver(receiver, task) -@attr('modification_required') +@attr('kafka_ssl_test') def test_ps_s3_notification_push_kafka_security_ssl(): - return SkipTest('This test is yet to be modified.') kafka_security('SSL') -@attr('modification_required') +@attr('kafka_ssl_test') def test_ps_s3_notification_push_kafka_security_ssl_sasl(): - return SkipTest('This test is yet to be modified.') kafka_security('SSL_SASL') -- 2.39.5