-============================
- Bucket Notification tests
-============================
+==========================
+ Bucket Notification Tests
+==========================
You will need to use the sample configuration file named ``bntests.conf.SAMPLE``
that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. You can also copy this file to the directory where you are
running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started
with vstart.
+For the tests covering Kafka and RabbitMQ security, the RGW needs to accept user/password credentials over a non-TLS connection between the client and the RGW.
+To allow that, the cluster has to be started with the ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``.
+For example::
-============
-Kafka tests
-============
+ MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
-You also need to install Kafka which can be done by downloading and unzipping from the following::
+===========
+Kafka Tests
+===========
- https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz
+You also need to install Kafka, which can be downloaded from: https://kafka.apache.org/downloads
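+
+For example, to fetch and unpack a binary release (the 3.3.1 tarball below is only an illustration; substitute whichever version you downloaded)::
+
+  wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
+  tar -xzf kafka_2.13-3.3.1.tgz
+  cd kafka_2.13-3.3.1
+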
-Then inside the kafka config directory (``/path/to/kafka-2.6.0-src/config/``) you need to create a file named ``kafka_server_jaas.conf``
-with the following content::
+To test Kafka security, you should first run the ``kafka-security.sh`` script inside the Kafka directory.
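+
+For example (a minimal sketch; it assumes the script has already been copied into the Kafka directory)::
+
+  cd /path/to/kafka
+  ./kafka-security.sh
+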
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="alice"
- password="alice-secret";
- };
+Then edit the Kafka server properties file (``/path/to/kafka/config/server.properties``)
+to have the following lines::
-After creating this above file run the following command in kafka directory (``/path/to/kafka-2.6.0-src/``)::
+ listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
+  ssl.keystore.location=/path/to/kafka/server.keystore.jks
+ ssl.keystore.password=mypassword
+ ssl.key.password=mypassword
+  ssl.truststore.location=/path/to/kafka/server.truststore.jks
+ ssl.truststore.password=mypassword
+ sasl.enabled.mechanisms=PLAIN
+ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username="alice" \
+ password="alice-secret" \
+ user_alice="alice-secret";
- ./gradlew jar -PscalaVersion=2.13.2
-
-After following the above steps next is you need to start the Zookeeper and Kafka services.
-Here's the commands which can be used to start these services. For starting
-Zookeeper service run::
+After following the above steps, start the Zookeeper and Kafka services.
+To start the Zookeeper service, run::
bin/zookeeper-server-start.sh config/zookeeper.properties
-and then run to start the Kafka service::
+and then start the Kafka service::
bin/kafka-server-start.sh config/server.properties
-If you want to run Zookeeper and Kafka services in background add ``-daemon`` at the end of the command like::
+To run the Zookeeper and Kafka services in the background, add ``-daemon`` to the commands, as follows::
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
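+
+Optionally, you can check that the SSL listener configured above is answering on port 9093, for example with OpenSSL (just a sanity check; the exact output depends on your OpenSSL version)::
+
+  openssl s_client -connect localhost:9093 </dev/null
+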
-After starting vstart, zookeeper and kafka services you're ready to run the Kafka tests::
+After running the vstart cluster and the Zookeeper and Kafka services, you're ready to run the Kafka tests::
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test'
-After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``), zookeeper and kafka services which could be stopped by ``Ctrl+C``.
+To run the Kafka security test, you also need to provide the test with the location of the Kafka directory::
+
+  KAFKA_DIR=/path/to/kafka BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_ssl_test'
-===============
-RabbitMQ tests
-===============
+==============
+RabbitMQ Tests
+==============
You need to install RabbitMQ in the following way::
sudo chkconfig rabbitmq-server on
-Finally to start the RabbitMQ server you need to run the following command::
+Finally, to start the RabbitMQ server you need to run the following command::
sudo /sbin/service rabbitmq-server start
sudo /sbin/service rabbitmq-server status
-After starting vstart and RabbitMQ server you're ready to run the AMQP tests::
+After running vstart and the RabbitMQ server, you're ready to run the AMQP tests::
BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test'
After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
sudo /sbin/service rabbitmq-server stop
+
+To run the RabbitMQ SSL security tests, use the following::
+
+ BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_ssl_test'
+
+During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed).
+
# configure logging for the tests module
log = logging.getLogger(__name__)
-skip_amqp_ssl = True
-
TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"
ssl_options = None
if external_endpoint_address:
- params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+ if ssl_options:
+ # this is currently not working due to: https://github.com/pika/pika/issues/1192
+ params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+ else:
+ params = pika.URLParameters(external_endpoint_address)
else:
hostname = get_ip()
params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
+
remaining_retries = 10
while remaining_retries > 0:
try:
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
- if not external_endpoint_address and not skip_amqp_ssl:
+ if not external_endpoint_address:
hostname = 'localhost'
proc = init_rabbitmq()
if proc is None:
@attr('amqp_test')
def test_ps_s3_creation_triggers_on_master():
- ps_s3_creation_triggers_on_master()
+ ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672")
@attr('amqp_ssl_test')
def test_ps_s3_creation_triggers_on_master_external():
- if skip_amqp_ssl:
- return SkipTest('This is an AMQP SSL test.')
from distutils.util import strtobool
return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
-@attr('amqp_ssl_test')
-def test_ps_s3_creation_triggers_on_master_ssl():
- if skip_amqp_ssl:
- return SkipTest('This is an AMQP SSL test.')
+def generate_private_key(tempdir):
import datetime
- import textwrap
import stat
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
+
+ # modify permissions to ensure that the broker user can access them
+ os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+ CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+ CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+ KEYFILE = os.path.join(tempdir, 'server_key.pem')
+
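+    # generate a self-signed root CA; its certificate is written to CACERTFILE and
+    # its key is used below to sign the server certificate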
+ root_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ subject = issuer = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+ ])
+ root_cert = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ issuer
+ ).public_key(
+ root_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+ ).add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ with open(CACERTFILE, "wb") as f:
+ f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Now we want to generate a cert from that root
+ cert_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend(),
+ )
+ with open(KEYFILE, "wb") as f:
+ f.write(cert_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ ))
+ new_subject = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ ])
+ cert = x509.CertificateBuilder().subject_name(
+ new_subject
+ ).issuer_name(
+ root_cert.issuer
+ ).public_key(
+ cert_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=30)
+ ).add_extension(
+ x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+ critical=False,
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ # Write our certificate out to disk.
+ with open(CERTFILE, "wb") as f:
+ f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+ print("\n\n********private key generated********")
+ print(CACERTFILE, CERTFILE, KEYFILE)
+ print("\n\n")
+ return CACERTFILE, CERTFILE, KEYFILE
+
+
+@attr('amqp_ssl_test')
+def test_ps_s3_creation_triggers_on_master_ssl():
+
+ import textwrap
from tempfile import TemporaryDirectory
with TemporaryDirectory() as tempdir:
- # modify permissions to ensure that the rabbitmq user can access them
- os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
- CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
- CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
- KEYFILE = os.path.join(tempdir, 'server_key.pem')
+ CACERTFILE, CERTFILE, KEYFILE = generate_private_key(tempdir)
RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
-
- root_key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- subject = issuer = x509.Name([
- x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
- x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
- x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
- x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
- x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
- ])
- root_cert = x509.CertificateBuilder().subject_name(
- subject
- ).issuer_name(
- issuer
- ).public_key(
- root_key.public_key()
- ).serial_number(
- x509.random_serial_number()
- ).not_valid_before(
- datetime.datetime.utcnow()
- ).not_valid_after(
- datetime.datetime.utcnow() + datetime.timedelta(days=3650)
- ).add_extension(
- x509.BasicConstraints(ca=True, path_length=None), critical=True
- ).sign(root_key, hashes.SHA256(), default_backend())
- with open(CACERTFILE, "wb") as f:
- f.write(root_cert.public_bytes(serialization.Encoding.PEM))
-
- # Now we want to generate a cert from that root
- cert_key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- with open(KEYFILE, "wb") as f:
- f.write(cert_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption(),
- ))
- new_subject = x509.Name([
- x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
- x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
- x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
- x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
- ])
- cert = x509.CertificateBuilder().subject_name(
- new_subject
- ).issuer_name(
- root_cert.issuer
- ).public_key(
- cert_key.public_key()
- ).serial_number(
- x509.random_serial_number()
- ).not_valid_before(
- datetime.datetime.utcnow()
- ).not_valid_after(
- datetime.datetime.utcnow() + datetime.timedelta(days=30)
- ).add_extension(
- x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
- critical=False,
- ).sign(root_key, hashes.SHA256(), default_backend())
- # Write our certificate out to disk.
- with open(CERTFILE, "wb") as f:
- f.write(cert.public_bytes(serialization.Encoding.PEM))
-
with open(RABBITMQ_CONF_FILE, "w") as f:
# use the old style config format to ensure it also runs on older RabbitMQ versions.
f.write(textwrap.dedent(f'''
http_server.close()
-@attr('manual_test')
+@attr('kafka_test')
def test_ps_s3_notification_kafka_idle_behaviour():
""" test pushing kafka s3 notification idle behaviour check """
# TODO convert this test to actual running test by changing
# os.system call to verify the process idleness
- return SkipTest("only used in manual testing")
conn = connection()
zonegroup = 'default'
time.sleep(5)
receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
- print('waiting for 40sec for checking idleness')
- time.sleep(40)
+ is_idle = False
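+    # poll until netstat no longer shows a radosgw connection on the kafka port (9092),
+    # i.e. the radosgw<->kafka connection was torn down after going idle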
- os.system("netstat -nnp | grep 9092");
+ while not is_idle:
+ print('waiting for 10sec for checking idleness')
+ time.sleep(10)
+ cmd = "netstat -nnp | grep 9092 | grep radosgw"
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+ out = proc.communicate()[0]
+ if len(out) == 0:
+ is_idle = True
+ else:
+ print("radosgw<->kafka connection is not idle")
+ print(out.decode('utf-8'))
# do the process of uploading an object and checking for notification again
number_of_objects = 10
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
# amqp broker guarantee ordering
exact_match = True
+ elif endpoint_type == 'kafka':
+        # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
+        # kafka broker guarantees ordering within a partition
+ exact_match = True
else:
return SkipTest('Unknown endpoint type: ' + endpoint_type)
@attr('amqp_test')
def test_ps_s3_persistent_notification_amqp():
""" test pushing persistent notification amqp """
-
persistent_notification('amqp')
-'''
+
@attr('kafka_test')
def test_ps_s3_persistent_notification_kafka():
- """ test pushing persistent notification http """
+ """ test pushing persistent notification kafka """
persistent_notification('kafka')
-'''
+
def random_string(length):
import string
http_server.close()
-@attr('modification_required')
def kafka_security(security_type):
- """ test pushing kafka s3 notification on master """
- return SkipTest('This test is yet to be modified.')
-
+    """ test pushing kafka s3 notification securely to master """
conn = connection()
- if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
- return SkipTest("secure connection is needed to test SASL_SSL security")
zonegroup = 'default'
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
# name is constant for manual testing
topic_name = bucket_name+'_topic'
- # create consumer on the topic
- task, receiver = create_kafka_receiver_thread(topic_name)
- task.start()
# create s3 topic
if security_type == 'SSL_SASL':
endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
- else:
- # ssl only
+ elif security_type == 'SSL':
endpoint_address = 'kafka://' + kafka_server + ':9093'
- KAFKA_DIR = os.environ['KAFKA_DIR']
- # without acks from broker, with root CA
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
- if security_type == 'SSL_SASL':
- topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args)
else:
- topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ assert False, 'unknown security method '+security_type
+
+ KAFKA_DIR = os.environ['KAFKA_DIR']
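+    # connect over SSL and verify the broker certificate against the root CA
+    # located under KAFKA_DIR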
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+"/y-ca.crt"
+
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+
+ # create consumer on the topic
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
stop_kafka_receiver(receiver, task)
-@attr('modification_required')
+@attr('kafka_ssl_test')
def test_ps_s3_notification_push_kafka_security_ssl():
- return SkipTest('This test is yet to be modified.')
kafka_security('SSL')
-@attr('modification_required')
+@attr('kafka_ssl_test')
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
- return SkipTest('This test is yet to be modified.')
kafka_security('SSL_SASL')