From 38d819b3d77f4b330b542940314e22c21bb5591d Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Fri, 16 Jun 2023 15:10:19 +0000 Subject: [PATCH] rgw/amqp: remove possible race conditions with the amqp connections * simplify memory management of the connection by not using a unique_ptr * simplify the logic by handling all issues inside the amqp manager * fix iterator invalidation issue with multiple n/acks * allow different connections with different exchanges * modify the unit tests according to the new behavior Fixes: * https://tracker.ceph.com/issues/61639 * https://tracker.ceph.com/issues/46127 Signed-off-by: Yuval Lifshitz --- doc/radosgw/notifications.rst | 3 +- src/rgw/driver/rados/rgw_pubsub_push.cc | 12 +- src/rgw/rgw_amqp.cc | 406 ++++++++++-------------- src/rgw/rgw_amqp.h | 28 +- src/test/rgw/test_rgw_amqp.cc | 368 ++++++++++++--------- 5 files changed, 414 insertions(+), 403 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index e450dd9488a5d..1cc538432f3dc 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -188,8 +188,7 @@ Request parameters: specified CA will be used to authenticate the broker. The default CA will not be used. - amqp-exchange: The exchanges must exist and must be able to route messages - based on topics. This parameter is mandatory. Different topics that point - to the same endpoint must use the same exchange. + based on topics. This parameter is mandatory. - amqp-ack-level: No end2end acking is required. Messages may persist in the broker before being delivered to their final destinations. 
Three ack methods exist: diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index f15aa3bcc693d..bdb24ce9ad103 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -128,7 +128,7 @@ private: const std::string topic; const std::string exchange; ack_level_t ack_level; - amqp::connection_ptr_t conn; + amqp::connection_id_t conn_id; bool get_verify_ssl(const RGWHTTPArgs& args) { bool exists; @@ -181,9 +181,8 @@ public: endpoint(_endpoint), topic(_topic), exchange(get_exchange(args)), - ack_level(get_ack_level(args)), - conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) { - if (!conn) { + ack_level(get_ack_level(args)) { + if (!amqp::connect(conn_id, endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) { throw configuration_error("AMQP: failed to create connection to: " + endpoint); } } @@ -243,14 +242,13 @@ public: }; int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { - ceph_assert(conn); if (ack_level == ack_level_t::None) { - return amqp::publish(conn, topic, json_format_pubsub_event(event)); + return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine auto w = std::unique_ptr(new Waiter); - const auto rc = amqp::publish_with_confirm(conn, + const auto rc = amqp::publish_with_confirm(conn_id, topic, json_format_pubsub_event(event), std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index 186bdd54ad6fe..0bea58d24d57b 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -1,4 +1,4 @@ -// -*- 
mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #include "rgw_amqp.h" @@ -16,6 +16,7 @@ #include #include #include +#include #include "common/dout.h" #include @@ -50,49 +51,47 @@ static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010; static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008; static const int RGW_AMQP_NO_REPLY_CODE = 0x0; -// key class for the connection list -struct connection_id_t { - const std::string host; - const int port; - const std::string vhost; - // constructed from amqp_connection_info struct - connection_id_t(const amqp_connection_info& info) - : host(info.host), port(info.port), vhost(info.vhost) {} - - // equality operator and hasher functor are needed - // so that connection_id_t could be used as key in unordered_map - bool operator==(const connection_id_t& other) const { - return host == other.host && port == other.port && vhost == other.vhost; +// the amqp_connection_info struct does not hold any memory and just points to the URL string +// so, strings are copied into connection_id_t +connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange) + : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {} + +// equality operator and hasher functor are needed +// so that connection_id_t could be used as key in unordered_map +bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) { + return lhs.host == rhs.host && lhs.port == rhs.port && + lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange; +} + +struct connection_id_hasher { + std::size_t operator()(const connection_id_t& k) const { + std::size_t h = 0; + boost::hash_combine(h, k.host); + boost::hash_combine(h, k.port); + boost::hash_combine(h, k.vhost); + boost::hash_combine(h, k.exchange); + return h; } - - struct hasher { - std::size_t operator()(const 
connection_id_t& k) const { - return ((std::hash()(k.host) - ^ (std::hash()(k.port) << 1)) >> 1) - ^ (std::hash()(k.vhost) << 1); - } - }; }; std::string to_string(const connection_id_t& id) { - return id.host+":"+std::to_string(id.port)+id.vhost; + return std::string("amqp")+(id.ssl ? "s" : "")+"://"+id.host+":"+std::to_string(id.port)+id.vhost+"?exchange="+id.exchange; } -// connection_t state cleaner -// could be used for automatic cleanup when getting out of scope +// automatically cleans amqp state when gets out of scope class ConnectionCleaner { private: - amqp_connection_state_t conn; + amqp_connection_state_t state; public: - ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {} + ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {} ~ConnectionCleaner() { - if (conn) { - amqp_destroy_connection(conn); + if (state) { + amqp_destroy_connection(state); } } // call reset() if cleanup is not needed anymore void reset() { - conn = nullptr; + state = nullptr; } }; @@ -100,9 +99,9 @@ class ConnectionCleaner { struct reply_callback_with_tag_t { uint64_t tag; reply_callback_t cb; - + reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {} - + bool operator==(uint64_t rhs) { return tag == rhs; } @@ -111,44 +110,26 @@ struct reply_callback_with_tag_t { typedef std::vector CallbackList; // struct for holding the connection state object as well as the exchange -// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) -// since references to deleted objects may still exist in the calling code struct connection_t { - std::atomic state; - std::string exchange; + CephContext* cct = nullptr; + amqp_connection_state_t state = nullptr; + amqp_bytes_t reply_to_queue = amqp_empty_bytes; + uint64_t delivery_tag = 1; + int status = AMQP_STATUS_OK; + int reply_type = AMQP_RESPONSE_NORMAL; + int reply_code = RGW_AMQP_NO_REPLY_CODE; + CallbackList callbacks; + ceph::coarse_real_clock::time_point 
next_reconnect = ceph::coarse_real_clock::now(); + bool mandatory = false; + const bool use_ssl = false; std::string user; std::string password; - amqp_bytes_t reply_to_queue; - uint64_t delivery_tag; - int status; - int reply_type; - int reply_code; - mutable std::atomic ref_count; - CephContext* cct; - CallbackList callbacks; - ceph::coarse_real_clock::time_point next_reconnect; - bool mandatory; - bool use_ssl; - bool verify_ssl; + bool verify_ssl = true; boost::optional ca_location; utime_t timestamp = ceph_clock_now(); - // default ctor - connection_t() : - state(nullptr), - reply_to_queue(amqp_empty_bytes), - delivery_tag(1), - status(AMQP_STATUS_OK), - reply_type(AMQP_RESPONSE_NORMAL), - reply_code(RGW_AMQP_NO_REPLY_CODE), - ref_count(0), - cct(nullptr), - next_reconnect(ceph::coarse_real_clock::now()), - mandatory(false), - use_ssl(false), - verify_ssl(false), - ca_location(boost::none) - {} + connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional _ca_location) : + cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {} // cleanup of all internal connection resource // the object can still remain, and internal connection @@ -176,28 +157,15 @@ struct connection_t { ~connection_t() { destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED); } - - friend void intrusive_ptr_add_ref(const connection_t* p); - friend void intrusive_ptr_release(const connection_t* p); }; -// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr -void intrusive_ptr_add_ref(const connection_t* p) { - ++p->ref_count; -} -void intrusive_ptr_release(const connection_t* p) { - if (--p->ref_count == 0) { - delete p; - } -} - // convert connection info to string std::string to_string(const amqp_connection_info& info) { std::stringstream ss; ss << "connection info:" << "\nHost: " << info.host << "\nPort: " << info.port << - "\nUser: " << info.user << + 
"\nUser: " << info.user << "\nPassword: " << info.password << "\nvhost: " << info.vhost << "\nSSL support: " << info.ssl << std::endl; @@ -212,7 +180,7 @@ int reply_to_code(const amqp_rpc_reply_t& reply) { return RGW_AMQP_NO_REPLY_CODE; case AMQP_RESPONSE_LIBRARY_EXCEPTION: return reply.library_error; - case AMQP_RESPONSE_SERVER_EXCEPTION: + case AMQP_RESPONSE_SERVER_EXCEPTION: if (reply.reply.decoded) { const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded; return m->reply_code; @@ -232,7 +200,7 @@ std::string to_string(const amqp_rpc_reply_t& reply) { return "missing RPC reply type"; case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(reply.library_error); - case AMQP_RESPONSE_SERVER_EXCEPTION: + case AMQP_RESPONSE_SERVER_EXCEPTION: { switch (reply.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: @@ -281,7 +249,7 @@ std::string to_string(amqp_status_enum s) { case AMQP_STATUS_SOCKET_ERROR: return "AMQP_STATUS_SOCKET_ERROR"; case AMQP_STATUS_INVALID_PARAMETER: - return "AMQP_STATUS_INVALID_PARAMETER"; + return "AMQP_STATUS_INVALID_PARAMETER"; case AMQP_STATUS_TABLE_TOO_BIG: return "AMQP_STATUS_TABLE_TOO_BIG"; case AMQP_STATUS_WRONG_METHOD: @@ -305,13 +273,13 @@ std::string to_string(amqp_status_enum s) { return "AMQP_STATUS_UNSUPPORTED"; #endif case _AMQP_STATUS_NEXT_VALUE: - return "AMQP_STATUS_INTERNAL"; + return "AMQP_STATUS_INTERNAL"; case AMQP_STATUS_TCP_ERROR: return "AMQP_STATUS_TCP_ERROR"; case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR: return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR"; case _AMQP_STATUS_TCP_NEXT_VALUE: - return "AMQP_STATUS_INTERNAL"; + return "AMQP_STATUS_INTERNAL"; case AMQP_STATUS_SSL_ERROR: return "AMQP_STATUS_SSL_ERROR"; case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED: @@ -321,7 +289,7 @@ std::string to_string(amqp_status_enum s) { case AMQP_STATUS_SSL_CONNECTION_FAILED: return "AMQP_STATUS_SSL_CONNECTION_FAILED"; case _AMQP_STATUS_SSL_NEXT_VALUE: - return "AMQP_STATUS_INTERNAL"; + return 
"AMQP_STATUS_INTERNAL"; #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0) case AMQP_STATUS_SSL_SET_ENGINE_FAILED: return "AMQP_STATUS_SSL_SET_ENGINE_FAILED"; @@ -374,7 +342,7 @@ std::string status_to_string(int s) { #define RETURN_ON_ERROR(C, S, OK) \ if (!OK) { \ C->status = S; \ - return C; \ + return false; \ } // in case of RPC calls, getting the RPC reply and return if an error is detected @@ -384,7 +352,7 @@ std::string status_to_string(int s) { C->status = S; \ C->reply_type = reply.reply_type; \ C->reply_code = reply_to_code(reply); \ - return C; \ + return false; \ } \ } @@ -392,25 +360,25 @@ static const amqp_channel_t CHANNEL_ID = 1; static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2; // utility function to create a connection, when the connection object already exists -connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) { - ceph_assert(conn); - +bool new_state(connection_t* conn, const connection_id_t& conn_id) { + // state must be null at this point + ceph_assert(!conn->state); // reset all status codes - conn->status = AMQP_STATUS_OK; + conn->status = AMQP_STATUS_OK; conn->reply_type = AMQP_RESPONSE_NORMAL; conn->reply_code = RGW_AMQP_NO_REPLY_CODE; auto state = amqp_new_connection(); if (!state) { conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED; - return conn; + return false; } // make sure that the connection state is cleaned up in case of error ConnectionCleaner state_guard(state); // create and open socket amqp_socket_t *socket = nullptr; - if (info.ssl) { + if (conn->use_ssl) { socket = amqp_ssl_socket_new(state); #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1) SSL_CTX* ssl_ctx = reinterpret_cast(amqp_ssl_socket_get_context(socket)); @@ -433,9 +401,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio if (!socket) { conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED; - return conn; + return false; } - if (info.ssl) { + if (conn->use_ssl) { if (!conn->verify_ssl) { 
amqp_ssl_socket_set_verify_peer(socket, 0); amqp_ssl_socket_set_verify_hostname(socket, 0); @@ -445,32 +413,32 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio if (s != AMQP_STATUS_OK) { conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED; conn->reply_code = s; - return conn; + return false; } } } - const auto s = amqp_socket_open(socket, info.host, info.port); + const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port); if (s < 0) { conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED; conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR; conn->reply_code = s; - return conn; + return false; } // login to broker const auto reply = amqp_login(state, - info.vhost, + conn_id.vhost.c_str(), AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, 0, // no heartbeat TODO: add conf AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security - info.user, - info.password); + conn->user.c_str(), + conn->password.c_str()); if (reply.reply_type != AMQP_RESPONSE_NORMAL) { conn->status = RGW_AMQP_STATUS_LOGIN_FAILED; conn->reply_type = reply.reply_type; conn->reply_code = reply_to_code(reply); - return conn; + return false; } // open channels @@ -493,9 +461,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio // verify that the topic exchange is there // TODO: make this step optional { - const auto ok = amqp_exchange_declare(state, + const auto ok = amqp_exchange_declare(state, CHANNEL_ID, - amqp_cstring_bytes(conn->exchange.c_str()), + amqp_cstring_bytes(conn_id.exchange.c_str()), amqp_cstring_bytes("topic"), 1, // passive - exchange must already exist on broker 1, // durable @@ -507,12 +475,12 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio } { // create queue for confirmations - const auto queue_ok = amqp_queue_declare(state, + const auto queue_ok = amqp_queue_declare(state, CHANNEL_ID, // use the regular channel for this call amqp_empty_bytes, // let broker 
allocate queue name - 0, // not passive - create the queue - 0, // not durable - 1, // exclusive + 0, // not passive - create the queue + 0, // not durable + 1, // exclusive 1, // auto-delete amqp_empty_table // not args TODO add args from conf: TTL, max length etc. ); @@ -520,8 +488,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED); // define consumption for connection - const auto consume_ok = amqp_basic_consume(state, - CONFIRMING_CHANNEL_ID, + const auto consume_ok = amqp_basic_consume(state, + CONFIRMING_CHANNEL_ID, queue_ok->queue, amqp_empty_bytes, // broker will generate consumer tag 1, // messages sent from client are never routed back @@ -533,45 +501,30 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok); RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED); // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed - + state_guard.reset(); conn->state = state; conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue); - return conn; } -} - -// utility function to create a new connection -connection_ptr_t create_new_connection(const amqp_connection_info& info, - const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional ca_location) { - // create connection state - connection_ptr_t conn = new connection_t; - conn->exchange = exchange; - conn->user.assign(info.user); - conn->password.assign(info.password); - conn->mandatory = mandatory_delivery; - conn->cct = cct; - conn->use_ssl = info.ssl; - conn->verify_ssl = verify_ssl; - conn->ca_location = ca_location; - return create_connection(conn, info); + return true; } /// struct used for holding messages in the message queue struct message_wrapper_t { - connection_ptr_t conn; + 
connection_id_t conn_id; std::string topic; std::string message; reply_callback_t cb; - - message_wrapper_t(connection_ptr_t& _conn, + + message_wrapper_t(const connection_id_t& _conn_id, const std::string& _topic, const std::string& _message, - reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {} + reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {} }; +using connection_t_ptr = std::unique_ptr; -typedef std::unordered_map ConnectionList; +typedef std::unordered_map ConnectionList; typedef boost::lockfree::queue> MessageQueue; // macros used inside a loop where an iterator is either incremented or erased @@ -606,14 +559,23 @@ private: void publish_internal(message_wrapper_t* message) { const std::unique_ptr msg_owner(message); - auto& conn = message->conn; + const auto& conn_id = message->conn_id; + auto conn_it = connections.find(conn_id); + if (conn_it == connections.end()) { + ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl; + if (message->cb) { + message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED); + } + return; + } + + auto& conn = conn_it->second; conn->timestamp = ceph_clock_now(); if (!conn->is_ok()) { // connection had an issue while message was in the queue - // TODO add error stats - ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl; + ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl; if (message->cb) { message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED); } @@ -621,20 +583,19 @@ private: } if (message->cb == nullptr) { - // TODO add error stats const auto rc = amqp_basic_publish(conn->state, CHANNEL_ID, - amqp_cstring_bytes(conn->exchange.c_str()), + amqp_cstring_bytes(conn_id.exchange.c_str()), amqp_cstring_bytes(message->topic.c_str()), 0, // does not have to be routable 0, // not immediate nullptr, // no properties needed 
amqp_cstring_bytes(message->message.c_str())); if (rc == AMQP_STATUS_OK) { - ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl; + ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl; return; } - ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl; + ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl; // an error occurred, close connection // it will be retied by the main loop conn->destroy(rc); @@ -642,15 +603,15 @@ private: } amqp_basic_properties_t props; - props._flags = - AMQP_BASIC_DELIVERY_MODE_FLAG | + props._flags = + AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG; props.delivery_mode = 2; // persistent delivery TODO take from conf props.reply_to = conn->reply_to_queue; const auto rc = amqp_basic_publish(conn->state, CONFIRMING_CHANNEL_ID, - amqp_cstring_bytes(conn->exchange.c_str()), + amqp_cstring_bytes(conn_id.exchange.c_str()), amqp_cstring_bytes(message->topic.c_str()), conn->mandatory, 0, // not immediate @@ -660,17 +621,17 @@ private: if (rc == AMQP_STATUS_OK) { auto const q_len = conn->callbacks.size(); if (q_len < max_inflight) { - ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl; + ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. 
Queue has: " << q_len << " callbacks" << dendl; conn->callbacks.emplace_back(conn->delivery_tag++, message->cb); } else { // immediately invoke callback with error - ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl; + ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl; message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT); } } else { // an error occurred, close connection // it will be retied by the main loop - ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl; + ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl; conn->destroy(rc); // immediately invoke callback with error message->cb(rc); @@ -702,12 +663,12 @@ private: auto incoming_message = false; // loop over all connections to read acks for (;conn_it != end_it;) { - + + const auto& conn_id = conn_it->first; auto& conn = conn_it->second; - const auto& conn_key = conn_it->first; if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { - ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; + ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; ERASE_AND_CONTINUE(conn_it, connections); } @@ -717,22 +678,15 @@ private: if (now >= conn->next_reconnect) { // pointers are used temporarily inside the amqp_connection_info object // as read-only values, hence the assignment, and const_cast are safe here - amqp_connection_info info; - info.host = const_cast(conn_key.host.c_str()); - info.port = conn_key.port; - info.vhost = const_cast(conn_key.vhost.c_str()); - info.user = const_cast(conn->user.c_str()); - info.password = const_cast(conn->password.c_str()); - info.ssl = conn->use_ssl; - ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; - if (create_connection(conn, info)->is_ok() == false) { 
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " << + ldout(cct, 20) << "AMQP run: retry connection" << dendl; + if (!new_state(conn.get(), conn_id)) { + ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " << status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl; // TODO: add error counter for failed retries // TODO: add exponential backoff for retries conn->next_reconnect = now + reconnect_time; } else { - ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl; + ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl; } } INCREMENT_AND_CONTINUE(conn_it); @@ -744,7 +698,7 @@ private: // TODO mark connection as idle INCREMENT_AND_CONTINUE(conn_it); } - + // this is just to prevent spinning idle, does not indicate that a message // was successfully processed or not incoming_message = true; @@ -753,13 +707,13 @@ private: if (rc != AMQP_STATUS_OK) { // an error occurred, close connection // it will be retied by the main loop - ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl; + ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl; conn->destroy(rc); INCREMENT_AND_CONTINUE(conn_it); } if (frame.frame_type != AMQP_FRAME_METHOD) { - ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " + ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. 
frame type: " << unsigned(frame.frame_type) << dendl; // handler is for publish confirmation only - handle only method frames INCREMENT_AND_CONTINUE(conn_it); @@ -770,7 +724,7 @@ private: int result; switch (frame.payload.method.id) { - case AMQP_BASIC_ACK_METHOD: + case AMQP_BASIC_ACK_METHOD: { result = AMQP_STATUS_OK; const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded; @@ -788,12 +742,12 @@ private: multiple = nack->multiple; break; } - case AMQP_BASIC_REJECT_METHOD: - { - result = RGW_AMQP_STATUS_BROKER_NACK; - const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded; - tag = reject->delivery_tag; - multiple = false; + case AMQP_BASIC_REJECT_METHOD: + { + result = RGW_AMQP_STATUS_BROKER_NACK; + const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded; + tag = reject->delivery_tag; + multiple = false; break; } case AMQP_CONNECTION_CLOSE_METHOD: @@ -801,42 +755,40 @@ private: case AMQP_CHANNEL_CLOSE_METHOD: { // other side closed the connection, no need to continue - ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl; + ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl; conn->destroy(rc); INCREMENT_AND_CONTINUE(conn_it); } case AMQP_BASIC_RETURN_METHOD: // message was not delivered, returned to sender - ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl; + ldout(cct, 10) << "AMQP run: message was not routable" << dendl; INCREMENT_AND_CONTINUE(conn_it); break; default: // unexpected method - ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl; + ldout(cct, 10) << "AMQP run: unexpected message" << dendl; INCREMENT_AND_CONTINUE(conn_it); } - const auto& callbacks_end = conn->callbacks.end(); - const auto& callbacks_begin = conn->callbacks.begin(); - const auto tag_it = std::find(callbacks_begin, callbacks_end, tag); - if (tag_it != callbacks_end) { + const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag); + if (tag_it != 
conn->callbacks.end()) { if (multiple) { // n/ack all up to (and including) the tag - ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl; - auto it = callbacks_begin; + ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl; + auto it = conn->callbacks.begin(); while (it->tag <= tag && it != conn->callbacks.end()) { - ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl; + ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl; it->cb(result); it = conn->callbacks.erase(it); } } else { // n/ack a specific tag - ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl; + ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl; tag_it->cb(result); conn->callbacks.erase(tag_it); } } else { - ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl; + ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl; } // just increment the iterator ++conn_it; @@ -856,11 +808,11 @@ private: public: Manager(size_t _max_connections, size_t _max_inflight, - size_t _max_queue, + size_t _max_queue, long _usec_timeout, unsigned reconnect_time_ms, unsigned idle_time_ms, - CephContext* _cct) : + CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), @@ -876,9 +828,9 @@ public: idle_time(std::chrono::milliseconds(idle_time_ms)), reconnect_time(std::chrono::milliseconds(reconnect_time_ms)), runner(&Manager::run, this) { - // The hashmap has "max connections" as the initial number of buckets, + // The hashmap has "max connections" as the initial number of buckets, // and allows for 10 collisions per bucket before rehash. 
- // This is to prevent rehashing so that iterators are not invalidated + // This is to prevent rehashing so that iterators are not invalidated // when a new connection is added. connections.max_load_factor(10.0); // give the runner thread a name for easier debugging @@ -896,76 +848,68 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, + bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, boost::optional ca_location) { if (stopped) { ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; - return nullptr; + return false; } - struct amqp_connection_info info; + amqp_connection_info info; // cache the URL so that parsing could happen in-place std::vector url_cache(url.c_str(), url.c_str()+url.size()+1); const auto retcode = amqp_parse_url(url_cache.data(), &info); if (AMQP_STATUS_OK != retcode) { ldout(cct, 1) << "AMQP connect: URL parsing failed. 
error: " << retcode << dendl; - return nullptr; + return false; } + connection_id_t tmp_id(info, exchange); - const connection_id_t id(info); std::lock_guard lock(connections_lock); - const auto it = connections.find(id); + const auto it = connections.find(tmp_id); if (it != connections.end()) { - if (it->second->exchange != exchange) { - ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl; - return nullptr; - } // connection found - return even if non-ok ldout(cct, 20) << "AMQP connect: connection found" << dendl; - return it->second; + id = it->first; + return true; } // connection not found, creating a new one if (connection_count >= max_connections) { ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl; - return nullptr; + return false; } - const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location); - if (!conn->is_ok()) { - ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" << - status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl; - } - // create_new_connection must always return a connection object - // even if error occurred during creation. - // in such a case the creation will be retried in the main thread - ceph_assert(conn); + // if error occurred during creation the creation will be retried in the main thread ++connection_count; + auto conn = connections.emplace(tmp_id, std::make_unique(cct, info, verify_ssl, ca_location)).first->second.get(); ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl; - ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl; - return connections.emplace(id, conn).first->second; + if (!new_state(conn, tmp_id)) { + ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). 
error: " << + status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl; + } + id = std::move(tmp_id); + return true; } // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack) - int publish(connection_ptr_t& conn, + int publish(const connection_id_t& conn_id, const std::string& topic, const std::string& message) { if (stopped) { ldout(cct, 1) << "AMQP publish: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } - if (!conn || !conn->is_ok()) { - ldout(cct, 1) << "AMQP publish: no connection" << dendl; - return RGW_AMQP_STATUS_CONNECTION_CLOSED; - } - if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { + auto wrapper = std::make_unique(conn_id, topic, message, nullptr); + if (messages.push(wrapper.get())) { + std::ignore = wrapper.release(); ++queued; return AMQP_STATUS_OK; } ldout(cct, 1) << "AMQP publish: queue is full" << dendl; return RGW_AMQP_STATUS_QUEUE_FULL; } - - int publish_with_confirm(connection_ptr_t& conn, + + int publish_with_confirm(const connection_id_t& conn_id, const std::string& topic, const std::string& message, reply_callback_t cb) { @@ -973,11 +917,9 @@ public: ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } - if (!conn || !conn->is_ok()) { - ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl; - return RGW_AMQP_STATUS_CONNECTION_CLOSED; - } - if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { + auto wrapper = std::make_unique(conn_id, topic, message, cb); + if (messages.push(wrapper.get())) { + std::ignore = wrapper.release(); ++queued; return AMQP_STATUS_OK; } @@ -997,7 +939,7 @@ public: size_t get_connection_count() const { return connection_count; } - + // get the number of in-flight messages size_t get_inflight() const { size_t sum = 0; @@ -1026,7 +968,7 @@ public: static Manager* s_manager = nullptr; static const 
size_t MAX_CONNECTIONS_DEFAULT = 256; -static const size_t MAX_INFLIGHT_DEFAULT = 8192; +static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; static const long READ_TIMEOUT_USEC = 100; static const unsigned IDLE_TIME_MS = 100; @@ -1037,7 +979,7 @@ bool init(CephContext* cct) { return false; } // TODO: take conf from CephContext - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct); return true; } @@ -1047,32 +989,32 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, +bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, boost::optional ca_location) { - if (!s_manager) return nullptr; - return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location); + if (!s_manager) return false; + return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location); } -int publish(connection_ptr_t& conn, +int publish(const connection_id_t& conn_id, const std::string& topic, const std::string& message) { if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; - return s_manager->publish(conn, topic, message); + return s_manager->publish(conn_id, topic, message); } -int publish_with_confirm(connection_ptr_t& conn, +int publish_with_confirm(const connection_id_t& conn_id, const std::string& topic, const std::string& message, reply_callback_t cb) { if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; - return s_manager->publish_with_confirm(conn, topic, message, cb); + return s_manager->publish_with_confirm(conn_id, topic, message, cb); } size_t get_connection_count() { if (!s_manager) return 0; return 
s_manager->get_connection_count(); } - + size_t get_inflight() { if (!s_manager) return 0; return s_manager->get_inflight(); diff --git a/src/rgw/rgw_amqp.h b/src/rgw/rgw_amqp.h index 84d0650731c53..89cdafc448f56 100644 --- a/src/rgw/rgw_amqp.h +++ b/src/rgw/rgw_amqp.h @@ -6,19 +6,12 @@ #include #include #include -#include #include "include/common_fwd.h" -namespace rgw::amqp { -// forward declaration of connection object -struct connection_t; - -typedef boost::intrusive_ptr connection_ptr_t; +struct amqp_connection_info; -// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr -void intrusive_ptr_add_ref(const connection_t* p); -void intrusive_ptr_release(const connection_t* p); +namespace rgw::amqp { // the reply callback is expected to get an integer parameter // indicating the result, and not to return anything @@ -30,19 +23,30 @@ bool init(CephContext* cct); // shutdown the amqp manager void shutdown(); +// key class for the connection list (port/ssl carry default initializers so that a default-constructed id never holds indeterminate values) +struct connection_id_t { + std::string host; + int port = 0; + std::string vhost; + std::string exchange; + bool ssl = false; + connection_id_t() = default; + connection_id_t(const amqp_connection_info& info, const std::string& _exchange); +}; + // connect to an amqp endpoint -connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, +bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, boost::optional ca_location); // publish a message over a connection that was already created -int publish(connection_ptr_t& conn, +int publish(const connection_id_t& conn_id, const std::string& topic, const std::string& message); // publish a message over a connection that was already created // and pass a callback that will be invoked (async) when broker confirms // receiving the message -int publish_with_confirm(connection_ptr_t& conn, +int publish_with_confirm(const 
connection_id_t& conn_id, const std::string& topic, const std::string& message, reply_callback_t cb); diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index bf8671771d9eb..2ce9cb2a0a055 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -13,6 +13,7 @@ using namespace rgw; const std::chrono::milliseconds wait_time(10); const std::chrono::milliseconds long_wait_time = wait_time*50; +const std::chrono::seconds idle_time(30); class CctCleaner { @@ -34,7 +35,7 @@ CctCleaner cleaner(cct); class TestAMQP : public ::testing::Test { protected: - amqp::connection_ptr_t conn = nullptr; + amqp::connection_id_t conn_id; unsigned current_dequeued = 0U; void SetUp() override { @@ -58,13 +59,54 @@ protected: } }; +std::atomic callback_invoked = false; + +std::atomic callbacks_invoked = 0; + +// note: because these callbacks are shared among different "publish" calls +// they should be used on different connections + +void my_callback_expect_ack(int rc) { + EXPECT_EQ(0, rc); + callback_invoked = true; +} + +void my_callback_expect_nack(int rc) { + EXPECT_LT(rc, 0); + callback_invoked = true; +} + +void my_callback_expect_multiple_acks(int rc) { + EXPECT_EQ(0, rc); + ++callbacks_invoked; +} + +class dynamic_callback_wrapper { + dynamic_callback_wrapper() = default; +public: + static dynamic_callback_wrapper* create() { + return new dynamic_callback_wrapper; + } + void callback(int rc) { + EXPECT_EQ(0, rc); + ++callbacks_invoked; + delete this; + } +}; + +void my_callback_expect_close_or_ack(int rc) { + // deleting the connection should trigger the callback with -4098 + // but due to race conditions, some may get an ack + EXPECT_TRUE(-4098 == rc || 0 == rc); +} + TEST_F(TestAMQP, ConnectionOK) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, 
boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); + rc = amqp::publish(conn_id, "topic", "message"); EXPECT_EQ(rc, 0); } @@ -73,10 +115,10 @@ TEST_F(TestAMQP, SSLConnectionOK) const int port = 5671; const auto connection_number = amqp::get_connection_count(); amqp_mock::set_valid_port(port); - conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); + rc = amqp::publish(conn_id, "topic", "message"); EXPECT_EQ(rc, 0); amqp_mock::set_valid_port(5672); } @@ -86,193 +128,195 @@ TEST_F(TestAMQP, PlainAndSSLConnectionsOK) const int port = 5671; const auto connection_number = amqp::get_connection_count(); amqp_mock::set_valid_port(port); - amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn1); + amqp::connection_id_t conn_id1; + auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn1, "topic", "message"); + rc = amqp::publish(conn_id1, "topic", "message"); EXPECT_EQ(rc, 0); amqp_mock::set_valid_port(5672); - amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn2); + amqp::connection_id_t conn_id2; + rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 2); - rc = amqp::publish(conn2, "topic", "message"); + rc = amqp::publish(conn_id2, "topic", "message"); EXPECT_EQ(rc, 0); } TEST_F(TestAMQP, ConnectionReuse) { - 
amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn1); + amqp::connection_id_t conn_id1; + auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none); - EXPECT_TRUE(conn2); + amqp::connection_id_t conn_id2; + rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number); - auto rc = amqp::publish(conn1, "topic", "message"); + rc = amqp::publish(conn_id1, "topic", "message"); EXPECT_EQ(rc, 0); } TEST_F(TestAMQP, NameResolutionFail) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, InvalidPort) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", 
"message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, InvalidHost) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, InvalidVhost) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, UserPassword) { amqp_mock::set_valid_host("127.0.0.1"); { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); 
- auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } // now try the same connection with default user/password amqp_mock::set_valid_host("127.0.0.2"); { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } amqp_mock::set_valid_host("localhost"); } TEST_F(TestAMQP, URLParseError) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex1", false, false, boost::none); - EXPECT_FALSE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none); + EXPECT_FALSE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, ExchangeMismatch) { + callback_invoked = false; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex2", false, false, boost::none); - EXPECT_FALSE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, 
false, boost::none); + EXPECT_FALSE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); } TEST_F(TestAMQP, MaxConnections) { // fill up all connections - std::vector connections; + std::vector connections; auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count(); while (remaining_connections > 0) { const auto host = "127.10.0." + std::to_string(remaining_connections); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - auto rc = amqp::publish(conn, "topic", "message"); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); --remaining_connections; - connections.push_back(conn); + connections.push_back(conn_id); } EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections()); + wait_until_drained(); // try to add another connection { const std::string host = "toomany"; amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_FALSE(conn); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_FALSE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); } EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections()); amqp_mock::set_valid_host("localhost"); } -std::atomic callback_invoked = false; 
- -std::atomic callbacks_invoked = 0; - -// note: because these callback are shared among different "publish" calls -// they should be used on different connections - -void my_callback_expect_ack(int rc) { - EXPECT_EQ(0, rc); - callback_invoked = true; -} - -void my_callback_expect_nack(int rc) { - EXPECT_LT(rc, 0); - callback_invoked = true; -} - -void my_callback_expect_multiple_acks(int rc) { - EXPECT_EQ(0, rc); - ++callbacks_invoked; -} - -class dynamic_callback_wrapper { - dynamic_callback_wrapper() = default; -public: - static dynamic_callback_wrapper* create() { - return new dynamic_callback_wrapper; - } - void callback(int rc) { - EXPECT_EQ(0, rc); - ++callbacks_invoked; - delete this; - } -}; - -void my_callback_expect_close_or_ack(int rc) { - // deleting the connection should trigger the callback with -4098 - // but due to race conditions, some my get an ack - EXPECT_TRUE(-4098 == rc || 0 == rc); -} TEST_F(TestAMQP, ReceiveAck) { callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); wait_until_drained(); EXPECT_TRUE(callback_invoked); @@ -284,16 +328,15 @@ TEST_F(TestAMQP, ImplicitConnectionClose) callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); const auto NUMBER_OF_CALLS = 2000; for (auto i = 0; i < NUMBER_OF_CALLS; 
++i) { - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack); + auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack); EXPECT_EQ(rc, 0); } wait_until_drained(); - // deleting the connection object should close the connection - conn.reset(nullptr); amqp_mock::set_valid_host("localhost"); } @@ -302,11 +345,12 @@ TEST_F(TestAMQP, ReceiveMultipleAck) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks); + auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks); EXPECT_EQ(rc, 0); } wait_until_drained(); @@ -320,12 +364,13 @@ TEST_F(TestAMQP, ReceiveAckForMultiple) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks); EXPECT_EQ(rc, 0); } wait_until_drained(); @@ -339,12 +384,13 @@ TEST_F(TestAMQP, DynamicCallback) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, 
false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { - auto rc = publish_with_confirm(conn, "topic", "message", + rc = publish_with_confirm(conn_id, "topic", "message", std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1)); EXPECT_EQ(rc, 0); } @@ -360,9 +406,10 @@ TEST_F(TestAMQP, ReceiveNack) amqp_mock::REPLY_ACK = false; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); wait_until_drained(); EXPECT_TRUE(callback_invoked); @@ -377,9 +424,10 @@ TEST_F(TestAMQP, FailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); wait_until_drained(); EXPECT_TRUE(callback_invoked); @@ -390,35 +438,49 @@ TEST_F(TestAMQP, FailWrite) TEST_F(TestAMQP, RetryInvalidHost) { + callback_invoked = false; const std::string host = "192.168.0.1"; const auto connection_number = 
amqp::get_connection_count(); - conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); // now next retry should be ok + callback_invoked = false; amqp_mock::set_valid_host(host); std::this_thread::sleep_for(long_wait_time); - rc = amqp::publish(conn, "topic", "message"); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); amqp_mock::set_valid_host("localhost"); } TEST_F(TestAMQP, RetryInvalidPort) { + callback_invoked = false; const int port = 9999; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none); - EXPECT_TRUE(conn); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none); + EXPECT_TRUE(rc); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); - auto rc = amqp::publish(conn, "topic", "message"); - EXPECT_LT(rc, 0); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); // now next retry should be ok + callback_invoked = false; amqp_mock::set_valid_port(port); std::this_thread::sleep_for(long_wait_time); - rc = amqp::publish(conn, "topic", "message"); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); + wait_until_drained(); + 
EXPECT_TRUE(callback_invoked); amqp_mock::set_valid_port(5672); } @@ -426,34 +488,40 @@ TEST_F(TestAMQP, RetryFailWrite) { callback_invoked = false; amqp_mock::FAIL_NEXT_WRITE = true; - const std::string host("localhost4"); + const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); - // set port to a different one, so that reconnect would fail - amqp_mock::set_valid_port(9999); wait_until_drained(); EXPECT_TRUE(callback_invoked); - callback_invoked = false; - rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); - EXPECT_LT(rc, 0); - // expect immediate failure, no callback called after sleep - std::this_thread::sleep_for(long_wait_time); - EXPECT_FALSE(callback_invoked); - // set port to the right one so that reconnect would succeed - amqp_mock::set_valid_port(5672); - callback_invoked = false; + // now next retry should be ok amqp_mock::FAIL_NEXT_WRITE = false; - // give time to reconnect + callback_invoked = false; std::this_thread::sleep_for(long_wait_time); - // retry to publish should succeed now - rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); wait_until_drained(); EXPECT_TRUE(callback_invoked); - callback_invoked = false; amqp_mock::set_valid_host("localhost"); } +TEST_F(TestAMQP, IdleConnection) +{ + // note: this test takes 30 seconds to run; uncomment GTEST_SKIP() below to skip it + //GTEST_SKIP(); + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = 
amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + std::this_thread::sleep_for(idle_time); + EXPECT_EQ(amqp::get_connection_count(), connection_number); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + -- 2.39.5