]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/amqp: remove possible race conditions with the amqp connections
authorYuval Lifshitz <ylifshit@redhat.com>
Fri, 16 Jun 2023 15:10:19 +0000 (15:10 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Wed, 21 Jun 2023 08:06:17 +0000 (08:06 +0000)
* simplify memory management of the connection by not using a unique_ptr
* simplify the logic by handling all issues inside the amqp manager
* fix iterator invalidation issue with miltiple n/acks
* allow different connections with different exchanges
* modify the unit tests according to the new behavior

Fixes:
* https://tracker.ceph.com/issues/61639
* https://tracker.ceph.com/issues/46127

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/radosgw/notifications.rst
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_amqp.cc
src/rgw/rgw_amqp.h
src/test/rgw/test_rgw_amqp.cc

index e450dd9488a5d50b1c4091e71f1684f03930cad1..1cc538432f3dce16d2ffd241c8d6e53250b5e3ee 100644 (file)
@@ -188,8 +188,7 @@ Request parameters:
    specified CA will be used to authenticate the broker. The default CA will
    not be used.  
  - amqp-exchange: The exchanges must exist and must be able to route messages
-   based on topics. This parameter is mandatory.  Different topics that point
-   to the same endpoint must use the same exchange.
+   based on topics. This parameter is mandatory.
  - amqp-ack-level: No end2end acking is required. Messages may persist in the
    broker before being delivered to their final destinations. Three ack methods
    exist:
index f15aa3bcc693d9b8c7d0b1dccf1df35d938d8ebc..bdb24ce9ad10326d2e4438d20e14c1ede6d9d6a2 100644 (file)
@@ -128,7 +128,7 @@ private:
   const std::string topic;
   const std::string exchange;
   ack_level_t ack_level;
-  amqp::connection_ptr_t conn;
+  amqp::connection_id_t conn_id;
 
   bool get_verify_ssl(const RGWHTTPArgs& args) {
     bool exists;
@@ -181,9 +181,8 @@ public:
         endpoint(_endpoint), 
         topic(_topic),
         exchange(get_exchange(args)),
-        ack_level(get_ack_level(args)),
-        conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
-    if (!conn) { 
+        ack_level(get_ack_level(args)) {
+    if (!amqp::connect(conn_id, endpoint, exchange, (ack_level == ack_level_t::Broker), get_verify_ssl(args), args.get_optional("ca-location"))) {
       throw configuration_error("AMQP: failed to create connection to: " + endpoint);
     }
   }
@@ -243,14 +242,13 @@ public:
   };
 
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
-    ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return amqp::publish(conn, topic, json_format_pubsub_event(event));
+      return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
       // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
       auto w = std::unique_ptr<Waiter>(new Waiter);
-      const auto rc = amqp::publish_with_confirm(conn, 
+      const auto rc = amqp::publish_with_confirm(conn_id
         topic,
         json_format_pubsub_event(event),
         std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
index 186bdd54ad6fe4271912633c6ba5fd334cc3452b..0bea58d24d57b28bdc7b5d1d79340d140e30da11 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_amqp.h"
@@ -16,6 +16,7 @@
 #include <atomic>
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
+#include <boost/functional/hash.hpp>
 #include "common/dout.h"
 #include <openssl/ssl.h>
 
@@ -50,49 +51,47 @@ static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED =   -0x2010;
 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
 static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
 
-// key class for the connection list
-struct connection_id_t {
-  const std::string host;
-  const int port;
-  const std::string vhost;
-  // constructed from amqp_connection_info struct
-  connection_id_t(const amqp_connection_info& info) 
-    : host(info.host), port(info.port), vhost(info.vhost) {}
-
-  // equality operator and hasher functor are needed 
-  // so that connection_id_t could be used as key in unordered_map
-  bool operator==(const connection_id_t& other) const {
-    return host == other.host && port == other.port && vhost == other.vhost;
+// the amqp_connection_info struct does not hold any memory and just points to the URL string
+// so, strings are copied into connection_id_t
+connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange)
+    : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+  return lhs.host == rhs.host && lhs.port == rhs.port &&
+    lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange;
+}
+
+struct connection_id_hasher {
+  std::size_t operator()(const connection_id_t& k) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, k.host);
+    boost::hash_combine(h, k.port);
+    boost::hash_combine(h, k.vhost);
+    boost::hash_combine(h, k.exchange);
+    return h;
   }
-  
-  struct hasher {
-    std::size_t operator()(const connection_id_t& k) const {
-       return ((std::hash<std::string>()(k.host)
-             ^ (std::hash<int>()(k.port) << 1)) >> 1)
-             ^ (std::hash<std::string>()(k.vhost) << 1); 
-    }
-  };
 };
 
 std::string to_string(const connection_id_t& id) {
-    return id.host+":"+std::to_string(id.port)+id.vhost;
+    return std::string("amqp")+(id.ssl ? "s" : "")+"://"+id.host+":"+std::to_string(id.port)+id.vhost+"?exchange="+id.exchange;
 }
 
-// connection_t state cleaner
-// could be used for automatic cleanup when getting out of scope
+// automatically cleans amqp state when gets out of scope
 class ConnectionCleaner {
   private:
-    amqp_connection_state_t conn;
+    amqp_connection_state_t state;
   public:
-    ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
+    ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {}
     ~ConnectionCleaner() {
-      if (conn) {
-        amqp_destroy_connection(conn);
+      if (state) {
+        amqp_destroy_connection(state);
       }
     }
     // call reset() if cleanup is not needed anymore
     void reset() {
-      conn = nullptr;
+      state = nullptr;
     }
 };
 
@@ -100,9 +99,9 @@ class ConnectionCleaner {
 struct reply_callback_with_tag_t {
   uint64_t tag;
   reply_callback_t cb;
-  
+
   reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
-  
+
   bool operator==(uint64_t rhs) {
     return tag == rhs;
   }
@@ -111,44 +110,26 @@ struct reply_callback_with_tag_t {
 typedef std::vector<reply_callback_with_tag_t> CallbackList;
 
 // struct for holding the connection state object as well as the exchange
-// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
-// since references to deleted objects may still exist in the calling code
 struct connection_t {
-  std::atomic<amqp_connection_state_t> state;
-  std::string exchange;
+  CephContext* cct = nullptr;
+  amqp_connection_state_t state = nullptr;
+  amqp_bytes_t reply_to_queue = amqp_empty_bytes;
+  uint64_t delivery_tag = 1;
+  int status = AMQP_STATUS_OK;
+  int reply_type = AMQP_RESPONSE_NORMAL;
+  int reply_code = RGW_AMQP_NO_REPLY_CODE;
+  CallbackList callbacks;
+  ceph::coarse_real_clock::time_point next_reconnect = ceph::coarse_real_clock::now();
+  bool mandatory = false;
+  const bool use_ssl = false;
   std::string user;
   std::string password;
-  amqp_bytes_t reply_to_queue;
-  uint64_t delivery_tag;
-  int status;
-  int reply_type;
-  int reply_code;
-  mutable std::atomic<int> ref_count;
-  CephContext* cct;
-  CallbackList callbacks;
-  ceph::coarse_real_clock::time_point next_reconnect;
-  bool mandatory;
-  bool use_ssl;
-  bool verify_ssl;
+  bool verify_ssl = true;
   boost::optional<std::string> ca_location;
   utime_t timestamp = ceph_clock_now();
 
-  // default ctor
-  connection_t() :
-    state(nullptr),
-    reply_to_queue(amqp_empty_bytes),
-    delivery_tag(1),
-    status(AMQP_STATUS_OK),
-    reply_type(AMQP_RESPONSE_NORMAL),
-    reply_code(RGW_AMQP_NO_REPLY_CODE),
-    ref_count(0),
-    cct(nullptr),
-    next_reconnect(ceph::coarse_real_clock::now()),
-    mandatory(false),
-    use_ssl(false),
-    verify_ssl(false),
-    ca_location(boost::none)
-  {}
+  connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional<const std::string&> _ca_location) :
+    cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {}
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -176,28 +157,15 @@ struct connection_t {
   ~connection_t() {
     destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
   }
-
-  friend void intrusive_ptr_add_ref(const connection_t* p);
-  friend void intrusive_ptr_release(const connection_t* p);
 };
 
-// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p) {
-  ++p->ref_count;
-}
-void intrusive_ptr_release(const connection_t* p) {
-  if (--p->ref_count == 0) {
-    delete p;
-  }
-}
-
 // convert connection info to string
 std::string to_string(const amqp_connection_info& info) {
   std::stringstream ss;
   ss << "connection info:" <<
         "\nHost: " << info.host <<
         "\nPort: " << info.port <<
-        "\nUser: " << info.user << 
+        "\nUser: " << info.user <<
         "\nPassword: " << info.password <<
         "\nvhost: " << info.vhost <<
         "\nSSL support: " << info.ssl << std::endl;
@@ -212,7 +180,7 @@ int reply_to_code(const amqp_rpc_reply_t& reply) {
       return RGW_AMQP_NO_REPLY_CODE;
     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
       return reply.library_error;
-    case AMQP_RESPONSE_SERVER_EXCEPTION: 
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
       if (reply.reply.decoded) {
         const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
         return m->reply_code;
@@ -232,7 +200,7 @@ std::string to_string(const amqp_rpc_reply_t& reply) {
       return "missing RPC reply type";
     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
       return amqp_error_string2(reply.library_error);
-    case AMQP_RESPONSE_SERVER_EXCEPTION: 
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
       {
         switch (reply.reply.id) {
           case AMQP_CONNECTION_CLOSE_METHOD:
@@ -281,7 +249,7 @@ std::string to_string(amqp_status_enum s) {
     case AMQP_STATUS_SOCKET_ERROR:
       return "AMQP_STATUS_SOCKET_ERROR";
     case AMQP_STATUS_INVALID_PARAMETER:
-      return "AMQP_STATUS_INVALID_PARAMETER"; 
+      return "AMQP_STATUS_INVALID_PARAMETER";
     case AMQP_STATUS_TABLE_TOO_BIG:
       return "AMQP_STATUS_TABLE_TOO_BIG";
     case AMQP_STATUS_WRONG_METHOD:
@@ -305,13 +273,13 @@ std::string to_string(amqp_status_enum s) {
       return "AMQP_STATUS_UNSUPPORTED";
 #endif
     case _AMQP_STATUS_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
     case AMQP_STATUS_TCP_ERROR:
         return "AMQP_STATUS_TCP_ERROR";
     case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
       return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
     case _AMQP_STATUS_TCP_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
     case AMQP_STATUS_SSL_ERROR:
       return "AMQP_STATUS_SSL_ERROR";
     case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
@@ -321,7 +289,7 @@ std::string to_string(amqp_status_enum s) {
     case AMQP_STATUS_SSL_CONNECTION_FAILED:
       return "AMQP_STATUS_SSL_CONNECTION_FAILED";
     case _AMQP_STATUS_SSL_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
     case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
       return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
@@ -374,7 +342,7 @@ std::string status_to_string(int s) {
 #define RETURN_ON_ERROR(C, S, OK) \
   if (!OK) { \
     C->status = S; \
-    return C; \
+    return false; \
   }
 
 // in case of RPC calls, getting the RPC reply and return if an error is detected
@@ -384,7 +352,7 @@ std::string status_to_string(int s) {
         C->status = S; \
         C->reply_type = reply.reply_type; \
         C->reply_code = reply_to_code(reply); \
-        return C; \
+        return false; \
       } \
     }
 
@@ -392,25 +360,25 @@ static const amqp_channel_t CHANNEL_ID = 1;
 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  ceph_assert(conn);
-
+bool new_state(connection_t* conn, const connection_id_t& conn_id) {
+  // state must be null at this point
+  ceph_assert(!conn->state);
   // reset all status codes
-  conn->status = AMQP_STATUS_OK; 
+  conn->status = AMQP_STATUS_OK;
   conn->reply_type = AMQP_RESPONSE_NORMAL;
   conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
 
   auto state = amqp_new_connection();
   if (!state) {
     conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
-    return conn;
+    return false;
   }
   // make sure that the connection state is cleaned up in case of error
   ConnectionCleaner state_guard(state);
 
   // create and open socket
   amqp_socket_t *socket = nullptr;
-  if (info.ssl) {
+  if (conn->use_ssl) {
     socket = amqp_ssl_socket_new(state);
 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
     SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
@@ -433,9 +401,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
   if (!socket) {
     conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
-    return conn;
+    return false;
   }
-  if (info.ssl) {
+  if (conn->use_ssl) {
     if (!conn->verify_ssl) {
       amqp_ssl_socket_set_verify_peer(socket, 0);
       amqp_ssl_socket_set_verify_hostname(socket, 0);
@@ -445,32 +413,32 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
       if (s != AMQP_STATUS_OK) {
         conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
         conn->reply_code = s;
-        return conn;
+        return false;
       }
     }
   }
-  const auto s = amqp_socket_open(socket, info.host, info.port);
+  const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port);
   if (s < 0) {
     conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
     conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
     conn->reply_code = s;
-    return conn;
+    return false;
   }
 
   // login to broker
   const auto reply = amqp_login(state,
-      info.vhost, 
+      conn_id.vhost.c_str(),
       AMQP_DEFAULT_MAX_CHANNELS,
       AMQP_DEFAULT_FRAME_SIZE,
       0,                        // no heartbeat TODO: add conf
       AMQP_SASL_METHOD_PLAIN,   // TODO: add other types of security
-      info.user,
-      info.password);
+      conn->user.c_str(),
+      conn->password.c_str());
   if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
     conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
     conn->reply_type = reply.reply_type;
     conn->reply_code = reply_to_code(reply);
-    return conn;
+    return false;
   }
 
   // open channels
@@ -493,9 +461,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   // verify that the topic exchange is there
   // TODO: make this step optional
   {
-    const auto ok = amqp_exchange_declare(state, 
+    const auto ok = amqp_exchange_declare(state,
       CHANNEL_ID,
-      amqp_cstring_bytes(conn->exchange.c_str()),
+      amqp_cstring_bytes(conn_id.exchange.c_str()),
       amqp_cstring_bytes("topic"),
       1, // passive - exchange must already exist on broker
       1, // durable
@@ -507,12 +475,12 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   }
   {
     // create queue for confirmations
-    const auto queue_ok = amqp_queue_declare(state, 
+    const auto queue_ok = amqp_queue_declare(state,
         CHANNEL_ID,         // use the regular channel for this call
         amqp_empty_bytes,   // let broker allocate queue name
-        0,                  // not passive - create the queue 
-        0,                  // not durable 
-        1,                  // exclusive 
+        0,                  // not passive - create the queue
+        0,                  // not durable
+        1,                  // exclusive
         1,                  // auto-delete
         amqp_empty_table    // not args TODO add args from conf: TTL, max length etc.
         );
@@ -520,8 +488,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
     RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
 
     // define consumption for connection
-    const auto consume_ok = amqp_basic_consume(state, 
-        CONFIRMING_CHANNEL_ID, 
+    const auto consume_ok = amqp_basic_consume(state,
+        CONFIRMING_CHANNEL_ID,
         queue_ok->queue,
         amqp_empty_bytes, // broker will generate consumer tag
         1,                // messages sent from client are never routed back
@@ -533,45 +501,30 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
     RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
     RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
     // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
-    
+
     state_guard.reset();
     conn->state = state;
     conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
-    return conn;
   }
-}
-
-// utility function to create a new connection
-connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { 
-  // create connection state
-  connection_ptr_t conn = new connection_t;
-  conn->exchange = exchange;
-  conn->user.assign(info.user);
-  conn->password.assign(info.password);
-  conn->mandatory = mandatory_delivery;
-  conn->cct = cct;
-  conn->use_ssl = info.ssl;
-  conn->verify_ssl = verify_ssl;
-  conn->ca_location = ca_location;
-  return create_connection(conn, info);
+  return true;
 }
 
 /// struct used for holding messages in the message queue
 struct message_wrapper_t {
-  connection_ptr_t conn; 
+  connection_id_t conn_id;
   std::string topic;
   std::string message;
   reply_callback_t cb;
-  
-  message_wrapper_t(connection_ptr_t& _conn,
+
+  message_wrapper_t(const connection_id_t& _conn_id,
       const std::string& _topic,
       const std::string& _message,
-      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+      reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
 };
 
+using connection_t_ptr = std::unique_ptr<connection_t>;
 
-typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
+typedef std::unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
 // macros used inside a loop where an iterator is either incremented or erased
@@ -606,14 +559,23 @@ private:
 
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
-    auto& conn = message->conn;
+    const auto& conn_id = message->conn_id;
+    auto conn_it = connections.find(conn_id);
+    if (conn_it == connections.end()) {
+      ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl;
+      if (message->cb) {
+        message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+      }
+      return;
+    }
+
+    auto& conn = conn_it->second;
 
     conn->timestamp = ceph_clock_now();
 
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
-      // TODO add error stats
-      ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
+      ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl;
       if (message->cb) {
         message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
       }
@@ -621,20 +583,19 @@ private:
     }
 
     if (message->cb == nullptr) {
-      // TODO add error stats
       const auto rc = amqp_basic_publish(conn->state,
         CHANNEL_ID,
-        amqp_cstring_bytes(conn->exchange.c_str()),
+        amqp_cstring_bytes(conn_id.exchange.c_str()),
         amqp_cstring_bytes(message->topic.c_str()),
         0, // does not have to be routable
         0, // not immediate
         nullptr, // no properties needed
         amqp_cstring_bytes(message->message.c_str()));
       if (rc == AMQP_STATUS_OK) {
-        ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
+        ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl;
         return;
       }
-      ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
+      ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
       // an error occurred, close connection
       // it will be retied by the main loop
       conn->destroy(rc);
@@ -642,15 +603,15 @@ private:
     }
 
     amqp_basic_properties_t props;
-    props._flags = 
-      AMQP_BASIC_DELIVERY_MODE_FLAG | 
+    props._flags =
+      AMQP_BASIC_DELIVERY_MODE_FLAG |
       AMQP_BASIC_REPLY_TO_FLAG;
     props.delivery_mode = 2; // persistent delivery TODO take from conf
     props.reply_to = conn->reply_to_queue;
 
     const auto rc = amqp_basic_publish(conn->state,
       CONFIRMING_CHANNEL_ID,
-      amqp_cstring_bytes(conn->exchange.c_str()),
+      amqp_cstring_bytes(conn_id.exchange.c_str()),
       amqp_cstring_bytes(message->topic.c_str()),
       conn->mandatory,
       0, // not immediate
@@ -660,17 +621,17 @@ private:
     if (rc == AMQP_STATUS_OK) {
       auto const q_len = conn->callbacks.size();
       if (q_len < max_inflight) {
-        ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+        ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
         conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
       } else {
         // immediately invoke callback with error
-        ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
+        ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
         message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
       }
     } else {
       // an error occurred, close connection
       // it will be retied by the main loop
-      ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
+      ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
       conn->destroy(rc);
       // immediately invoke callback with error
       message->cb(rc);
@@ -702,12 +663,12 @@ private:
       auto incoming_message = false;
       // loop over all connections to read acks
       for (;conn_it != end_it;) {
-        
+
+        const auto& conn_id = conn_it->first;
         auto& conn = conn_it->second;
-        const auto& conn_key = conn_it->first;
 
         if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+          ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
@@ -717,22 +678,15 @@ private:
           if (now >= conn->next_reconnect) {
             // pointers are used temporarily inside the amqp_connection_info object
             // as read-only values, hence the assignment, and const_cast are safe here
-            amqp_connection_info info;
-            info.host = const_cast<char*>(conn_key.host.c_str());
-            info.port = conn_key.port;
-            info.vhost = const_cast<char*>(conn_key.vhost.c_str());
-            info.user = const_cast<char*>(conn->user.c_str());
-            info.password = const_cast<char*>(conn->password.c_str());
-            info.ssl = conn->use_ssl;
-            ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
-            if (create_connection(conn, info)->is_ok() == false) {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
+            ldout(cct, 20) << "AMQP run: retry connection" << dendl;
+            if (!new_state(conn.get(), conn_id)) {
+              ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " <<
                 status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
               // TODO: add error counter for failed retries
               // TODO: add exponential backoff for retries
               conn->next_reconnect = now + reconnect_time;
             } else {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
+              ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl;
             }
           }
           INCREMENT_AND_CONTINUE(conn_it);
@@ -744,7 +698,7 @@ private:
           // TODO mark connection as idle
           INCREMENT_AND_CONTINUE(conn_it);
         }
-       
+
         // this is just to prevent spinning idle, does not indicate that a message
         // was successfully processed or not
         incoming_message = true;
@@ -753,13 +707,13 @@ private:
         if (rc != AMQP_STATUS_OK) {
           // an error occurred, close connection
           // it will be retied by the main loop
-          ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
+          ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
           conn->destroy(rc);
           INCREMENT_AND_CONTINUE(conn_it);
         }
 
         if (frame.frame_type != AMQP_FRAME_METHOD) {
-          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " 
+          ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
             << unsigned(frame.frame_type) << dendl;
           // handler is for publish confirmation only - handle only method frames
           INCREMENT_AND_CONTINUE(conn_it);
@@ -770,7 +724,7 @@ private:
         int result;
 
         switch (frame.payload.method.id) {
-          case AMQP_BASIC_ACK_METHOD: 
+          case AMQP_BASIC_ACK_METHOD:
             {
               result = AMQP_STATUS_OK;
               const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
@@ -788,12 +742,12 @@ private:
               multiple = nack->multiple;
               break;
             }
-          case AMQP_BASIC_REJECT_METHOD:                                                   
-            {                                                                              
-              result = RGW_AMQP_STATUS_BROKER_NACK;                                        
-              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;      
-              tag = reject->delivery_tag;                                                  
-              multiple = false;                                                            
+          case AMQP_BASIC_REJECT_METHOD:
+            {
+              result = RGW_AMQP_STATUS_BROKER_NACK;
+              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+              tag = reject->delivery_tag;
+              multiple = false;
               break;
             }
           case AMQP_CONNECTION_CLOSE_METHOD:
@@ -801,42 +755,40 @@ private:
           case AMQP_CHANNEL_CLOSE_METHOD:
             {
               // other side closed the connection, no need to continue
-              ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
+              ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl;
               conn->destroy(rc);
               INCREMENT_AND_CONTINUE(conn_it);
             }
           case AMQP_BASIC_RETURN_METHOD:
             // message was not delivered, returned to sender
-            ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
+            ldout(cct, 10) << "AMQP run: message was not routable" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
             break;
           default:
             // unexpected method
-            ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
+            ldout(cct, 10) << "AMQP run: unexpected message" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
         }
 
-        const auto& callbacks_end = conn->callbacks.end();
-        const auto& callbacks_begin = conn->callbacks.begin();
-        const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
-        if (tag_it != callbacks_end) {
+        const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag);
+        if (tag_it != conn->callbacks.end()) {
           if (multiple) {
             // n/ack all up to (and including) the tag
-            ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
-            auto it = callbacks_begin;
+            ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+            auto it = conn->callbacks.begin();
             while (it->tag <= tag && it != conn->callbacks.end()) {
-              ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+              ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
               it->cb(result);
               it = conn->callbacks.erase(it);
             }
           } else {
             // n/ack a specific tag
-            ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+            ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
             tag_it->cb(result);
             conn->callbacks.erase(tag_it);
           }
         } else {
-          ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
+          ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
         ++conn_it;
@@ -856,11 +808,11 @@ private:
 public:
   Manager(size_t _max_connections,
       size_t _max_inflight,
-      size_t _max_queue, 
+      size_t _max_queue,
       long _usec_timeout,
       unsigned reconnect_time_ms,
       unsigned idle_time_ms,
-      CephContext* _cct) : 
+      CephContext* _cct) :
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
@@ -876,9 +828,9 @@ public:
     idle_time(std::chrono::milliseconds(idle_time_ms)),
     reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
     runner(&Manager::run, this) {
-      // The hashmap has "max connections" as the initial number of buckets, 
+      // The hashmap has "max connections" as the initial number of buckets,
       // and allows for 10 collisions per bucket before rehash.
-      // This is to prevent rehashing so that iterators are not invalidated 
+      // This is to prevent rehashing so that iterators are not invalidated
       // when a new connection is added.
       connections.max_load_factor(10.0);
       // give the runner thread a name for easier debugging
@@ -896,76 +848,68 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+  bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
     if (stopped) {
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
-      return nullptr;
+      return false;
     }
 
-    struct amqp_connection_info info;
+    amqp_connection_info info;
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
     const auto retcode = amqp_parse_url(url_cache.data(), &info);
     if (AMQP_STATUS_OK != retcode) {
       ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
-      return nullptr;
+      return false;
     }
+    connection_id_t tmp_id(info, exchange);
 
-    const connection_id_t id(info);
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(id);
+    const auto it = connections.find(tmp_id);
     if (it != connections.end()) {
-      if (it->second->exchange != exchange) {
-        ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
-        return nullptr;
-      }
       // connection found - return even if non-ok
       ldout(cct, 20) << "AMQP connect: connection found" << dendl;
-      return it->second;
+      id = it->first;
+      return true;
     }
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
-      return nullptr;
+      return false;
     }
-    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
-    if (!conn->is_ok()) {
-      ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
-              status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
-    }
-    // create_new_connection must always return a connection object
-    // even if error occurred during creation. 
-    // in such a case the creation will be retried in the main thread
-    ceph_assert(conn);
+    // if error occurred during creation the creation will be retried in the main thread
     ++connection_count;
+    auto conn = connections.emplace(tmp_id, std::make_unique<connection_t>(cct, info, verify_ssl, ca_location)).first->second.get();
     ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
-    ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
-    return connections.emplace(id, conn).first->second;
+    if (!new_state(conn, tmp_id)) {
+      ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). error: " <<
+        status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
+    }
+    id = std::move(tmp_id);
+    return true;
   }
 
   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(connection_ptr_t& conn, 
+  int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
       ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      ldout(cct, 1) << "AMQP publish: no connection" << dendl;
-      return RGW_AMQP_STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+    auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
+    if (messages.push(wrapper.get())) {
+      std::ignore = wrapper.release();
       ++queued;
       return AMQP_STATUS_OK;
     }
     ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
-  
-  int publish_with_confirm(connection_ptr_t& conn, 
+
+  int publish_with_confirm(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
@@ -973,11 +917,9 @@ public:
       ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
-      return RGW_AMQP_STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+    auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
+    if (messages.push(wrapper.get())) {
+      std::ignore = wrapper.release();
       ++queued;
       return AMQP_STATUS_OK;
     }
@@ -997,7 +939,7 @@ public:
   size_t get_connection_count() const {
     return connection_count;
   }
-  
+
   // get the number of in-flight messages
   size_t get_inflight() const {
     size_t sum = 0;
@@ -1026,7 +968,7 @@ public:
 static Manager* s_manager = nullptr;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
-static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
 static const size_t MAX_QUEUE_DEFAULT = 8192;
 static const long READ_TIMEOUT_USEC = 100;
 static const unsigned IDLE_TIME_MS = 100;
@@ -1037,7 +979,7 @@ bool init(CephContext* cct) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
       READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
   return true;
 }
@@ -1047,32 +989,32 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
-  if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
+  if (!s_manager) return false;
+  return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
 
-int publish(connection_ptr_t& conn, 
+int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message) {
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
-  return s_manager->publish(conn, topic, message);
+  return s_manager->publish(conn_id, topic, message);
 }
 
-int publish_with_confirm(connection_ptr_t& conn, 
+int publish_with_confirm(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
-  return s_manager->publish_with_confirm(conn, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
-  
+
 size_t get_inflight() {
   if (!s_manager) return 0;
   return s_manager->get_inflight();
index 84d0650731c53139a032000b64911a25705faf24..89cdafc448f56f69935b43b857dc726aa0347563 100644 (file)
@@ -6,19 +6,12 @@
 #include <string>
 #include <functional>
 #include <boost/optional.hpp>
-#include <boost/smart_ptr/intrusive_ptr.hpp>
 
 #include "include/common_fwd.h"
 
-namespace rgw::amqp {
-// forward declaration of connection object
-struct connection_t;
-
-typedef boost::intrusive_ptr<connection_t> connection_ptr_t;
+struct amqp_connection_info;
 
-// required interfaces needed so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p);
-void intrusive_ptr_release(const connection_t* p);
+namespace rgw::amqp {
 
 // the reply callback is expected to get an integer parameter
 // indicating the result, and not to return anything
@@ -30,19 +23,30 @@ bool init(CephContext* cct);
 // shutdown the amqp manager
 void shutdown();
 
+// key class for the connection list
+struct connection_id_t {
+  std::string host;
+  int port;
+  std::string vhost;
+  std::string exchange;
+  bool ssl;
+  connection_id_t() = default;
+  connection_id_t(const amqp_connection_info& info, const std::string& _exchange);
+};
+
 // connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location);
 
 // publish a message over a connection that was already created
-int publish(connection_ptr_t& conn,
+int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message);
 
 // publish a message over a connection that was already created
 // and pass a callback that will be invoked (async) when broker confirms
 // receiving the message
-int publish_with_confirm(connection_ptr_t& conn
+int publish_with_confirm(const connection_id_t& conn_id
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb);
index bf8671771d9eb72e4dc8739059236e9929080747..2ce9cb2a0a0551e9fd437bd5bdcfc2c140201f50 100644 (file)
@@ -13,6 +13,7 @@ using namespace rgw;
 
 const std::chrono::milliseconds wait_time(10);
 const std::chrono::milliseconds long_wait_time = wait_time*50;
+const std::chrono::seconds idle_time(30);
 
 
 class CctCleaner {
@@ -34,7 +35,7 @@ CctCleaner cleaner(cct);
 
 class TestAMQP : public ::testing::Test {
 protected:
-  amqp::connection_ptr_t conn = nullptr;
+  amqp::connection_id_t conn_id;
   unsigned current_dequeued = 0U;
 
   void SetUp() override {
@@ -58,13 +59,54 @@ protected:
   }
 };
 
+std::atomic<bool> callback_invoked = false;
+
+std::atomic<int> callbacks_invoked = 0;
+
+// note: because these callback are shared among different "publish" calls
+// they should be used on different connections
+
+void my_callback_expect_ack(int rc) {
+  EXPECT_EQ(0, rc);
+  callback_invoked = true;
+}
+
+void my_callback_expect_nack(int rc) {
+  EXPECT_LT(rc, 0);
+  callback_invoked = true;
+}
+
+void my_callback_expect_multiple_acks(int rc) {
+  EXPECT_EQ(0, rc);
+  ++callbacks_invoked;
+}
+
+class dynamic_callback_wrapper {
+    dynamic_callback_wrapper() = default;
+public:
+    static dynamic_callback_wrapper* create() {
+        return new dynamic_callback_wrapper;
+    }
+    void callback(int rc) {
+      EXPECT_EQ(0, rc);
+      ++callbacks_invoked;
+      delete this;
+    }
+};
+
+void my_callback_expect_close_or_ack(int rc) {
+  // deleting the connection should trigger the callback with -4098
+  // but due to race conditions, some my get an ack
+  EXPECT_TRUE(-4098 == rc || 0 == rc);
+}
+
 TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
+  rc = amqp::publish(conn_id, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
@@ -73,10 +115,10 @@ TEST_F(TestAMQP, SSLConnectionOK)
   const int port = 5671;
   const auto connection_number = amqp::get_connection_count();
   amqp_mock::set_valid_port(port);
-  conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
+  rc = amqp::publish(conn_id, "topic", "message");
   EXPECT_EQ(rc, 0);
   amqp_mock::set_valid_port(5672);
 }
@@ -86,193 +128,195 @@ TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
   const int port = 5671;
   const auto connection_number = amqp::get_connection_count();
   amqp_mock::set_valid_port(port);
-  amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn1);
+  amqp::connection_id_t conn_id1;
+  auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn1, "topic", "message");
+  rc = amqp::publish(conn_id1, "topic", "message");
   EXPECT_EQ(rc, 0);
   amqp_mock::set_valid_port(5672);
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn2);
+  amqp::connection_id_t conn_id2;
+  rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
-  rc = amqp::publish(conn2, "topic", "message");
+  rc = amqp::publish(conn_id2, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
 TEST_F(TestAMQP, ConnectionReuse)
 {
-  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn1);
+  amqp::connection_id_t conn_id1;
+  auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn2);
+  amqp::connection_id_t conn_id2;
+  rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn1, "topic", "message");
+  rc = amqp::publish(conn_id1, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
 TEST_F(TestAMQP, NameResolutionFail)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidPort)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidHost)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidVhost)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, UserPassword)
 {
   amqp_mock::set_valid_host("127.0.0.1");
   {
+    callback_invoked = false;
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-    auto rc = amqp::publish(conn, "topic", "message");
-    EXPECT_LT(rc, 0);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+    EXPECT_EQ(rc, 0);
+    wait_until_drained();
+    EXPECT_TRUE(callback_invoked);
   }
   // now try the same connection with default user/password
   amqp_mock::set_valid_host("127.0.0.2");
   {
+    callback_invoked = false;
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-    auto rc = amqp::publish(conn, "topic", "message");
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
     EXPECT_EQ(rc, 0);
+    wait_until_drained();
+    EXPECT_TRUE(callback_invoked);
   }
   amqp_mock::set_valid_host("localhost");
 }
 
 TEST_F(TestAMQP, URLParseError)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
-  EXPECT_FALSE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none);
+  EXPECT_FALSE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, ExchangeMismatch)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
-  EXPECT_FALSE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, false, boost::none);
+  EXPECT_FALSE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, MaxConnections)
 {
   // fill up all connections
-  std::vector<amqp::connection_ptr_t> connections;
+  std::vector<amqp::connection_id_t> connections;
   auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
   while (remaining_connections > 0) {
     const auto host = "127.10.0." + std::to_string(remaining_connections);
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
-    auto rc = amqp::publish(conn, "topic", "message");
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
     EXPECT_EQ(rc, 0);
     --remaining_connections;
-    connections.push_back(conn);
+    connections.push_back(conn_id);
   }
   EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
+  wait_until_drained();
   // try to add another connection
   {
     const std::string host = "toomany";
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-    EXPECT_FALSE(conn);
-    auto rc = amqp::publish(conn, "topic", "message");
-    EXPECT_LT(rc, 0);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+    EXPECT_FALSE(rc);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+    EXPECT_EQ(rc, 0);
+    wait_until_drained();
   }
   EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
   amqp_mock::set_valid_host("localhost");
 }
 
-std::atomic<bool> callback_invoked = false;
-
-std::atomic<int> callbacks_invoked = 0;
-
-// note: because these callback are shared among different "publish" calls
-// they should be used on different connections
-
-void my_callback_expect_ack(int rc) {
-  EXPECT_EQ(0, rc);
-  callback_invoked = true;
-}
-
-void my_callback_expect_nack(int rc) {
-  EXPECT_LT(rc, 0);
-  callback_invoked = true;
-}
-
-void my_callback_expect_multiple_acks(int rc) {
-  EXPECT_EQ(0, rc);
-  ++callbacks_invoked;
-}
-
-class dynamic_callback_wrapper {
-    dynamic_callback_wrapper() = default;
-public:
-    static dynamic_callback_wrapper* create() {
-        return new dynamic_callback_wrapper;
-    }
-    void callback(int rc) {
-      EXPECT_EQ(0, rc);
-      ++callbacks_invoked;
-      delete this;
-    }
-};
-
-void my_callback_expect_close_or_ack(int rc) {
-  // deleting the connection should trigger the callback with -4098
-  // but due to race conditions, some my get an ack
-  EXPECT_TRUE(-4098 == rc || 0 == rc);
-}
 
 TEST_F(TestAMQP, ReceiveAck)
 {
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -284,16 +328,15 @@ TEST_F(TestAMQP, ImplicitConnectionClose)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto NUMBER_OF_CALLS = 2000;
   for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
+    auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
-  // deleting the connection object should close the connection
-  conn.reset(nullptr);
   amqp_mock::set_valid_host("localhost");
 }
 
@@ -302,11 +345,12 @@ TEST_F(TestAMQP, ReceiveMultipleAck)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
@@ -320,12 +364,13 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
@@ -339,12 +384,13 @@ TEST_F(TestAMQP, DynamicCallback)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message",
+    rc = publish_with_confirm(conn_id, "topic", "message",
             std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
     EXPECT_EQ(rc, 0);
   }
@@ -360,9 +406,10 @@ TEST_F(TestAMQP, ReceiveNack)
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -377,9 +424,10 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -390,35 +438,49 @@ TEST_F(TestAMQP, FailWrite)
 
 TEST_F(TestAMQP, RetryInvalidHost)
 {
+  callback_invoked = false;
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   // now next retry should be ok
+  callback_invoked = false;
   amqp_mock::set_valid_host(host);
   std::this_thread::sleep_for(long_wait_time);
-  rc = amqp::publish(conn, "topic", "message");
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   amqp_mock::set_valid_host("localhost");
 }
 
 TEST_F(TestAMQP, RetryInvalidPort)
 {
+  callback_invoked = false;
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   // now next retry should be ok
+  callback_invoked = false;
   amqp_mock::set_valid_port(port);
   std::this_thread::sleep_for(long_wait_time);
-  rc = amqp::publish(conn, "topic", "message");
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   amqp_mock::set_valid_port(5672);
 }
 
@@ -426,34 +488,40 @@ TEST_F(TestAMQP, RetryFailWrite)
 {
   callback_invoked = false;
   amqp_mock::FAIL_NEXT_WRITE = true;
-  const std::string host("localhost4");
+  const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
-  // set port to a different one, so that reconnect would fail
-  amqp_mock::set_valid_port(9999);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
-  callback_invoked = false;
-  rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
-  EXPECT_LT(rc, 0);
-  // expect immediate failure, no callback called after sleep
-  std::this_thread::sleep_for(long_wait_time);
-  EXPECT_FALSE(callback_invoked);
-  // set port to the right one so that reconnect would succeed
-  amqp_mock::set_valid_port(5672);
-  callback_invoked = false;
+  // now next retry should be ok
   amqp_mock::FAIL_NEXT_WRITE = false;
-  // give time to reconnect
+  callback_invoked = false;
   std::this_thread::sleep_for(long_wait_time);
-  // retry to publish should succeed now
-  rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
-  callback_invoked = false;
   amqp_mock::set_valid_host("localhost");
 }
 
+TEST_F(TestAMQP, IdleConnection)
+{
+  // this test is skipped since it takes 30seconds
+  //GTEST_SKIP();
+  const auto connection_number = amqp::get_connection_count();
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  std::this_thread::sleep_for(idle_time);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
+}
+