]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/test: use 'localhost' for amqp ssl test 40798/head
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 11 Apr 2021 15:12:35 +0000 (18:12 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 11 Apr 2021 15:16:10 +0000 (18:16 +0300)
also move the amqp ssl tests to 'test_bn.py'
this fix combines commit: 1418bcc1dc3f22257fec840556902b4bf88932b8
with commit: 979335f60a9ab771973c303277be4edb4da55c01

Fixes: https://tracker.ceph.com/issues/49800
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/test/rgw/bucket_notification/test_bn.py

index 9ee363f65877f0dfde2b99185c73d346eef65104..b8745c597190bf4c3ac665d153d528f516cbdfcc 100644 (file)
@@ -33,6 +33,7 @@ import boto.s3.tagging
 # configure logging for the tests module
 log = logging.getLogger(__name__)
 
+skip_amqp = True
 
 TOPIC_SUFFIX = "_topic"
 NOTIFICATION_SUFFIX = "_notif"
@@ -166,17 +167,30 @@ class StreamingHTTPServer:
 
 # AMQP endpoint functions
 
-rabbitmq_port = 5672
-
 class AMQPReceiver(object):
     """class for receiving and storing messages on a topic from the AMQP broker"""
-    def __init__(self, exchange, topic):
+    def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
         import pika
-        hostname = get_ip()
+        import ssl
+
+        if ca_location:
+            ssl_context = ssl.create_default_context()
+            ssl_context.load_verify_locations(cafile=ca_location)
+            ssl_options = pika.SSLOptions(ssl_context)
+            rabbitmq_port = 5671
+        else:
+            rabbitmq_port = 5672
+            ssl_options = None
+
+        if external_endpoint_address:
+            params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+        else:
+            hostname = get_ip()
+            params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
         remaining_retries = 10
         while remaining_retries > 0:
             try:
-                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+                connection = pika.BlockingConnection(params)
                 break
             except Exception as error:
                 remaining_retries -= 1
@@ -219,6 +233,7 @@ class AMQPReceiver(object):
         self.events = []
         return tmp
 
+
 def amqp_receiver_thread_runner(receiver):
     """main thread function for the amqp receiver"""
     try:
@@ -229,14 +244,13 @@ def amqp_receiver_thread_runner(receiver):
         log.info('AMQP receiver ended unexpectedly: %s', str(error))
 
 
-def create_amqp_receiver_thread(exchange, topic):
+def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
     """create amqp receiver and thread"""
-    receiver = AMQPReceiver(exchange, topic)
+    receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
     task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
 
-
 def stop_amqp_receiver(receiver, task):
     """stop the receiver thread and wait for it to finis"""
     try:
@@ -246,6 +260,34 @@ def stop_amqp_receiver(receiver, task):
         log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
     task.join(5)
 
+
+def init_rabbitmq():
+    """ start a rabbitmq broker """
+    hostname = get_ip()
+    try:
+        # first try to stop any existing process
+        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+        time.sleep(5)
+        proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
+    except Exception as error:
+        log.info('failed to execute rabbitmq-server: %s', str(error))
+        print('failed to execute rabbitmq-server: %s' % str(error))
+        return None
+    # TODO add rabbitmq checkpoint instead of sleep
+    time.sleep(5)
+    return proc
+
+
+def clean_rabbitmq(proc):
+    """ stop the rabbitmq broker """
+    try:
+        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+        time.sleep(5)
+        proc.terminate()
+    except:
+        log.info('rabbitmq server already terminated')
+
+
 def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
     """ verify there is at least one event per element """
     err = ''
@@ -426,7 +468,7 @@ def connection():
     vstart_secret_key = get_secret_key()
 
     conn = S3Connection(aws_access_key_id=vstart_access_key,
-                     aws_secret_access_key=vstart_secret_key,
+                  aws_secret_access_key=vstart_secret_key,
                       is_secure=False, port=port_no, host=hostname, 
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
 
@@ -627,7 +669,12 @@ def test_ps_s3_notification_on_master():
 
 def test_ps_s3_notification_filter_on_master():
     """ test s3 notification filter on master """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     
@@ -796,6 +843,8 @@ def test_ps_s3_notification_filter_on_master():
         key.delete()
     conn.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
@@ -894,7 +943,12 @@ def test_ps_s3_notification_errors_on_master():
 
 def test_ps_s3_notification_push_amqp_on_master():
     """ test pushing amqp s3 notification on master """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -994,6 +1048,8 @@ def test_ps_s3_notification_push_amqp_on_master():
     topic_conf2.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_notification_push_kafka_on_master():
     """ test pushing kafka s3 notification on master """
@@ -1295,11 +1351,19 @@ def test_ps_s3_opaque_data_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
-def test_ps_s3_creation_triggers_on_master():
+def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    if not external_endpoint_address:
+        hostname = 'localhost'
+        proc = init_rabbitmq()
+        if proc is  None:
+            return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    else:
+        proc = None
 
-    hostname = get_ip()
     conn = connection()
     zonegroup = 'default'
 
@@ -1310,12 +1374,19 @@ def test_ps_s3_creation_triggers_on_master():
 
     # start amqp receiver
     exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
     task.start()
 
     # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    if external_endpoint_address:
+        endpoint_address = external_endpoint_address
+    elif ca_location:
+        endpoint_address = 'amqps://' + hostname
+    else:
+        endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
+    if ca_location:
+        endpoint_args += '&ca-location={}'.format(ca_location)
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -1361,10 +1432,149 @@ def test_ps_s3_creation_triggers_on_master():
         key.delete()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    if proc:
+        clean_rabbitmq(proc)
+
+
+def test_ps_s3_creation_triggers_on_master():
+    ps_s3_creation_triggers_on_master()
+
+
+def test_ps_s3_creation_triggers_on_master_external():
+    from distutils.util import strtobool
+
+    if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
+        try:
+            if strtobool(os.environ['AMQP_VERIFY_SSL']):
+                verify_ssl = 'true'
+            else:
+                verify_ssl = 'false'
+        except Exception as e:
+            verify_ssl = 'true'
+
+        ps_s3_creation_triggers_on_master(
+            external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
+            verify_ssl=verify_ssl)
+    else:
+        return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+
+
+def test_ps_s3_creation_triggers_on_master_ssl():
+    import datetime
+    import textwrap
+    import stat
+    from cryptography import x509
+    from cryptography.x509.oid import NameOID
+    from cryptography.hazmat.primitives import hashes
+    from cryptography.hazmat.backends import default_backend
+    from cryptography.hazmat.primitives import serialization
+    from cryptography.hazmat.primitives.asymmetric import rsa
+    from tempfile import TemporaryDirectory
+
+    with TemporaryDirectory() as tempdir:
+        # modify permissions to ensure that the rabbitmq user can access them
+        os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+        CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+        CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+        KEYFILE = os.path.join(tempdir, 'server_key.pem')
+        RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
+
+        root_key = rsa.generate_private_key(
+            public_exponent=65537,
+            key_size=2048,
+            backend=default_backend()
+        )
+        subject = issuer = x509.Name([
+            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+            x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+        ])
+        root_cert = x509.CertificateBuilder().subject_name(
+            subject
+        ).issuer_name(
+            issuer
+        ).public_key(
+            root_key.public_key()
+        ).serial_number(
+            x509.random_serial_number()
+        ).not_valid_before(
+            datetime.datetime.utcnow()
+        ).not_valid_after(
+            datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+        ).add_extension(
+            x509.BasicConstraints(ca=True, path_length=None), critical=True
+        ).sign(root_key, hashes.SHA256(), default_backend())
+        with open(CACERTFILE, "wb") as f:
+            f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Now we want to generate a cert from that root
+        cert_key = rsa.generate_private_key(
+            public_exponent=65537,
+            key_size=2048,
+            backend=default_backend()
+        )
+        with open(KEYFILE, "wb") as f:
+            f.write(cert_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.TraditionalOpenSSL,
+                encryption_algorithm=serialization.NoEncryption(),
+            ))
+        new_subject = x509.Name([
+            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+        ])
+        cert = x509.CertificateBuilder().subject_name(
+            new_subject
+        ).issuer_name(
+            root_cert.issuer
+        ).public_key(
+            cert_key.public_key()
+        ).serial_number(
+            x509.random_serial_number()
+        ).not_valid_before(
+            datetime.datetime.utcnow()
+        ).not_valid_after(
+        datetime.datetime.utcnow() + datetime.timedelta(days=30)
+        ).add_extension(
+            x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+            critical=False,
+        ).sign(root_key, hashes.SHA256(), default_backend())
+        # Write our certificate out to disk.
+        with open(CERTFILE, "wb") as f:
+            f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+        with open(RABBITMQ_CONF_FILE, "w") as f:
+            # use the old style config format to ensure it also runs on older RabbitMQ versions.
+            f.write(textwrap.dedent(f'''
+                [
+                    {{rabbit, [
+                        {{ssl_listeners, [5671]}},
+                        {{ssl_options, [{{cacertfile,           "{CACERTFILE}"}},
+                                        {{certfile,             "{CERTFILE}"}},
+                                        {{keyfile,              "{KEYFILE}"}},
+                                        {{verify,               verify_peer}},
+                                        {{fail_if_no_peer_cert, false}}]}}]}}
+                ].
+            '''))
+        os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
+
+        ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+
+        del os.environ['RABBITMQ_CONFIG_FILE']
+
 
 def test_ps_s3_multipart_on_master():
     """ test multipart object upload on master"""
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1451,10 +1661,17 @@ def test_ps_s3_multipart_on_master():
         key.delete()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_metadata_on_master():
     """ test s3 notification of metadata on master """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1550,10 +1767,17 @@ def test_ps_s3_metadata_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_tags_on_master():
     """ test s3 notification of tags on master """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1625,10 +1849,17 @@ def test_ps_s3_tags_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_versioning_on_master():
     """ test s3 notification of object versions """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1694,10 +1925,17 @@ def test_ps_s3_versioning_on_master():
     bucket.delete_key(key.name, version_id=ver2)
     bucket.delete_key(key.name, version_id=ver1)
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_versioned_deletion_on_master():
     """ test s3 notification of deletion markers on master """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     hostname = get_ip()
     conn = connection()
@@ -1779,6 +2017,8 @@ def test_ps_s3_versioned_deletion_on_master():
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
 
 def test_ps_s3_persistent_cleanup():
     """ test reservation cleanup after gateway crash """
@@ -1825,10 +2065,9 @@ def test_ps_s3_persistent_cleanup():
         client_threads.append(thr)
     # stop gateway while clients are sending
     os.system("killall -9 radosgw");
-    zonegroup.master_zone.gateways[0].stop()
     print('wait for 10 sec for before restarting the gateway')
     time.sleep(10)
-    zonegroup.master_zone.gateways[0].start()
+    # TODO: start the radosgw
     [thr.join() for thr in client_threads]
 
     keys = list(bucket.list())
@@ -2346,8 +2585,15 @@ def test_ps_s3_persistent_notification_http():
 
 def test_ps_s3_persistent_notification_amqp():
     """ test pushing persistent notification amqp """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
     persistent_notification('amqp')
+    clean_rabbitmq(proc)
 
 '''
 def test_ps_s3_persistent_notification_kafka():
@@ -2362,7 +2608,12 @@ def random_string(length):
 
 def test_ps_s3_persistent_notification_large():
     """ test pushing persistent notification of large notifications """
-    return SkipTest('This is an AMQP test.')
+    if skip_amqp:
+        return SkipTest('This is an AMQP test.')
+
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     conn = connection()
     zonegroup = 'default'
@@ -2444,7 +2695,7 @@ def test_ps_s3_persistent_notification_large():
     # delete the bucket
     conn.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
-
+    clean_rabbitmq(proc)
 
 
 def test_ps_s3_topic_update():