// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
// since references to deleted objects may still exist in the calling code
struct connection_t {
- amqp_connection_state_t state;
+ std::atomic<amqp_connection_state_t> state;
std::string exchange;
std::string user;
std::string password;
amqp_bytes_t reply_to_queue;
- bool marked_for_deletion;
uint64_t delivery_tag;
int status;
int reply_type;
connection_t() :
state(nullptr),
reply_to_queue(amqp_empty_bytes),
- marked_for_deletion(false),
delivery_tag(1),
status(AMQP_STATUS_OK),
reply_type(AMQP_RESPONSE_NORMAL),
}
bool is_ok() const {
- return (state != nullptr && !marked_for_deletion);
+ return (state != nullptr);
}
// dtor also destroys the internals
// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
- // pointer must be valid and not marked for deletion
- ceph_assert(conn && !conn->marked_for_deletion);
-
+ ceph_assert(conn);
+
// reset all status codes
conn->status = AMQP_STATUS_OK;
conn->reply_type = AMQP_RESPONSE_NORMAL;
conn->cct = cct;
conn->use_ssl = info.ssl;
conn->verify_ssl = verify_ssl;
- conn->ca_location = ca_location;
+ conn->ca_location = ca_location;
return create_connection(conn, info);
}
const size_t max_queue;
private:
std::atomic<size_t> connection_count;
- bool stopped;
+ std::atomic<bool> stopped;
struct timeval read_timeout;
ConnectionList connections;
MessageQueue messages;
auto& conn = conn_it->second;
const auto& conn_key = conn_it->first;
- // delete the connection if marked for deletion
- if (conn->marked_for_deletion) {
- ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
- conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
- std::lock_guard lock(connections_lock);
- // erase is safe - does not invalidate any other iterator
- // lock so no insertion happens at the same time
- ERASE_AND_CONTINUE(conn_it, connections);
- }
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
stopped = true;
}
- // disconnect from a broker
- bool disconnect(connection_ptr_t& conn) {
- if (!conn || stopped) {
- return false;
- }
- conn->marked_for_deletion = true;
- return true;
- }
-
// connect to a broker, or reuse an existing connection if already connected
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
boost::optional<const std::string&> ca_location) {
std::lock_guard lock(connections_lock);
const auto it = connections.find(id);
if (it != connections.end()) {
- if (it->second->marked_for_deletion) {
- ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
- return nullptr;
- } else if (it->second->exchange != exchange) {
+ if (it->second->exchange != exchange) {
ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
size_t sum = 0;
std::lock_guard lock(connections_lock);
std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+ // concurrent access to the callback vector is safe without locking
sum += conn_pair.second->callbacks.size();
});
return sum;
return s_manager->max_queue;
}
-bool disconnect(connection_ptr_t& conn) {
- if (!s_manager) return false;
- return s_manager->disconnect(conn);
-}
-
} // namespace amqp
}
EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
amqp_mock::set_valid_host("localhost");
- // delete connections to make space for new ones
- for (auto conn : connections) {
- EXPECT_TRUE(amqp::disconnect(conn));
- }
- // wait for them to be deleted
- std::this_thread::sleep_for(long_wait_time);
- EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections());
}
std::atomic<bool> callback_invoked = false;
amqp_mock::set_valid_host("localhost");
}
-TEST_F(TestAMQP, ClosedConnection)
-{
- callback_invoked = false;
- const auto current_connections = amqp::get_connection_count();
- const std::string host("localhost3");
- amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
- EXPECT_TRUE(amqp::disconnect(conn));
- std::this_thread::sleep_for(long_wait_time);
- // make sure number of connections decreased back
- EXPECT_EQ(amqp::get_connection_count(), current_connections);
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
- EXPECT_LT(rc, 0);
- std::this_thread::sleep_for(long_wait_time);
- EXPECT_FALSE(callback_invoked);
- callback_invoked = false;
- amqp_mock::set_valid_host("localhost");
-}
-
TEST_F(TestAMQP, RetryInvalidHost)
{
const std::string host = "192.168.0.1";