- AMQP0.9.1 endpoint
- - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- user/password defaults to: guest/guest
- user/password may only be provided over HTTPS. If not, topic creation request will be rejected.
- - port defaults to: 5672
+ - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
- vhost defaults to: "/"
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- AMQP0.9.1 endpoint
- - URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
+ - URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
- user/password defaults to: guest/guest
- user/password may only be provided over HTTPS. Topic creation request will be rejected if not
- - port defaults to: 5672
+ - port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
- vhost defaults to: "/"
+ - verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
+ - if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
if(WITH_RADOSGW_AMQP_ENDPOINT)
# used by rgw_amqp.cc
list(APPEND rgw_libs RabbitMQ::RabbitMQ)
+ list(APPEND rgw_libs OpenSSL::SSL)
endif()
if(WITH_RADOSGW_KAFKA_ENDPOINT)
# used by rgw_kafka.cc
if(WITH_RADOSGW_AMQP_ENDPOINT)
target_link_libraries(rgw PRIVATE RabbitMQ::RabbitMQ)
+ target_link_libraries(rgw PRIVATE OpenSSL::SSL)
endif()
if(WITH_RADOSGW_KAFKA_ENDPOINT)
#include "rgw_amqp.h"
#include <amqp.h>
+#include <amqp_ssl_socket.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include "include/ceph_assert.h"
#include <mutex>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
+#include <openssl/ssl.h>
#define dout_subsys ceph_subsys_rgw
static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;
static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
CallbackList callbacks;
ceph::coarse_real_clock::time_point next_reconnect;
bool mandatory;
+ bool verify_ssl;
+ boost::optional<const std::string&> ca_location;
// default ctor
connection_t() :
ref_count(0),
cct(nullptr),
next_reconnect(ceph::coarse_real_clock::now()),
- mandatory(false)
+ mandatory(false),
+ verify_ssl(false),
+ ca_location(boost::none)
{}
// cleanup of all internal connection resource
return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+ case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+ return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
}
return to_string((amqp_status_enum)s);
}
ConnectionCleaner state_guard(state);
// create and open socket
- auto socket = amqp_tcp_socket_new(state);
+ amqp_socket_t *socket = nullptr;
+ if (info.ssl) {
+ socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+ SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+ // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+ struct hack {
+ const struct amqp_socket_class_t *klass;
+ SSL_CTX *ctx;
+ };
+
+ struct hack *h = reinterpret_cast<struct hack*>(socket);
+ SSL_CTX* ssl_ctx = h->ctx;
+#endif
+ // ensure system CA certificates get loaded
+ SSL_CTX_set_default_verify_paths(ssl_ctx);
+ }
+ else {
+ socket = amqp_tcp_socket_new(state);
+ }
+
if (!socket) {
conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
return conn;
}
+ if (info.ssl) {
+ if (!conn->verify_ssl) {
+ amqp_ssl_socket_set_verify_peer(socket, 0);
+ amqp_ssl_socket_set_verify_hostname(socket, 0);
+ }
+ if (conn->ca_location.has_value()) {
+ const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+ if (s != AMQP_STATUS_OK) {
+ conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+ conn->reply_code = s;
+ return conn;
+ }
+ }
+ }
const auto s = amqp_socket_open(socket, info.host, info.port);
if (s < 0) {
conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
// utility function to create a new connection
connection_ptr_t create_new_connection(const amqp_connection_info& info,
- const std::string& exchange, bool mandatory_delivery, CephContext* cct) {
+ const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->password.assign(info.password);
conn->mandatory = mandatory_delivery;
conn->cct = cct;
+ conn->verify_ssl = verify_ssl;
+ conn->ca_location = ca_location;
return create_connection(conn, info);
}
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+ connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (stopped) {
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
+ const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
if (!conn->is_ok()) {
ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (!s_manager) return nullptr;
- return s_manager->connect(url, exchange, mandatory_delivery);
+ return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
}
int publish(connection_ptr_t& conn,
#include <string>
#include <functional>
+#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include "include/common_fwd.h"
void shutdown();
// connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location);
// publish a message over a connection that was already created
int publish(connection_ptr_t& conn,
ack_level_t ack_level;
amqp::connection_ptr_t conn;
+ bool get_verify_ssl(const RGWHTTPArgs& args) {
+ bool exists;
+ auto str_verify_ssl = args.get("verify-ssl", &exists);
+ if (!exists) {
+ // verify server certificate by default
+ return true;
+ }
+ boost::algorithm::to_lower(str_verify_ssl);
+ if (str_verify_ssl == "true") {
+ return true;
+ }
+ if (str_verify_ssl == "false") {
+ return false;
+ }
+ throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
+ }
+
std::string get_exchange(const RGWHTTPArgs& args) {
bool exists;
const auto exchange = args.get("amqp-exchange", &exists);
topic(_topic),
exchange(get_exchange(args)),
ack_level(get_ack_level(args)),
- conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) {
+ conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
if (!conn) {
throw configuration_error("AMQP: failed to create connection to: " + endpoint);
}
if (schema == "http" || schema == "https") {
return WEBHOOK_SCHEMA;
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- } else if (schema == "amqp") {
+ } else if (schema == "amqp" || schema == "amqps") {
return AMQP_SCHEMA;
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
throw configuration_error("AMQP: unknown version: " + version);
return nullptr;
}
- } else if (schema == "amqps") {
- throw configuration_error("AMQP: ssl not supported");
- return nullptr;
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
} else if (schema == KAFKA_SCHEMA) {
#include "amqp_mock.h"
#include <amqp.h>
+#include <amqp_ssl_socket.h>
#include <amqp_tcp_socket.h>
#include <string>
#include <stdarg.h>
#include <mutex>
#include <boost/lockfree/queue.hpp>
+#include <openssl/ssl.h>
namespace amqp_mock {
amqp_rpc_reply_t reply;
amqp_basic_ack_t ack;
amqp_basic_nack_t nack;
+ bool use_ssl;
// ctor
amqp_connection_state_t_() :
socket(nullptr),
login_called(false),
ack_list(1024),
nack_list(1024),
- delivery_tag(1) {
+ delivery_tag(1),
+ use_ssl(false) {
reply.reply_type = AMQP_RESPONSE_NONE;
}
};
struct amqp_socket_t_ {
+ void *klass;
+ void *ssl_ctx;
bool open_called;
// ctor
- amqp_socket_t_() : open_called(false) {
+ amqp_socket_t_() : klass(nullptr), ssl_ctx(nullptr), open_called(false) {
}
};
return state->socket;
}
+amqp_socket_t* amqp_ssl_socket_new(amqp_connection_state_t state) {
+ state->socket = new amqp_socket_t;
+ state->use_ssl = true;
+ return state->socket;
+}
+
+int amqp_ssl_socket_set_cacert(amqp_socket_t *self, const char *cacert) {
+ // do nothing
+ return AMQP_STATUS_OK;
+}
+
+void amqp_ssl_socket_set_verify_peer(amqp_socket_t *self, amqp_boolean_t verify) {
+ // do nothing
+}
+
+void amqp_ssl_socket_set_verify_hostname(amqp_socket_t *self, amqp_boolean_t verify) {
+ // do nothing
+}
+
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+void* amqp_ssl_socket_get_context(amqp_socket_t *self) {
+ return nullptr;
+}
+#endif
+
+int SSL_CTX_set_default_verify_paths(SSL_CTX *ctx) {
+ return 1;
+}
+
int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
if (!self) {
return -1;
# AMQP endpoint functions
-rabbitmq_port = 5672
class AMQPReceiver(object):
"""class for receiving and storing messages on a topic from the AMQP broker"""
- def __init__(self, exchange, topic):
+ def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
import pika
- hostname = get_ip()
+ import ssl
+
+ if ca_location:
+ ssl_context = ssl.create_default_context()
+ ssl_context.load_verify_locations(cafile=ca_location)
+ ssl_options = pika.SSLOptions(ssl_context)
+ rabbitmq_port = 5671
+ else:
+ rabbitmq_port = 5672
+ ssl_options = None
+
+ if external_endpoint_address:
+ params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+ else:
+ hostname = get_ip()
+ params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
remaining_retries = 10
while remaining_retries > 0:
try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+ connection = pika.BlockingConnection(params)
break
except Exception as error:
remaining_retries -= 1
log.info('AMQP receiver ended unexpectedly: %s', str(error))
-def create_amqp_receiver_thread(exchange, topic):
+def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
"""create amqp receiver and thread"""
- receiver = AMQPReceiver(exchange, topic)
+ receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
# TODO: support multiple brokers per host using env
# make sure we don't collide with the default
try:
- proc = subprocess.Popen(['sudo', 'rabbitmq-server'])
+ proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
except Exception as error:
log.info('failed to execute rabbitmq-server: %s', str(error))
print('failed to execute rabbitmq-server: %s' % str(error))
key.delete()
master_zone.delete_bucket(bucket_name)
-
-def test_ps_s3_creation_triggers_on_master():
+def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
- hostname = get_ip()
- proc = init_rabbitmq()
- if proc is None:
- return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ if not external_endpoint_address:
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ else:
+ proc = None
master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# start amqp receiver
exchange = 'ex1'
- task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
task.start()
# create s3 topic
- endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ if external_endpoint_address:
+ endpoint_address = external_endpoint_address
+ elif ca_location:
+ endpoint_address = 'amqps://' + hostname
+ else:
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
+ if ca_location:
+ endpoint_args += '&ca-location={}'.format(ca_location)
+ if external_endpoint_address:
+ topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ else:
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
key.delete()
# delete the bucket
master_zone.delete_bucket(bucket_name)
- clean_rabbitmq(proc)
+ if proc:
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_creation_triggers_on_master():
+ ps_s3_creation_triggers_on_master()
+
+
+def test_ps_s3_creation_triggers_on_master_external():
+ from distutils.util import strtobool
+
+ if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
+ try:
+ if strtobool(os.environ['AMQP_VERIFY_SSL']):
+ verify_ssl = 'true'
+ else:
+ verify_ssl = 'false'
+ except Exception as e:
+ verify_ssl = 'true'
+
+ ps_s3_creation_triggers_on_master(
+ external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
+ verify_ssl=verify_ssl)
+ else:
+ return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+
+def test_ps_s3_creation_triggers_on_master_ssl():
+ import datetime
+ import textwrap
+ import stat
+ from cryptography import x509
+ from cryptography.x509.oid import NameOID
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import rsa
+ from tempfile import TemporaryDirectory
+
+ with TemporaryDirectory() as tempdir:
+ # modify permissions to ensure that the rabbitmq user can access them
+ os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+ CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+ CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+ KEYFILE = os.path.join(tempdir, 'server_key.pem')
+ RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
+
+ root_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ subject = issuer = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+ ])
+ root_cert = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ issuer
+ ).public_key(
+ root_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+ ).add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ with open(CACERTFILE, "wb") as f:
+ f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Now we want to generate a cert from that root
+ cert_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ with open(KEYFILE, "wb") as f:
+ f.write(cert_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption(),
+ ))
+ new_subject = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ ])
+ cert = x509.CertificateBuilder().subject_name(
+ new_subject
+ ).issuer_name(
+ root_cert.issuer
+ ).public_key(
+ cert_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=30)
+ ).add_extension(
+ x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+ critical=False,
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ # Write our certificate out to disk.
+ with open(CERTFILE, "wb") as f:
+ f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+ with open(RABBITMQ_CONF_FILE, "w") as f:
+ # use the old style config format to ensure it also runs on older RabbitMQ versions.
+ f.write(textwrap.dedent(f'''
+ [
+ {{rabbit, [
+ {{ssl_listeners, [5671]}},
+ {{ssl_options, [{{cacertfile, "{CACERTFILE}"}},
+ {{certfile, "{CERTFILE}"}},
+ {{keyfile, "{KEYFILE}"}},
+ {{verify, verify_peer}},
+ {{fail_if_no_peer_cert, false}}]}}]}}
+ ].
+ '''))
+ os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
+
+ ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+
+ del os.environ['RABBITMQ_CONFIG_FILE']
def test_ps_s3_multipart_on_master():
TEST_F(TestAMQP, ConnectionOK)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost", "ex1", false);
+ conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
}
+TEST_F(TestAMQP, SSLConnectionOK)
+{
+ const int port = 5671;
+ const auto connection_number = amqp::get_connection_count();
+ amqp_mock::set_valid_port(port);
+ conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(conn);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+ auto rc = amqp::publish(conn, "topic", "message");
+ EXPECT_EQ(rc, 0);
+ amqp_mock::set_valid_port(5672);
+}
+
+TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
+{
+ const int port = 5671;
+ const auto connection_number = amqp::get_connection_count();
+ amqp_mock::set_valid_port(port);
+ amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(conn1);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+ auto rc = amqp::publish(conn1, "topic", "message");
+ EXPECT_EQ(rc, 0);
+ amqp_mock::set_valid_port(5672);
+ amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(conn2);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
+ rc = amqp::publish(conn2, "topic", "message");
+ EXPECT_EQ(rc, 0);
+}
+
TEST_F(TestAMQP, ConnectionReuse)
{
- amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false);
+ amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
EXPECT_TRUE(conn1);
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false);
+ amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
EXPECT_TRUE(conn2);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn1, "topic", "message");
TEST_F(TestAMQP, NameResolutionFail)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://kaboom", "ex1", false);
+ conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidPort)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:1234", "ex1", false);
+ conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidHost)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://0.0.0.1", "ex1", false);
+ conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidVhost)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost/kaboom", "ex1", false);
+ conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.1");
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false);
+ conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.2");
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false);
+ conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, URLParseError)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex1", false);
+ conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, ExchangeMismatch)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex2", false);
+ conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
while (remaining_connections > 0) {
const auto host = "127.10.0." + std::to_string(remaining_connections);
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
{
const std::string host = "toomany";
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_FALSE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
const auto NUMBER_OF_CALLS = 2000;
for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
amqp_mock::REPLY_ACK = false;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
const auto current_connections = amqp::get_connection_count();
const std::string host("localhost3");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
EXPECT_TRUE(amqp::disconnect(conn));
{
const std::string host = "192.168.0.1";
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://"+host, "ex1", false);
+ conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
{
const int port = 9999;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false);
+ conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost4");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false);
+ conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
cls_rgw_client)
if(WITH_RADOSGW_AMQP_ENDPOINT)
list(APPEND DENCODER_EXTRALIBS
- rabbitmq)
+ rabbitmq ssl)
endif()
if(WITH_RADOSGW_KAFKA_ENDPOINT)
list(APPEND DENCODER_EXTRALIBS