# configure logging for the tests module
log = logging.getLogger(__name__)
+skip_amqp = True
TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"
# AMQP endpoint functions
-rabbitmq_port = 5672
-
class AMQPReceiver(object):
"""class for receiving and storing messages on a topic from the AMQP broker"""
- def __init__(self, exchange, topic):
+ def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
import pika
- hostname = get_ip()
+ import ssl
+
+ if ca_location:
+ ssl_context = ssl.create_default_context()
+ ssl_context.load_verify_locations(cafile=ca_location)
+ ssl_options = pika.SSLOptions(ssl_context)
+ rabbitmq_port = 5671
+ else:
+ rabbitmq_port = 5672
+ ssl_options = None
+
+ if external_endpoint_address:
+ params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+ else:
+ hostname = get_ip()
+ params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
remaining_retries = 10
while remaining_retries > 0:
try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+ connection = pika.BlockingConnection(params)
break
except Exception as error:
remaining_retries -= 1
self.events = []
return tmp
+
def amqp_receiver_thread_runner(receiver):
"""main thread function for the amqp receiver"""
try:
log.info('AMQP receiver ended unexpectedly: %s', str(error))
-def create_amqp_receiver_thread(exchange, topic):
+def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
"""create amqp receiver and thread"""
- receiver = AMQPReceiver(exchange, topic)
+ receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
-
def stop_amqp_receiver(receiver, task):
"""stop the receiver thread and wait for it to finis"""
try:
log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
task.join(5)
+
+def init_rabbitmq():
+ """ start a rabbitmq broker """
+ hostname = get_ip()
+ try:
+ # first try to stop any existing process
+ subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+ time.sleep(5)
+ proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
+ except Exception as error:
+ log.info('failed to execute rabbitmq-server: %s', str(error))
+ print('failed to execute rabbitmq-server: %s' % str(error))
+ return None
+ # TODO add rabbitmq checkpoint instead of sleep
+ time.sleep(5)
+ return proc
+
+
+def clean_rabbitmq(proc):
+ """ stop the rabbitmq broker """
+ try:
+ subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+ time.sleep(5)
+ proc.terminate()
+ except:
+ log.info('rabbitmq server already terminated')
+
+
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
""" verify there is at least one event per element """
err = ''
vstart_secret_key = get_secret_key()
conn = S3Connection(aws_access_key_id=vstart_access_key,
- aws_secret_access_key=vstart_secret_key,
+ aws_secret_access_key=vstart_secret_key,
is_secure=False, port=port_no, host=hostname,
calling_format='boto.s3.connection.OrdinaryCallingFormat')
def test_ps_s3_notification_filter_on_master():
""" test s3 notification filter on master """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
key.delete()
conn.delete_bucket(bucket_name)
stop_amqp_receiver(receiver, task)
+ clean_rabbitmq(proc)
+
def test_ps_s3_notification_errors_on_master():
""" test s3 notification set/get/delete on master """
def test_ps_s3_notification_push_amqp_on_master():
""" test pushing amqp s3 notification on master """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
topic_conf2.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_notification_push_kafka_on_master():
""" test pushing kafka s3 notification on master """
conn.delete_bucket(bucket_name)
http_server.close()
-def test_ps_s3_creation_triggers_on_master():
+def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ if not external_endpoint_address:
+ hostname = 'localhost'
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ else:
+ proc = None
- hostname = get_ip()
conn = connection()
zonegroup = 'default'
# start amqp receiver
exchange = 'ex1'
- task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
task.start()
# create s3 topic
- endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ if external_endpoint_address:
+ endpoint_address = external_endpoint_address
+ elif ca_location:
+ endpoint_address = 'amqps://' + hostname
+ else:
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
+ if ca_location:
+ endpoint_args += '&ca-location={}'.format(ca_location)
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
key.delete()
# delete the bucket
conn.delete_bucket(bucket_name)
+ if proc:
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_creation_triggers_on_master():
+ ps_s3_creation_triggers_on_master()
+
+
+def test_ps_s3_creation_triggers_on_master_external():
+ from distutils.util import strtobool
+
+ if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
+ try:
+ if strtobool(os.environ['AMQP_VERIFY_SSL']):
+ verify_ssl = 'true'
+ else:
+ verify_ssl = 'false'
+ except Exception as e:
+ verify_ssl = 'true'
+
+ ps_s3_creation_triggers_on_master(
+ external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
+ verify_ssl=verify_ssl)
+ else:
+ return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+
+
+def test_ps_s3_creation_triggers_on_master_ssl():
+ import datetime
+ import textwrap
+ import stat
+ from cryptography import x509
+ from cryptography.x509.oid import NameOID
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import rsa
+ from tempfile import TemporaryDirectory
+
+ with TemporaryDirectory() as tempdir:
+ # modify permissions to ensure that the rabbitmq user can access them
+ os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+ CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+ CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+ KEYFILE = os.path.join(tempdir, 'server_key.pem')
+ RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
+
+ root_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ subject = issuer = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+ ])
+ root_cert = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ issuer
+ ).public_key(
+ root_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+ ).add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ with open(CACERTFILE, "wb") as f:
+ f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Now we want to generate a cert from that root
+ cert_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ with open(KEYFILE, "wb") as f:
+ f.write(cert_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption(),
+ ))
+ new_subject = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ ])
+ cert = x509.CertificateBuilder().subject_name(
+ new_subject
+ ).issuer_name(
+ root_cert.issuer
+ ).public_key(
+ cert_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=30)
+ ).add_extension(
+ x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+ critical=False,
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ # Write our certificate out to disk.
+ with open(CERTFILE, "wb") as f:
+ f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+ with open(RABBITMQ_CONF_FILE, "w") as f:
+ # use the old style config format to ensure it also runs on older RabbitMQ versions.
+ f.write(textwrap.dedent(f'''
+ [
+ {{rabbit, [
+ {{ssl_listeners, [5671]}},
+ {{ssl_options, [{{cacertfile, "{CACERTFILE}"}},
+ {{certfile, "{CERTFILE}"}},
+ {{keyfile, "{KEYFILE}"}},
+ {{verify, verify_peer}},
+ {{fail_if_no_peer_cert, false}}]}}]}}
+ ].
+ '''))
+ os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
+
+ ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+
+ del os.environ['RABBITMQ_CONFIG_FILE']
+
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
key.delete()
# delete the bucket
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_metadata_on_master():
""" test s3 notification of metadata on master """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_tags_on_master():
""" test s3 notification of tags on master """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_versioning_on_master():
""" test s3 notification of object versions """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
bucket.delete_key(key.name, version_id=ver2)
bucket.delete_key(key.name, version_id=ver1)
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_versioned_deletion_on_master():
""" test s3 notification of deletion markers on master """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
hostname = get_ip()
conn = connection()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
def test_ps_s3_persistent_cleanup():
""" test reservation cleanup after gateway crash """
client_threads.append(thr)
# stop gateway while clients are sending
os.system("killall -9 radosgw");
- zonegroup.master_zone.gateways[0].stop()
print('wait for 10 sec for before restarting the gateway')
time.sleep(10)
- zonegroup.master_zone.gateways[0].start()
+ # TODO: start the radosgw
[thr.join() for thr in client_threads]
keys = list(bucket.list())
def test_ps_s3_persistent_notification_amqp():
""" test pushing persistent notification amqp """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+
persistent_notification('amqp')
+ clean_rabbitmq(proc)
'''
def test_ps_s3_persistent_notification_kafka():
def test_ps_s3_persistent_notification_large():
""" test pushing persistent notification of large notifications """
- return SkipTest('This is an AMQP test.')
+ if skip_amqp:
+ return SkipTest('This is an AMQP test.')
+
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
conn = connection()
zonegroup = 'default'
# delete the bucket
conn.delete_bucket(bucket_name)
stop_amqp_receiver(receiver, task)
-
+ clean_rabbitmq(proc)
def test_ps_s3_topic_update():