]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add support for SSL encrypted AMQP connections 40261/head
authorTom Schoonjans <Tom.Schoonjans@rfi.ac.uk>
Tue, 9 Feb 2021 20:19:24 +0000 (20:19 +0000)
committersinguliere <singuliere@autistici.org>
Fri, 19 Mar 2021 18:24:14 +0000 (19:24 +0100)
Fixes: https://tracker.ceph.com/issues/42902
Signed-off-by: Tom Schoonjans <Tom.Schoonjans@rfi.ac.uk>
(cherry picked from commit 1418bcc1dc3f22257fec840556902b4bf88932b8)

doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/CMakeLists.txt
src/rgw/rgw_amqp.cc
src/rgw/rgw_amqp.h
src/rgw/rgw_pubsub_push.cc
src/test/rgw/amqp_mock.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/test_rgw_amqp.cc
src/tools/ceph-dencoder/CMakeLists.txt

index bda5e0e9731296fe1dec7143a88e0b451984634f..b94098ddf2b7c0a1b29b1cbc20ca096545575d80 100644 (file)
@@ -136,11 +136,13 @@ Request parameters:
 
 - AMQP0.9.1 endpoint
 
- - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
  - user/password defaults to: guest/guest
  - user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
- - port defaults to: 5672
+ - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
  - vhost defaults to: "/"
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
  - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
index e2eaac7c8e24f945c65a0915bafc430bd6e86ecf..1ae0c20735f4c02ee4ccdc5632ecf79c45f199fe 100644 (file)
@@ -220,11 +220,13 @@ The endpoint URI may include parameters depending with the type of endpoint:
 
 - AMQP0.9.1 endpoint
 
- - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
  - user/password defaults to: guest/guest
  - user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - port defaults to: 5672
+ - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
  - vhost defaults to: "/"
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
  - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
index b009842403a149d8e707b404e40f1d7db227a988..ab4a239c975877340bf85a2f1ffdba0a05680a5f 100644 (file)
@@ -272,6 +272,7 @@ set(rgw_libs rgw_a)
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   # used by rgw_amqp.cc
   list(APPEND rgw_libs RabbitMQ::RabbitMQ)
+  list(APPEND rgw_libs OpenSSL::SSL)
 endif()
 if(WITH_RADOSGW_KAFKA_ENDPOINT)
   # used by rgw_kafka.cc
@@ -412,6 +413,7 @@ target_link_libraries(rgw
 
 if(WITH_RADOSGW_AMQP_ENDPOINT)
   target_link_libraries(rgw PRIVATE RabbitMQ::RabbitMQ)
+  target_link_libraries(rgw PRIVATE OpenSSL::SSL)
 endif()
 
 if(WITH_RADOSGW_KAFKA_ENDPOINT)
index ff30a7841d57260e8fc1123a58b839242031535d..7a1b9df50619bb4f218949a3266860486e1547fe 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "rgw_amqp.h"
 #include <amqp.h>
+#include <amqp_ssl_socket.h>
 #include <amqp_tcp_socket.h>
 #include <amqp_framing.h>
 #include "include/ceph_assert.h"
@@ -16,6 +17,7 @@
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
+#include <openssl/ssl.h>
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -43,6 +45,7 @@ static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED =       -0x2007;
 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED =   -0x2010;
 
 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
 static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
@@ -126,6 +129,8 @@ struct connection_t {
   CallbackList callbacks;
   ceph::coarse_real_clock::time_point next_reconnect;
   bool mandatory;
+  bool verify_ssl;
+  boost::optional<const std::string&> ca_location;
 
   // default ctor
   connection_t() :
@@ -139,7 +144,9 @@ struct connection_t {
     ref_count(0),
     cct(nullptr),
     next_reconnect(ceph::coarse_real_clock::now()),
-    mandatory(false)
+    mandatory(false),
+    verify_ssl(false),
+    ca_location(boost::none)
   {}
 
   // cleanup of all internal connection resource
@@ -351,6 +358,8 @@ std::string status_to_string(int s) {
       return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
     case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
       return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+    case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+      return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
   }
   return to_string((amqp_status_enum)s);
 }
@@ -395,11 +404,46 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   ConnectionCleaner state_guard(state);
 
   // create and open socket
-  auto socket = amqp_tcp_socket_new(state);
+  amqp_socket_t *socket = nullptr;
+  if (info.ssl) {
+    socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+    SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+    // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+    struct hack {
+                 const struct amqp_socket_class_t *klass;
+                 SSL_CTX *ctx;
+         };
+
+         struct hack *h = reinterpret_cast<struct hack*>(socket);
+    SSL_CTX* ssl_ctx = h->ctx;
+#endif
+    // ensure system CA certificates get loaded
+    SSL_CTX_set_default_verify_paths(ssl_ctx);
+  }
+  else {
+    socket = amqp_tcp_socket_new(state);
+  }
+
   if (!socket) {
     conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
     return conn;
   }
+  if (info.ssl) {
+    if (!conn->verify_ssl) {
+      amqp_ssl_socket_set_verify_peer(socket, 0);
+      amqp_ssl_socket_set_verify_hostname(socket, 0);
+    }
+    if (conn->ca_location.has_value()) {
+      const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+      if (s != AMQP_STATUS_OK) {
+        conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+        conn->reply_code = s;
+        return conn;
+      }
+    }
+  }
   const auto s = amqp_socket_open(socket, info.host, info.port);
   if (s < 0) {
     conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
@@ -494,7 +538,7 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
 // utility function to create a new connection
 connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, bool mandatory_delivery, CephContext* cct) { 
+    const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { 
   // create connection state
   connection_ptr_t conn = new connection_t;
   conn->exchange = exchange;
@@ -502,6 +546,8 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info,
   conn->password.assign(info.password);
   conn->mandatory = mandatory_delivery;
   conn->cct = cct;
+  conn->verify_ssl = verify_ssl;
+  conn->ca_location =  ca_location;
   return create_connection(conn, info);
 }
 
@@ -851,7 +897,8 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
     if (stopped) {
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
       return nullptr;
@@ -886,7 +933,7 @@ public:
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
+    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
     if (!conn->is_ok()) {
       ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
               status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
@@ -1002,9 +1049,10 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange, mandatory_delivery);
+  return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
 
 int publish(connection_ptr_t& conn, 
index eaf97ed9dc01acbeb64ebe2957b52d8008babed3..7a2316858ca4326df7c54e1edc32b26f1c428995 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <string>
 #include <functional>
+#include <boost/optional.hpp>
 #include <boost/smart_ptr/intrusive_ptr.hpp>
 
 #include "include/common_fwd.h"
@@ -30,7 +31,8 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location);
 
 // publish a message over a connection that was already created
 int publish(connection_ptr_t& conn,
index c6fbe325527ef8ffd6859e27c8fe58ae73090a3c..3b5b926661051ecd1923f8f2e5a911e99fdea0a4 100644 (file)
@@ -173,6 +173,23 @@ private:
   ack_level_t ack_level;
   amqp::connection_ptr_t conn;
 
+  bool get_verify_ssl(const RGWHTTPArgs& args) {
+    bool exists;
+    auto str_verify_ssl = args.get("verify-ssl", &exists);
+    if (!exists) {
+      // verify server certificate by default
+      return true;
+    }
+    boost::algorithm::to_lower(str_verify_ssl);
+    if (str_verify_ssl == "true") {
+      return true;
+    }
+    if (str_verify_ssl == "false") {
+      return false;
+    }
+    throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
+  }
+
   std::string get_exchange(const RGWHTTPArgs& args) {
     bool exists;
     const auto exchange = args.get("amqp-exchange", &exists);
@@ -297,7 +314,7 @@ public:
         topic(_topic),
         exchange(get_exchange(args)),
         ack_level(get_ack_level(args)),
-        conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) {
+        conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
     if (!conn) { 
       throw configuration_error("AMQP: failed to create connection to: " + endpoint);
     }
@@ -691,7 +708,7 @@ const std::string& get_schema(const std::string& endpoint) {
   if (schema == "http" || schema == "https") {
     return WEBHOOK_SCHEMA;
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  } else if (schema == "amqp") {
+  } else if (schema == "amqp" || schema == "amqps") {
     return AMQP_SCHEMA;
 #endif
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
@@ -725,9 +742,6 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
       throw configuration_error("AMQP: unknown version: " + version);
       return nullptr;
     }
-  } else if (schema == "amqps") {
-    throw configuration_error("AMQP: ssl not supported");
-    return nullptr;
 #endif
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
   } else if (schema == KAFKA_SCHEMA) {
index e37151e48fc7264cb1c02bd5d2793e64c2b46a29..5f0b00e9f1e0f6264e3a04cc3e9297635a35f918 100644 (file)
@@ -3,11 +3,13 @@
 
 #include "amqp_mock.h"
 #include <amqp.h>
+#include <amqp_ssl_socket.h>
 #include <amqp_tcp_socket.h>
 #include <string>
 #include <stdarg.h>
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
+#include <openssl/ssl.h>
 
 namespace amqp_mock {
 
@@ -74,6 +76,7 @@ struct amqp_connection_state_t_ {
   amqp_rpc_reply_t reply;
   amqp_basic_ack_t ack;
   amqp_basic_nack_t nack;
+  bool use_ssl;
   // ctor
   amqp_connection_state_t_() : 
     socket(nullptr), 
@@ -86,15 +89,18 @@ struct amqp_connection_state_t_ {
     login_called(false),
     ack_list(1024),
     nack_list(1024),
-    delivery_tag(1) {
+    delivery_tag(1),
+    use_ssl(false) {
       reply.reply_type = AMQP_RESPONSE_NONE;
     }
 };
 
 struct amqp_socket_t_ {
+  void *klass;
+  void *ssl_ctx;
   bool open_called;
   // ctor
-  amqp_socket_t_() : open_called(false) {
+  amqp_socket_t_() : klass(nullptr), ssl_ctx(nullptr), open_called(false) {
   }
 };
 
@@ -120,6 +126,35 @@ amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) {
   return state->socket;
 }
 
+amqp_socket_t* amqp_ssl_socket_new(amqp_connection_state_t state) {
+  state->socket = new amqp_socket_t;
+  state->use_ssl = true;
+  return state->socket;
+}
+
+int amqp_ssl_socket_set_cacert(amqp_socket_t *self, const char *cacert) {
+  // do nothing
+  return AMQP_STATUS_OK;
+}
+
+void amqp_ssl_socket_set_verify_peer(amqp_socket_t *self, amqp_boolean_t verify) {
+  // do nothing
+}
+
+void amqp_ssl_socket_set_verify_hostname(amqp_socket_t *self, amqp_boolean_t verify) {
+  // do nothing
+}
+
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+void* amqp_ssl_socket_get_context(amqp_socket_t *self) {
+  return nullptr;
+}
+#endif
+
+int SSL_CTX_set_default_verify_paths(SSL_CTX *ctx) {
+  return 1;
+}
+
 int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
   if (!self) {
     return -1;
index 5fac1cdfb81de3887299c00c83660a372d99a050..fd6e73e868adfe772f5e788bb3e40138081178d4 100644 (file)
@@ -168,17 +168,31 @@ class StreamingHTTPServer:
 
 # AMQP endpoint functions
 
-rabbitmq_port = 5672
 
 class AMQPReceiver(object):
     """class for receiving and storing messages on a topic from the AMQP broker"""
-    def __init__(self, exchange, topic):
+    def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
         import pika
-        hostname = get_ip()
+        import ssl
+
+        if ca_location:
+            ssl_context = ssl.create_default_context()
+            ssl_context.load_verify_locations(cafile=ca_location)
+            ssl_options = pika.SSLOptions(ssl_context)
+            rabbitmq_port = 5671
+        else:
+            rabbitmq_port = 5672
+            ssl_options = None
+
+        if external_endpoint_address:
+            params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+        else:
+            hostname = get_ip()
+            params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
         remaining_retries = 10
         while remaining_retries > 0:
             try:
-                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+                connection = pika.BlockingConnection(params)
                 break
             except Exception as error:
                 remaining_retries -= 1
@@ -232,9 +246,9 @@ def amqp_receiver_thread_runner(receiver):
         log.info('AMQP receiver ended unexpectedly: %s', str(error))
 
 
-def create_amqp_receiver_thread(exchange, topic):
+def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
     """create amqp receiver and thread"""
-    receiver = AMQPReceiver(exchange, topic)
+    receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
     task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -378,7 +392,7 @@ def init_rabbitmq():
     # TODO: support multiple brokers per host using env
     # make sure we don't collide with the default
     try:
-        proc = subprocess.Popen(['sudo', 'rabbitmq-server'])
+        proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
     except Exception as error:
         log.info('failed to execute rabbitmq-server: %s', str(error))
         print('failed to execute rabbitmq-server: %s' % str(error))
@@ -3466,15 +3480,17 @@ def test_ps_creation_triggers():
         key.delete()
     master_zone.delete_bucket(bucket_name)
 
-
-def test_ps_s3_creation_triggers_on_master():
+def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    if not external_endpoint_address:
+        hostname = get_ip()
+        proc = init_rabbitmq()
+        if proc is  None:
+            return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    else:
+        proc = None
     master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
@@ -3486,13 +3502,23 @@ def test_ps_s3_creation_triggers_on_master():
 
     # start amqp receiver
     exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
     task.start()
 
     # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    if external_endpoint_address:
+        endpoint_address = external_endpoint_address
+    elif ca_location:
+        endpoint_address = 'amqps://' + hostname
+    else:
+        endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
+    if ca_location:
+        endpoint_args += '&ca-location={}'.format(ca_location)
+    if external_endpoint_address:
+        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    else:
+        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -3537,7 +3563,138 @@ def test_ps_s3_creation_triggers_on_master():
         key.delete()
     # delete the bucket
     master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
+    if proc:
+        clean_rabbitmq(proc)
+
+
+def test_ps_s3_creation_triggers_on_master():
+    ps_s3_creation_triggers_on_master()
+
+
+def test_ps_s3_creation_triggers_on_master_external():
+    from distutils.util import strtobool
+
+    if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
+        try:
+            if strtobool(os.environ['AMQP_VERIFY_SSL']):
+                verify_ssl = 'true'
+            else:
+                verify_ssl = 'false'
+        except Exception as e:
+            verify_ssl = 'true'
+
+        ps_s3_creation_triggers_on_master(
+            external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
+            verify_ssl=verify_ssl)
+    else:
+        return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+
+def test_ps_s3_creation_triggers_on_master_ssl():
+    import datetime
+    import textwrap
+    import stat
+    from cryptography import x509
+    from cryptography.x509.oid import NameOID
+    from cryptography.hazmat.primitives import hashes
+    from cryptography.hazmat.backends import default_backend
+    from cryptography.hazmat.primitives import serialization
+    from cryptography.hazmat.primitives.asymmetric import rsa
+    from tempfile import TemporaryDirectory
+
+    with TemporaryDirectory() as tempdir:
+        # modify permissions to ensure that the rabbitmq user can access them
+        os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+        CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+        CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+        KEYFILE = os.path.join(tempdir, 'server_key.pem')
+        RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
+
+        root_key = rsa.generate_private_key(
+            public_exponent=65537,
+            key_size=2048,
+            backend=default_backend()
+        )
+        subject = issuer = x509.Name([
+            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+            x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+        ])
+        root_cert = x509.CertificateBuilder().subject_name(
+            subject
+        ).issuer_name(
+            issuer
+        ).public_key(
+            root_key.public_key()
+        ).serial_number(
+            x509.random_serial_number()
+        ).not_valid_before(
+            datetime.datetime.utcnow()
+        ).not_valid_after(
+            datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+        ).add_extension(
+            x509.BasicConstraints(ca=True, path_length=None), critical=True
+        ).sign(root_key, hashes.SHA256(), default_backend())
+        with open(CACERTFILE, "wb") as f:
+            f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Now we want to generate a cert from that root
+        cert_key = rsa.generate_private_key(
+            public_exponent=65537,
+            key_size=2048,
+            backend=default_backend()
+        )
+        with open(KEYFILE, "wb") as f:
+            f.write(cert_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.TraditionalOpenSSL,
+                encryption_algorithm=serialization.NoEncryption(),
+            ))
+        new_subject = x509.Name([
+            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+        ])
+        cert = x509.CertificateBuilder().subject_name(
+            new_subject
+        ).issuer_name(
+            root_cert.issuer
+        ).public_key(
+            cert_key.public_key()
+        ).serial_number(
+            x509.random_serial_number()
+        ).not_valid_before(
+            datetime.datetime.utcnow()
+        ).not_valid_after(
+        datetime.datetime.utcnow() + datetime.timedelta(days=30)
+        ).add_extension(
+            x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+            critical=False,
+        ).sign(root_key, hashes.SHA256(), default_backend())
+        # Write our certificate out to disk.
+        with open(CERTFILE, "wb") as f:
+            f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+        with open(RABBITMQ_CONF_FILE, "w") as f:
+            # use the old style config format to ensure it also runs on older RabbitMQ versions.
+            f.write(textwrap.dedent(f'''
+                [
+                    {{rabbit, [
+                        {{ssl_listeners, [5671]}},
+                        {{ssl_options, [{{cacertfile,           "{CACERTFILE}"}},
+                                        {{certfile,             "{CERTFILE}"}},
+                                        {{keyfile,              "{KEYFILE}"}},
+                                        {{verify,               verify_peer}},
+                                        {{fail_if_no_peer_cert, false}}]}}]}}
+                ].
+            '''))
+        os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
+
+        ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+
+        del os.environ['RABBITMQ_CONFIG_FILE']
 
 
 def test_ps_s3_multipart_on_master():
index 13bab823f02cf198ff0c8f7ed1b674879051a0c8..421f757515bcf8829fe944a231a2af09973dd0d0 100644 (file)
@@ -61,19 +61,50 @@ protected:
 TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost", "ex1", false);
+  conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
+TEST_F(TestAMQP, SSLConnectionOK)
+{
+  const int port = 5671;
+  const auto connection_number = amqp::get_connection_count();
+  amqp_mock::set_valid_port(port);
+  conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(conn);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  auto rc = amqp::publish(conn, "topic", "message");
+  EXPECT_EQ(rc, 0);
+  amqp_mock::set_valid_port(5672);
+}
+
+TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
+{
+  const int port = 5671;
+  const auto connection_number = amqp::get_connection_count();
+  amqp_mock::set_valid_port(port);
+  amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(conn1);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  auto rc = amqp::publish(conn1, "topic", "message");
+  EXPECT_EQ(rc, 0);
+  amqp_mock::set_valid_port(5672);
+  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(conn2);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
+  rc = amqp::publish(conn2, "topic", "message");
+  EXPECT_EQ(rc, 0);
+}
+
 TEST_F(TestAMQP, ConnectionReuse)
 {
-  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false);
+  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn1);
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false);
+  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn2);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn1, "topic", "message");
@@ -83,7 +114,7 @@ TEST_F(TestAMQP, ConnectionReuse)
 TEST_F(TestAMQP, NameResolutionFail)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://kaboom", "ex1", false);
+  conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -93,7 +124,7 @@ TEST_F(TestAMQP, NameResolutionFail)
 TEST_F(TestAMQP, InvalidPort)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:1234", "ex1", false);
+  conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -103,7 +134,7 @@ TEST_F(TestAMQP, InvalidPort)
 TEST_F(TestAMQP, InvalidHost)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://0.0.0.1", "ex1", false);
+  conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -113,7 +144,7 @@ TEST_F(TestAMQP, InvalidHost)
 TEST_F(TestAMQP, InvalidVhost)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost/kaboom", "ex1", false);
+  conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -125,7 +156,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.1");
   {
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false);
+    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -135,7 +166,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.2");
   {
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false);
+    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -147,7 +178,7 @@ TEST_F(TestAMQP, UserPassword)
 TEST_F(TestAMQP, URLParseError)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex1", false);
+  conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -157,7 +188,7 @@ TEST_F(TestAMQP, URLParseError)
 TEST_F(TestAMQP, ExchangeMismatch)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex2", false);
+  conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -172,7 +203,7 @@ TEST_F(TestAMQP, MaxConnections)
   while (remaining_connections > 0) {
     const auto host = "127.10.0." + std::to_string(remaining_connections);
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
     EXPECT_TRUE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_EQ(rc, 0);
@@ -184,7 +215,7 @@ TEST_F(TestAMQP, MaxConnections)
   {
     const std::string host = "toomany";
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
     EXPECT_FALSE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_LT(rc, 0);
@@ -246,7 +277,7 @@ TEST_F(TestAMQP, ReceiveAck)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
@@ -260,7 +291,7 @@ TEST_F(TestAMQP, ImplicitConnectionClose)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   const auto NUMBER_OF_CALLS = 2000;
   for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
@@ -278,7 +309,7 @@ TEST_F(TestAMQP, ReceiveMultipleAck)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
@@ -296,7 +327,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -315,7 +346,7 @@ TEST_F(TestAMQP, DynamicCallback)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -336,7 +367,7 @@ TEST_F(TestAMQP, ReceiveNack)
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -353,7 +384,7 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -370,7 +401,7 @@ TEST_F(TestAMQP, ClosedConnection)
   const auto current_connections = amqp::get_connection_count();
   const std::string host("localhost3");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
   EXPECT_TRUE(amqp::disconnect(conn));
@@ -389,7 +420,7 @@ TEST_F(TestAMQP, RetryInvalidHost)
 {
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://"+host, "ex1", false);
+  conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -406,7 +437,7 @@ TEST_F(TestAMQP, RetryInvalidPort)
 {
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false);
+  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -425,7 +456,7 @@ TEST_F(TestAMQP, RetryFailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost4");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false);
+  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
index 4a7b452f71d73bc76c8cbe977c3bbceac162e737..869904ebb407d80c300ad515e7f1a519bea13ae3 100644 (file)
@@ -32,7 +32,7 @@ if(WITH_RADOSGW)
     cls_rgw_client)
   if(WITH_RADOSGW_AMQP_ENDPOINT)
     list(APPEND DENCODER_EXTRALIBS
-      rabbitmq)
+      rabbitmq ssl)
   endif()
   if(WITH_RADOSGW_KAFKA_ENDPOINT)
     list(APPEND DENCODER_EXTRALIBS