From 0fcb3372b41bb7e3dbacabf527669eaa673b2a57 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 11 Apr 2021 18:12:35 +0300 Subject: [PATCH] rgw/test: use 'localhost' for amqp ssl test also move the amqp ssl tests to 'test_bn.py' this fix combines commit: 1418bcc1dc3f22257fec840556902b4bf88932b8 with commit: 979335f60a9ab771973c303277be4edb4da55c01 Fixes: https://tracker.ceph.com/issues/49800 Signed-off-by: Yuval Lifshitz --- src/test/rgw/bucket_notification/test_bn.py | 305 ++++++++++++++++++-- 1 file changed, 278 insertions(+), 27 deletions(-) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 9ee363f65877f..b8745c597190b 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -33,6 +33,7 @@ import boto.s3.tagging # configure logging for the tests module log = logging.getLogger(__name__) +skip_amqp = True TOPIC_SUFFIX = "_topic" NOTIFICATION_SUFFIX = "_notif" @@ -166,17 +167,30 @@ class StreamingHTTPServer: # AMQP endpoint functions -rabbitmq_port = 5672 - class AMQPReceiver(object): """class for receiving and storing messages on a topic from the AMQP broker""" - def __init__(self, exchange, topic): + def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None): import pika - hostname = get_ip() + import ssl + + if ca_location: + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations(cafile=ca_location) + ssl_options = pika.SSLOptions(ssl_context) + rabbitmq_port = 5671 + else: + rabbitmq_port = 5672 + ssl_options = None + + if external_endpoint_address: + params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) + else: + hostname = get_ip() + params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options) remaining_retries = 10 while remaining_retries > 0: try: - connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port)) + connection = pika.BlockingConnection(params) break except Exception as error: remaining_retries -= 1 @@ -219,6 +233,7 @@ class AMQPReceiver(object): self.events = [] return tmp + def amqp_receiver_thread_runner(receiver): """main thread function for the amqp receiver""" try: @@ -229,14 +244,13 @@ def amqp_receiver_thread_runner(receiver): log.info('AMQP receiver ended unexpectedly: %s', str(error)) -def create_amqp_receiver_thread(exchange, topic): +def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None): """create amqp receiver and thread""" - receiver = AMQPReceiver(exchange, topic) + receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location) task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,)) task.daemon = True return task, receiver - def stop_amqp_receiver(receiver, task): """stop the receiver thread and wait for it to finis""" try: @@ -246,6 +260,34 @@ def stop_amqp_receiver(receiver, task): log.info('failed to gracefuly stop AMQP receiver: %s', str(error)) task.join(5) + +def init_rabbitmq(): + """ start a rabbitmq broker """ + hostname = get_ip() + try: + # first try to stop any existing process + subprocess.call(['sudo', 'rabbitmqctl', 'stop']) + time.sleep(5) + proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server']) + except Exception as error: + log.info('failed to execute rabbitmq-server: %s', str(error)) + print('failed to execute rabbitmq-server: %s' % str(error)) + return None + # TODO add rabbitmq checkpoint instead of sleep + time.sleep(5) + return proc + + +def clean_rabbitmq(proc): + """ stop the rabbitmq broker """ + try: + subprocess.call(['sudo', 'rabbitmqctl', 'stop']) + time.sleep(5) + proc.terminate() + except: + log.info('rabbitmq server already terminated') + + def verify_events_by_elements(events, keys, exact_match=False, deletions=False): """ verify there is at least one event per element """ err = '' @@ -426,7 +468,7 @@ def connection(): vstart_secret_key = get_secret_key() conn = S3Connection(aws_access_key_id=vstart_access_key, - aws_secret_access_key=vstart_secret_key, + aws_secret_access_key=vstart_secret_key, is_secure=False, port=port_no, host=hostname, calling_format='boto.s3.connection.OrdinaryCallingFormat') @@ -627,7 +669,12 @@ def test_ps_s3_notification_on_master(): def test_ps_s3_notification_filter_on_master(): """ test s3 notification filter on master """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() @@ -796,6 +843,8 @@ def test_ps_s3_notification_filter_on_master(): key.delete() conn.delete_bucket(bucket_name) stop_amqp_receiver(receiver, task) + clean_rabbitmq(proc) + def test_ps_s3_notification_errors_on_master(): """ test s3 notification set/get/delete on master """ @@ -894,7 +943,12 @@ def test_ps_s3_notification_errors_on_master(): def test_ps_s3_notification_push_amqp_on_master(): """ test pushing amqp s3 notification on master """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -994,6 +1048,8 @@ def test_ps_s3_notification_push_amqp_on_master(): topic_conf2.del_config() # delete the bucket conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_notification_push_kafka_on_master(): """ test pushing kafka s3 notification on master """ @@ -1295,11 +1351,19 @@ def test_ps_s3_opaque_data_on_master(): conn.delete_bucket(bucket_name) http_server.close() -def test_ps_s3_creation_triggers_on_master(): +def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + if not external_endpoint_address: + hostname = 'localhost' + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + else: + proc = None - hostname = get_ip() conn = connection() zonegroup = 'default' @@ -1310,12 +1374,19 @@ def test_ps_s3_creation_triggers_on_master(): # start amqp receiver exchange = 'ex1' - task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location) task.start() # create s3 topic - endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + if external_endpoint_address: + endpoint_address = external_endpoint_address + elif ca_location: + endpoint_address = 'amqps://' + hostname + else: + endpoint_address = 'amqp://' + hostname + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl + if ca_location: + endpoint_args += '&ca-location={}'.format(ca_location) topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -1361,10 +1432,149 @@ def test_ps_s3_creation_triggers_on_master(): key.delete() # delete the bucket conn.delete_bucket(bucket_name) + if proc: + clean_rabbitmq(proc) + + +def test_ps_s3_creation_triggers_on_master(): + ps_s3_creation_triggers_on_master() + + +def test_ps_s3_creation_triggers_on_master_external(): + from distutils.util import strtobool + + if 'AMQP_EXTERNAL_ENDPOINT' in os.environ: + try: + if strtobool(os.environ['AMQP_VERIFY_SSL']): + verify_ssl = 'true' + else: + verify_ssl = 'false' + except Exception as e: + verify_ssl = 'true' + + ps_s3_creation_triggers_on_master( + external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], + verify_ssl=verify_ssl) + else: + return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") + + +def test_ps_s3_creation_triggers_on_master_ssl(): + import datetime + import textwrap + import stat + from cryptography import x509 + from cryptography.x509.oid import NameOID + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa + from tempfile import TemporaryDirectory + + with TemporaryDirectory() as tempdir: + # modify permissions to ensure that the rabbitmq user can access them + os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) + CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') + CERTFILE = os.path.join(tempdir, 'server_certificate.pem') + KEYFILE = os.path.join(tempdir, 'server_key.pem') + RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config') + + root_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), + ]) + root_cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + root_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=3650) + ).add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ).sign(root_key, hashes.SHA256(), default_backend()) + with open(CACERTFILE, "wb") as f: + f.write(root_cert.public_bytes(serialization.Encoding.PEM)) + + # Now we want to generate a cert from that root + cert_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + with open(KEYFILE, "wb") as f: + f.write(cert_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + )) + new_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + ]) + cert = x509.CertificateBuilder().subject_name( + new_subject + ).issuer_name( + root_cert.issuer + ).public_key( + cert_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=30) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), + critical=False, + ).sign(root_key, hashes.SHA256(), default_backend()) + # Write our certificate out to disk. + with open(CERTFILE, "wb") as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + with open(RABBITMQ_CONF_FILE, "w") as f: + # use the old style config format to ensure it also runs on older RabbitMQ versions. + f.write(textwrap.dedent(f''' + [ + {{rabbit, [ + {{ssl_listeners, [5671]}}, + {{ssl_options, [{{cacertfile, "{CACERTFILE}"}}, + {{certfile, "{CERTFILE}"}}, + {{keyfile, "{KEYFILE}"}}, + {{verify, verify_peer}}, + {{fail_if_no_peer_cert, false}}]}}]}} + ]. + ''')) + os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0] + + ps_s3_creation_triggers_on_master(ca_location=CACERTFILE) + + del os.environ['RABBITMQ_CONFIG_FILE'] + def test_ps_s3_multipart_on_master(): """ test multipart object upload on master""" - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -1451,10 +1661,17 @@ def test_ps_s3_multipart_on_master(): key.delete() # delete the bucket conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_metadata_on_master(): """ test s3 notification of metadata on master """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -1550,10 +1767,17 @@ def test_ps_s3_metadata_on_master(): topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_tags_on_master(): """ test s3 notification of tags on master """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -1625,10 +1849,17 @@ def test_ps_s3_tags_on_master(): topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_versioning_on_master(): """ test s3 notification of object versions """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -1694,10 +1925,17 @@ def test_ps_s3_versioning_on_master(): bucket.delete_key(key.name, version_id=ver2) bucket.delete_key(key.name, version_id=ver1) conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_versioned_deletion_on_master(): """ test s3 notification of deletion markers on master """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') hostname = get_ip() conn = connection() @@ -1779,6 +2017,8 @@ def test_ps_s3_versioned_deletion_on_master(): topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) + clean_rabbitmq(proc) + def test_ps_s3_persistent_cleanup(): """ test reservation cleanup after gateway crash """ @@ -1825,10 +2065,9 @@ def test_ps_s3_persistent_cleanup(): client_threads.append(thr) # stop gateway while clients are sending os.system("killall -9 radosgw"); - zonegroup.master_zone.gateways[0].stop() print('wait for 10 sec for before restarting the gateway') time.sleep(10) - zonegroup.master_zone.gateways[0].start() + # TODO: start the radosgw [thr.join() for thr in client_threads] keys = list(bucket.list()) @@ -2346,8 +2585,15 @@ def test_ps_s3_persistent_notification_http(): def test_ps_s3_persistent_notification_amqp(): """ test pushing persistent notification amqp """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + persistent_notification('amqp') + clean_rabbitmq(proc) ''' def test_ps_s3_persistent_notification_kafka(): @@ -2362,7 +2608,12 @@ def random_string(length): def test_ps_s3_persistent_notification_large(): """ test pushing persistent notification of large notifications """ - return SkipTest('This is an AMQP test.') + if skip_amqp: + return SkipTest('This is an AMQP test.') + + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') conn = connection() zonegroup = 'default' @@ -2444,7 +2695,7 @@ def test_ps_s3_persistent_notification_large(): # delete the bucket conn.delete_bucket(bucket_name) stop_amqp_receiver(receiver, task) - + clean_rabbitmq(proc) def test_ps_s3_topic_update(): -- 2.39.5