]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/amqp: remove the explicit "disconnect()" interface 45427/head
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 13 Jun 2021 14:48:11 +0000 (17:48 +0300)
committerCory Snyder <csnyder@iland.com>
Wed, 16 Mar 2022 15:52:11 +0000 (11:52 -0400)
this is to resolve a possible race condistion.
note that the removed interface is not used in RGW code.
and that connection cleanup should be done based on idleness and
not on explicit disconnect. see: https://tracker.ceph.com/issues/49033

Fixes: https://tracker.ceph.com/issues/51114
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit a3849aaa9f5a3e95b1e6ce0a80b1df33b3a77cf9)

src/rgw/rgw_amqp.cc
src/rgw/rgw_amqp.h
src/test/rgw/test_rgw_amqp.cc

index d507c683eb86b78d2d9c075b00ddaeb21286dbfe..7676fd7676c7b4be8e40d6af2bba30ccdb4c03b1 100644 (file)
@@ -114,12 +114,11 @@ typedef std::vector<reply_callback_with_tag_t> CallbackList;
 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
 // since references to deleted objects may still exist in the calling code
 struct connection_t {
-  amqp_connection_state_t state;
+  std::atomic<amqp_connection_state_t> state;
   std::string exchange;
   std::string user;
   std::string password;
   amqp_bytes_t reply_to_queue;
-  bool marked_for_deletion;
   uint64_t delivery_tag;
   int status;
   int reply_type;
@@ -137,7 +136,6 @@ struct connection_t {
   connection_t() :
     state(nullptr),
     reply_to_queue(amqp_empty_bytes),
-    marked_for_deletion(false),
     delivery_tag(1),
     status(AMQP_STATUS_OK),
     reply_type(AMQP_RESPONSE_NORMAL),
@@ -170,7 +168,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (state != nullptr && !marked_for_deletion);
+    return (state != nullptr);
   }
 
   // dtor also destroys the internals
@@ -389,9 +387,8 @@ static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
-  
+  ceph_assert(conn);
+
   // reset all status codes
   conn->status = AMQP_STATUS_OK; 
   conn->reply_type = AMQP_RESPONSE_NORMAL;
@@ -550,7 +547,7 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info,
   conn->cct = cct;
   conn->use_ssl = info.ssl;
   conn->verify_ssl = verify_ssl;
-  conn->ca_location =  ca_location;
+  conn->ca_location = ca_location;
   return create_connection(conn, info);
 }
 
@@ -588,7 +585,7 @@ public:
   const size_t max_queue;
 private:
   std::atomic<size_t> connection_count;
-  bool stopped;
+  std::atomic<bool> stopped;
   struct timeval read_timeout;
   ConnectionList connections;
   MessageQueue messages;
@@ -699,15 +696,6 @@ private:
         
         auto& conn = conn_it->second;
         const auto& conn_key = conn_it->first;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
-          conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
-          ERASE_AND_CONTINUE(conn_it, connections);
-        }
 
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
@@ -892,15 +880,6 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
   connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
@@ -922,10 +901,7 @@ public:
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(id);
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      } else if (it->second->exchange != exchange) {
+      if (it->second->exchange != exchange) {
         ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
@@ -1012,6 +988,7 @@ public:
     size_t sum = 0;
     std::lock_guard lock(connections_lock);
     std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+        // concurrent access to the callback vector is safe without locking
         sum += conn_pair.second->callbacks.size();
       });
     return sum;
@@ -1111,10 +1088,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace amqp
 
index 7a2316858ca4326df7c54e1edc32b26f1c428995..84d0650731c53139a032000b64911a25705faf24 100644 (file)
@@ -72,8 +72,5 @@ size_t get_max_inflight();
 // maximum number of messages in the queue
 size_t get_max_queue();
 
-// disconnect from an amqp broker
-bool disconnect(connection_ptr_t& conn);
-
 }
 
index 421f757515bcf8829fe944a231a2af09973dd0d0..e4beb34d18459a1f105dc5aec7aaf05d817b0ed5 100644 (file)
@@ -222,13 +222,6 @@ TEST_F(TestAMQP, MaxConnections)
   }
   EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
   amqp_mock::set_valid_host("localhost");
-  // delete connections to make space for new ones
-  for (auto conn : connections) {
-    EXPECT_TRUE(amqp::disconnect(conn));
-  }
-  // wait for them to be deleted
-  std::this_thread::sleep_for(long_wait_time);
-  EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections());
 }
 
 std::atomic<bool> callback_invoked = false;
@@ -395,27 +388,6 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST_F(TestAMQP, ClosedConnection)
-{
-  callback_invoked = false;
-  const auto current_connections = amqp::get_connection_count();
-  const std::string host("localhost3");
-  amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
-  EXPECT_TRUE(amqp::disconnect(conn));
-  std::this_thread::sleep_for(long_wait_time);
-  // make sure number of connections decreased back
-  EXPECT_EQ(amqp::get_connection_count(), current_connections);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
-  EXPECT_LT(rc, 0);
-  std::this_thread::sleep_for(long_wait_time);
-  EXPECT_FALSE(callback_invoked);
-  callback_invoked = false;
-  amqp_mock::set_valid_host("localhost");
-}
-
 TEST_F(TestAMQP, RetryInvalidHost)
 {
   const std::string host = "192.168.0.1";