From 93c8143ba61059974b116cba28bf1f221ded2f59 Mon Sep 17 00:00:00 2001 From: Tom Schoonjans Date: Tue, 9 Feb 2021 20:19:24 +0000 Subject: [PATCH] rgw: add support for SSL encrypted AMQP connections Fixes: https://tracker.ceph.com/issues/42902 Signed-off-by: Tom Schoonjans (cherry picked from commit 1418bcc1dc3f22257fec840556902b4bf88932b8) --- doc/radosgw/notifications.rst | 6 +- doc/radosgw/pubsub-module.rst | 6 +- src/rgw/CMakeLists.txt | 2 + src/rgw/rgw_amqp.cc | 62 +++++++- src/rgw/rgw_amqp.h | 4 +- src/rgw/rgw_pubsub_push.cc | 24 ++- src/test/rgw/amqp_mock.cc | 39 ++++- src/test/rgw/rgw_multi/tests_ps.py | 193 ++++++++++++++++++++++--- src/test/rgw/test_rgw_amqp.cc | 79 +++++++--- src/tools/ceph-dencoder/CMakeLists.txt | 2 +- 10 files changed, 355 insertions(+), 62 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index bda5e0e973129..b94098ddf2b7c 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -136,11 +136,13 @@ Request parameters: - AMQP0.9.1 endpoint - - URI: ``amqp://[:@][:][/]`` + - URI: ``amqp[s]://[:@][:][/]`` - user/password defaults to: guest/guest - user/password may only be provided over HTTPS. If not, topic creation request will be rejected. - - port defaults to: 5672 + - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections - vhost defaults to: "/" + - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default) + - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist: diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index e2eaac7c8e24f..1ae0c20735f4c 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -220,11 +220,13 @@ The endpoint URI may include parameters depending with the type of endpoint: - AMQP0.9.1 endpoint - - URI: ``amqp://[:@][:][/]`` + - URI: ``amqp[s]://[:@][:][/]`` - user/password defaults to: guest/guest - user/password may only be provided over HTTPS. Topic creation request will be rejected if not - - port defaults to: 5672 + - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections - vhost defaults to: "/" + - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default) + - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist: diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index b009842403a14..ab4a239c97587 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -272,6 +272,7 @@ set(rgw_libs rgw_a) if(WITH_RADOSGW_AMQP_ENDPOINT) # used by rgw_amqp.cc list(APPEND rgw_libs RabbitMQ::RabbitMQ) + list(APPEND rgw_libs OpenSSL::SSL) endif() if(WITH_RADOSGW_KAFKA_ENDPOINT) # used by rgw_kafka.cc @@ -412,6 +413,7 @@ target_link_libraries(rgw if(WITH_RADOSGW_AMQP_ENDPOINT) target_link_libraries(rgw PRIVATE RabbitMQ::RabbitMQ) + target_link_libraries(rgw PRIVATE OpenSSL::SSL) endif() if(WITH_RADOSGW_KAFKA_ENDPOINT) diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index ff30a7841d572..7a1b9df50619b 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -3,6 +3,7 @@ #include "rgw_amqp.h" #include +#include #include #include #include "include/ceph_assert.h" @@ -16,6 +17,7 @@ #include #include #include "common/dout.h" +#include #define dout_subsys ceph_subsys_rgw @@ -43,6 +45,7 @@ static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006; static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007; static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008; static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009; +static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010; static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008; static const int RGW_AMQP_NO_REPLY_CODE = 0x0; @@ -126,6 +129,8 @@ struct connection_t { CallbackList callbacks; ceph::coarse_real_clock::time_point next_reconnect; bool mandatory; + bool verify_ssl; + boost::optional ca_location; // default ctor connection_t() : @@ -139,7 +144,9 @@ struct connection_t { ref_count(0), cct(nullptr), next_reconnect(ceph::coarse_real_clock::now()), - mandatory(false) + mandatory(false), + verify_ssl(false), + ca_location(boost::none) {} // cleanup of all internal connection resource @@ -351,6 +358,8 @@ std::string status_to_string(int s) { return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED"; case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED: return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED"; + case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED: + return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED"; } return to_string((amqp_status_enum)s); } @@ -395,11 +404,46 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio ConnectionCleaner state_guard(state); // create and open socket - auto socket = amqp_tcp_socket_new(state); + amqp_socket_t *socket = nullptr; + if (info.ssl) { + socket = amqp_ssl_socket_new(state); +#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1) + SSL_CTX* ssl_ctx = reinterpret_cast(amqp_ssl_socket_get_context(socket)); +#else + // taken from https://github.com/alanxz/rabbitmq-c/pull/560 + struct hack { + const struct amqp_socket_class_t *klass; + SSL_CTX *ctx; + }; + + struct hack *h = reinterpret_cast(socket); + SSL_CTX* ssl_ctx = h->ctx; +#endif + // ensure system CA certificates get loaded + SSL_CTX_set_default_verify_paths(ssl_ctx); + } + else { + socket = amqp_tcp_socket_new(state); + } + if (!socket) { conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED; return conn; } + if (info.ssl) { + if (!conn->verify_ssl) { + amqp_ssl_socket_set_verify_peer(socket, 0); + amqp_ssl_socket_set_verify_hostname(socket, 0); + } + if (conn->ca_location.has_value()) { + const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str()); + if (s != AMQP_STATUS_OK) { + conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED; + conn->reply_code = s; + return conn; + } + } + } const auto s = amqp_socket_open(socket, info.host, info.port); if (s < 0) { conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED; @@ -494,7 +538,7 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio // utility function to create a new connection connection_ptr_t create_new_connection(const amqp_connection_info& info, - const std::string& exchange, bool mandatory_delivery, CephContext* cct) { + const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional ca_location) { // create connection state connection_ptr_t conn = new connection_t; conn->exchange = exchange; @@ -502,6 +546,8 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info, conn->password.assign(info.password); conn->mandatory = mandatory_delivery; conn->cct = cct; + conn->verify_ssl = verify_ssl; + conn->ca_location = ca_location; return create_connection(conn, info); } @@ -851,7 +897,8 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { + connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, + boost::optional ca_location) { if (stopped) { ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; return nullptr; @@ -886,7 +933,7 @@ public: ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct); + const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location); if (!conn->is_ok()) { ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" << status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl; @@ -1002,9 +1049,10 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { +connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, + boost::optional ca_location) { if (!s_manager) return nullptr; - return s_manager->connect(url, exchange, mandatory_delivery); + return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location); } int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_amqp.h b/src/rgw/rgw_amqp.h index eaf97ed9dc01a..7a2316858ca43 100644 --- a/src/rgw/rgw_amqp.h +++ b/src/rgw/rgw_amqp.h @@ -5,6 +5,7 @@ #include #include +#include #include #include "include/common_fwd.h" @@ -30,7 +31,8 @@ bool init(CephContext* cct); void shutdown(); // connect to an amqp endpoint -connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery); +connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, + boost::optional ca_location); // publish a message over a connection that was already created int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index c6fbe325527ef..3b5b926661051 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -173,6 +173,23 @@ private: ack_level_t ack_level; amqp::connection_ptr_t conn; + bool get_verify_ssl(const RGWHTTPArgs& args) { + bool exists; + auto str_verify_ssl = args.get("verify-ssl", &exists); + if (!exists) { + // verify server certificate by default + return true; + } + boost::algorithm::to_lower(str_verify_ssl); + if (str_verify_ssl == "true") { + return true; + } + if (str_verify_ssl == "false") { + return false; + } + throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); + } + std::string get_exchange(const RGWHTTPArgs& args) { bool exists; const auto exchange = args.get("amqp-exchange", &exists); @@ -297,7 +314,7 @@ public: topic(_topic), exchange(get_exchange(args)), ack_level(get_ack_level(args)), - conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) { + conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) { if (!conn) { throw configuration_error("AMQP: failed to create connection to: " + endpoint); } @@ -691,7 +708,7 @@ const std::string& get_schema(const std::string& endpoint) { if (schema == "http" || schema == "https") { return WEBHOOK_SCHEMA; #ifdef WITH_RADOSGW_AMQP_ENDPOINT - } else if (schema == "amqp") { + } else if (schema == "amqp" || schema == "amqps") { return AMQP_SCHEMA; #endif #ifdef WITH_RADOSGW_KAFKA_ENDPOINT @@ -725,9 +742,6 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, throw configuration_error("AMQP: unknown version: " + version); return nullptr; } - } else if (schema == "amqps") { - throw configuration_error("AMQP: ssl not supported"); - return nullptr; #endif #ifdef WITH_RADOSGW_KAFKA_ENDPOINT } else if (schema == KAFKA_SCHEMA) { diff --git a/src/test/rgw/amqp_mock.cc b/src/test/rgw/amqp_mock.cc index e37151e48fc72..5f0b00e9f1e0f 100644 --- a/src/test/rgw/amqp_mock.cc +++ b/src/test/rgw/amqp_mock.cc @@ -3,11 +3,13 @@ #include "amqp_mock.h" #include +#include #include #include #include #include #include +#include namespace amqp_mock { @@ -74,6 +76,7 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t reply; amqp_basic_ack_t ack; amqp_basic_nack_t nack; + bool use_ssl; // ctor amqp_connection_state_t_() : socket(nullptr), @@ -86,15 +89,18 @@ struct amqp_connection_state_t_ { login_called(false), ack_list(1024), nack_list(1024), - delivery_tag(1) { + delivery_tag(1), + use_ssl(false) { reply.reply_type = AMQP_RESPONSE_NONE; } }; struct amqp_socket_t_ { + void *klass; + void *ssl_ctx; bool open_called; // ctor - amqp_socket_t_() : open_called(false) { + amqp_socket_t_() : klass(nullptr), ssl_ctx(nullptr), open_called(false) { } }; @@ -120,6 +126,35 @@ amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) { return state->socket; } +amqp_socket_t* amqp_ssl_socket_new(amqp_connection_state_t state) { + state->socket = new amqp_socket_t; + state->use_ssl = true; + return state->socket; +} + +int amqp_ssl_socket_set_cacert(amqp_socket_t *self, const char *cacert) { + // do nothing + return AMQP_STATUS_OK; +} + +void amqp_ssl_socket_set_verify_peer(amqp_socket_t *self, amqp_boolean_t verify) { + // do nothing +} + +void amqp_ssl_socket_set_verify_hostname(amqp_socket_t *self, amqp_boolean_t verify) { + // do nothing +} + +#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1) +void* amqp_ssl_socket_get_context(amqp_socket_t *self) { + return nullptr; +} +#endif + +int SSL_CTX_set_default_verify_paths(SSL_CTX *ctx) { + return 1; +} + int amqp_socket_open(amqp_socket_t *self, const char *host, int port) { if (!self) { return -1; diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 5fac1cdfb81de..fd6e73e868adf 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -168,17 +168,31 @@ class StreamingHTTPServer: # AMQP endpoint functions -rabbitmq_port = 5672 class AMQPReceiver(object): """class for receiving and storing messages on a topic from the AMQP broker""" - def __init__(self, exchange, topic): + def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None): import pika - hostname = get_ip() + import ssl + + if ca_location: + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations(cafile=ca_location) + ssl_options = pika.SSLOptions(ssl_context) + rabbitmq_port = 5671 + else: + rabbitmq_port = 5672 + ssl_options = None + + if external_endpoint_address: + params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) + else: + hostname = get_ip() + params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options) remaining_retries = 10 while remaining_retries > 0: try: - connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port)) + connection = pika.BlockingConnection(params) break except Exception as error: remaining_retries -= 1 @@ -232,9 +246,9 @@ def amqp_receiver_thread_runner(receiver): log.info('AMQP receiver ended unexpectedly: %s', str(error)) -def create_amqp_receiver_thread(exchange, topic): +def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None): """create amqp receiver and thread""" - receiver = AMQPReceiver(exchange, topic) + receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location) task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,)) task.daemon = True return task, receiver @@ -378,7 +392,7 @@ def init_rabbitmq(): # TODO: support multiple brokers per host using env # make sure we don't collide with the default try: - proc = subprocess.Popen(['sudo', 'rabbitmq-server']) + proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server']) except Exception as error: log.info('failed to execute rabbitmq-server: %s', str(error)) print('failed to execute rabbitmq-server: %s' % str(error)) @@ -3466,15 +3480,17 @@ def test_ps_creation_triggers(): key.delete() master_zone.delete_bucket(bucket_name) - -def test_ps_s3_creation_triggers_on_master(): +def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") - hostname = get_ip() - proc = init_rabbitmq() - if proc is None: - return SkipTest('end2end amqp tests require rabbitmq-server installed') + if not external_endpoint_address: + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + else: + proc = None master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() @@ -3486,13 +3502,23 @@ def test_ps_s3_creation_triggers_on_master(): # start amqp receiver exchange = 'ex1' - task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location) task.start() # create s3 topic - endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' - topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + if external_endpoint_address: + endpoint_address = external_endpoint_address + elif ca_location: + endpoint_address = 'amqps://' + hostname + else: + endpoint_address = 'amqp://' + hostname + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl + if ca_location: + endpoint_args += '&ca-location={}'.format(ca_location) + if external_endpoint_address: + topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + else: + topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX @@ -3537,7 +3563,138 @@ def test_ps_s3_creation_triggers_on_master(): key.delete() # delete the bucket master_zone.delete_bucket(bucket_name) - clean_rabbitmq(proc) + if proc: + clean_rabbitmq(proc) + + +def test_ps_s3_creation_triggers_on_master(): + ps_s3_creation_triggers_on_master() + + +def test_ps_s3_creation_triggers_on_master_external(): + from distutils.util import strtobool + + if 'AMQP_EXTERNAL_ENDPOINT' in os.environ: + try: + if strtobool(os.environ['AMQP_VERIFY_SSL']): + verify_ssl = 'true' + else: + verify_ssl = 'false' + except Exception as e: + verify_ssl = 'true' + + ps_s3_creation_triggers_on_master( + external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], + verify_ssl=verify_ssl) + else: + return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") + +def test_ps_s3_creation_triggers_on_master_ssl(): + import datetime + import textwrap + import stat + from cryptography import x509 + from cryptography.x509.oid import NameOID + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa + from tempfile import TemporaryDirectory + + with TemporaryDirectory() as tempdir: + # modify permissions to ensure that the rabbitmq user can access them + os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) + CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') + CERTFILE = os.path.join(tempdir, 'server_certificate.pem') + KEYFILE = os.path.join(tempdir, 'server_key.pem') + RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config') + + root_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), + ]) + root_cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + root_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=3650) + ).add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ).sign(root_key, hashes.SHA256(), default_backend()) + with open(CACERTFILE, "wb") as f: + f.write(root_cert.public_bytes(serialization.Encoding.PEM)) + + # Now we want to generate a cert from that root + cert_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + with open(KEYFILE, "wb") as f: + f.write(cert_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + )) + new_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), + ]) + cert = x509.CertificateBuilder().subject_name( + new_subject + ).issuer_name( + root_cert.issuer + ).public_key( + cert_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=30) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), + critical=False, + ).sign(root_key, hashes.SHA256(), default_backend()) + # Write our certificate out to disk. + with open(CERTFILE, "wb") as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + with open(RABBITMQ_CONF_FILE, "w") as f: + # use the old style config format to ensure it also runs on older RabbitMQ versions. + f.write(textwrap.dedent(f''' + [ + {{rabbit, [ + {{ssl_listeners, [5671]}}, + {{ssl_options, [{{cacertfile, "{CACERTFILE}"}}, + {{certfile, "{CERTFILE}"}}, + {{keyfile, "{KEYFILE}"}}, + {{verify, verify_peer}}, + {{fail_if_no_peer_cert, false}}]}}]}} + ]. + ''')) + os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0] + + ps_s3_creation_triggers_on_master(ca_location=CACERTFILE) + + del os.environ['RABBITMQ_CONFIG_FILE'] def test_ps_s3_multipart_on_master(): diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index 13bab823f02cf..421f757515bcf 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -61,19 +61,50 @@ protected: TEST_F(TestAMQP, ConnectionOK) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost", "ex1", false); + conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); } +TEST_F(TestAMQP, SSLConnectionOK) +{ + const int port = 5671; + const auto connection_number = amqp::get_connection_count(); + amqp_mock::set_valid_port(port); + conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(conn); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + auto rc = amqp::publish(conn, "topic", "message"); + EXPECT_EQ(rc, 0); + amqp_mock::set_valid_port(5672); +} + +TEST_F(TestAMQP, PlainAndSSLConnectionsOK) +{ + const int port = 5671; + const auto connection_number = amqp::get_connection_count(); + amqp_mock::set_valid_port(port); + amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(conn1); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + auto rc = amqp::publish(conn1, "topic", "message"); + EXPECT_EQ(rc, 0); + amqp_mock::set_valid_port(5672); + amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(conn2); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 2); + rc = amqp::publish(conn2, "topic", "message"); + EXPECT_EQ(rc, 0); +} + TEST_F(TestAMQP, ConnectionReuse) { - amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false); + amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); EXPECT_TRUE(conn1); const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false); + amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); EXPECT_TRUE(conn2); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn1, "topic", "message"); @@ -83,7 +114,7 @@ TEST_F(TestAMQP, ConnectionReuse) TEST_F(TestAMQP, NameResolutionFail) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://kaboom", "ex1", false); + conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -93,7 +124,7 @@ TEST_F(TestAMQP, NameResolutionFail) TEST_F(TestAMQP, InvalidPort) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:1234", "ex1", false); + conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -103,7 +134,7 @@ TEST_F(TestAMQP, InvalidPort) TEST_F(TestAMQP, InvalidHost) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://0.0.0.1", "ex1", false); + conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -113,7 +144,7 @@ TEST_F(TestAMQP, InvalidHost) TEST_F(TestAMQP, InvalidVhost) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost/kaboom", "ex1", false); + conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -125,7 +156,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.1"); { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false); + conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -135,7 +166,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.2"); { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false); + conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -147,7 +178,7 @@ TEST_F(TestAMQP, UserPassword) TEST_F(TestAMQP, URLParseError) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex1", false); + conn = amqp::connect("http://localhost", "ex1", false, false, boost::none); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -157,7 +188,7 @@ TEST_F(TestAMQP, URLParseError) TEST_F(TestAMQP, ExchangeMismatch) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex2", false); + conn = amqp::connect("http://localhost", "ex2", false, false, boost::none); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -172,7 +203,7 @@ TEST_F(TestAMQP, MaxConnections) while (remaining_connections > 0) { const auto host = "127.10.0." + std::to_string(remaining_connections); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); @@ -184,7 +215,7 @@ TEST_F(TestAMQP, MaxConnections) { const std::string host = "toomany"; amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_FALSE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); @@ -246,7 +277,7 @@ TEST_F(TestAMQP, ReceiveAck) callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); @@ -260,7 +291,7 @@ TEST_F(TestAMQP, ImplicitConnectionClose) callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); const auto NUMBER_OF_CALLS = 2000; for (auto i = 0; i < NUMBER_OF_CALLS; ++i) { @@ -278,7 +309,7 @@ TEST_F(TestAMQP, ReceiveMultipleAck) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { @@ -296,7 +327,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -315,7 +346,7 @@ TEST_F(TestAMQP, DynamicCallback) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -336,7 +367,7 @@ TEST_F(TestAMQP, ReceiveNack) amqp_mock::REPLY_ACK = false; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -353,7 +384,7 @@ TEST_F(TestAMQP, FailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -370,7 +401,7 @@ TEST_F(TestAMQP, ClosedConnection) const auto current_connections = amqp::get_connection_count(); const std::string host("localhost3"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), current_connections + 1); EXPECT_TRUE(amqp::disconnect(conn)); @@ -389,7 +420,7 @@ TEST_F(TestAMQP, RetryInvalidHost) { const std::string host = "192.168.0.1"; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://"+host, "ex1", false); + conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -406,7 +437,7 @@ TEST_F(TestAMQP, RetryInvalidPort) { const int port = 9999; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false); + conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -425,7 +456,7 @@ TEST_F(TestAMQP, RetryFailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost4"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false); + conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); diff --git a/src/tools/ceph-dencoder/CMakeLists.txt b/src/tools/ceph-dencoder/CMakeLists.txt index 4a7b452f71d73..869904ebb407d 100644 --- a/src/tools/ceph-dencoder/CMakeLists.txt +++ b/src/tools/ceph-dencoder/CMakeLists.txt @@ -32,7 +32,7 @@ if(WITH_RADOSGW) cls_rgw_client) if(WITH_RADOSGW_AMQP_ENDPOINT) list(APPEND DENCODER_EXTRALIBS - rabbitmq) + rabbitmq ssl) endif() if(WITH_RADOSGW_KAFKA_ENDPOINT) list(APPEND DENCODER_EXTRALIBS -- 2.39.5