From: Yuval Lifshitz Date: Sun, 13 Jun 2021 14:48:11 +0000 (+0300) Subject: rgw/amqp: remove the explicit "disconnect()" interface X-Git-Tag: v16.2.8~93^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F45427%2Fhead;p=ceph.git rgw/amqp: remove the explicit "disconnect()" interface this is to resolve a possible race condistion. note that the removed interface is not used in RGW code. and that connection cleanup should be done based on idleness and not on explicit disconnect. see: https://tracker.ceph.com/issues/49033 Fixes: https://tracker.ceph.com/issues/51114 Signed-off-by: Yuval Lifshitz (cherry picked from commit a3849aaa9f5a3e95b1e6ce0a80b1df33b3a77cf9) --- diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index d507c683eb86..7676fd7676c7 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -114,12 +114,11 @@ typedef std::vector CallbackList; // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) // since references to deleted objects may still exist in the calling code struct connection_t { - amqp_connection_state_t state; + std::atomic state; std::string exchange; std::string user; std::string password; amqp_bytes_t reply_to_queue; - bool marked_for_deletion; uint64_t delivery_tag; int status; int reply_type; @@ -137,7 +136,6 @@ struct connection_t { connection_t() : state(nullptr), reply_to_queue(amqp_empty_bytes), - marked_for_deletion(false), delivery_tag(1), status(AMQP_STATUS_OK), reply_type(AMQP_RESPONSE_NORMAL), @@ -170,7 +168,7 @@ struct connection_t { } bool is_ok() const { - return (state != nullptr && !marked_for_deletion); + return (state != nullptr); } // dtor also destroys the internals @@ -389,9 +387,8 @@ static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2; // utility function to create a connection, when the connection object already exists connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) { - // pointer must be valid and not marked for deletion - ceph_assert(conn && !conn->marked_for_deletion); - + ceph_assert(conn); + // reset all status codes conn->status = AMQP_STATUS_OK; conn->reply_type = AMQP_RESPONSE_NORMAL; @@ -550,7 +547,7 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info, conn->cct = cct; conn->use_ssl = info.ssl; conn->verify_ssl = verify_ssl; - conn->ca_location = ca_location; + conn->ca_location = ca_location; return create_connection(conn, info); } @@ -588,7 +585,7 @@ public: const size_t max_queue; private: std::atomic connection_count; - bool stopped; + std::atomic stopped; struct timeval read_timeout; ConnectionList connections; MessageQueue messages; @@ -699,15 +696,6 @@ private: auto& conn = conn_it->second; const auto& conn_key = conn_it->first; - // delete the connection if marked for deletion - if (conn->marked_for_deletion) { - ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl; - conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED); - std::lock_guard lock(connections_lock); - // erase is safe - does not invalidate any other iterator - // lock so no insertion happens at the same time - ERASE_AND_CONTINUE(conn_it, connections); - } // try to reconnect the connection if it has an error if (!conn->is_ok()) { @@ -892,15 +880,6 @@ public: stopped = true; } - // disconnect from a broker - bool disconnect(connection_ptr_t& conn) { - if (!conn || stopped) { - return false; - } - conn->marked_for_deletion = true; - return true; - } - // connect to a broker, or reuse an existing connection if already connected connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, boost::optional ca_location) { @@ -922,10 +901,7 @@ public: std::lock_guard lock(connections_lock); const auto it = connections.find(id); if (it != connections.end()) { - if (it->second->marked_for_deletion) { - ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl; - return nullptr; - } else if (it->second->exchange != exchange) { + if (it->second->exchange != exchange) { ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl; return nullptr; } @@ -1012,6 +988,7 @@ public: size_t sum = 0; std::lock_guard lock(connections_lock); std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) { + // concurrent access to the callback vector is safe without locking sum += conn_pair.second->callbacks.size(); }); return sum; @@ -1111,10 +1088,5 @@ size_t get_max_queue() { return s_manager->max_queue; } -bool disconnect(connection_ptr_t& conn) { - if (!s_manager) return false; - return s_manager->disconnect(conn); -} - } // namespace amqp diff --git a/src/rgw/rgw_amqp.h b/src/rgw/rgw_amqp.h index 7a2316858ca4..84d0650731c5 100644 --- a/src/rgw/rgw_amqp.h +++ b/src/rgw/rgw_amqp.h @@ -72,8 +72,5 @@ size_t get_max_inflight(); // maximum number of messages in the queue size_t get_max_queue(); -// disconnect from an amqp broker -bool disconnect(connection_ptr_t& conn); - } diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index 421f757515bc..e4beb34d1845 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -222,13 +222,6 @@ TEST_F(TestAMQP, MaxConnections) } EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections()); amqp_mock::set_valid_host("localhost"); - // delete connections to make space for new ones - for (auto conn : connections) { - EXPECT_TRUE(amqp::disconnect(conn)); - } - // wait for them to be deleted - std::this_thread::sleep_for(long_wait_time); - EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections()); } std::atomic callback_invoked = false; @@ -395,27 +388,6 @@ TEST_F(TestAMQP, FailWrite) amqp_mock::set_valid_host("localhost"); } -TEST_F(TestAMQP, ClosedConnection) -{ - callback_invoked = false; - const auto current_connections = amqp::get_connection_count(); - const std::string host("localhost3"); - amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none); - EXPECT_TRUE(conn); - EXPECT_EQ(amqp::get_connection_count(), current_connections + 1); - EXPECT_TRUE(amqp::disconnect(conn)); - std::this_thread::sleep_for(long_wait_time); - // make sure number of connections decreased back - EXPECT_EQ(amqp::get_connection_count(), current_connections); - auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); - EXPECT_LT(rc, 0); - std::this_thread::sleep_for(long_wait_time); - EXPECT_FALSE(callback_invoked); - callback_invoked = false; - amqp_mock::set_valid_host("localhost"); -} - TEST_F(TestAMQP, RetryInvalidHost) { const std::string host = "192.168.0.1";