]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/beast: spawn a cancellable coroutine for the accept loop
authorCasey Bodley <cbodley@redhat.com>
Tue, 6 Aug 2024 16:07:18 +0000 (12:07 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 10 Oct 2024 12:34:12 +0000 (08:34 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 100d1b63bd5ab856598b58560463a70cba3ed1c8)

src/rgw/rgw_asio_frontend.cc

index ace3b7aff49e9a4d3424952946ceeb8ff4b8aa0f..86029e7f634e024ec01505584a0b61af224af45a 100644 (file)
@@ -3,9 +3,13 @@
 
 #include <atomic>
 #include <ctime>
+#include <list>
 #include <memory>
-#include <vector>
 
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/detached.hpp>
 #include <boost/asio/error.hpp>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/ip/tcp.hpp>
@@ -423,13 +427,14 @@ class AsioFrontend {
     tcp::endpoint endpoint;
     tcp::acceptor acceptor;
     tcp::socket socket;
+    boost::asio::cancellation_signal signal;
     bool use_ssl = false;
     bool use_nodelay = false;
 
     explicit Listener(boost::asio::io_context& context)
       : acceptor(context), socket(context) {}
   };
-  std::vector<Listener> listeners;
+  std::list<Listener> listeners;
 
   ConnectionList connections;
 
@@ -438,7 +443,9 @@ class AsioFrontend {
   CephContext* ctx() const { return cct.get(); }
   std::optional<dmc::ClientCounters> client_counters;
   std::unique_ptr<dmc::ClientConfig> client_config;
-  void accept(Listener& listener, boost::system::error_code ec);
+
+  void accept(Listener& listener, boost::asio::yield_context yield);
+  void on_accept(Listener& listener, tcp::socket stream);
 
  public:
   AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
@@ -682,10 +689,13 @@ int AsioFrontend::init()
       }
     }
     l.acceptor.listen(max_connection_backlog);
-    l.acceptor.async_accept(l.socket,
-                            [this, &l] (boost::system::error_code ec) {
-                              accept(l, ec);
-                            });
+
+    // spawn a cancellable coroutine to the run the accept loop
+    boost::asio::spawn(context,
+      [this, &l] (boost::asio::yield_context yield) mutable {
+        accept(l, yield);
+      }, bind_cancellation_slot(l.signal.slot(),
+             bind_executor(context, boost::asio::detached)));
 
     ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
     socket_bound = true;
@@ -1002,22 +1012,29 @@ int AsioFrontend::init_ssl()
 }
 #endif // WITH_RADOSGW_BEAST_OPENSSL
 
-void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
+void AsioFrontend::accept(Listener& l, boost::asio::yield_context yield)
 {
-  if (!l.acceptor.is_open()) {
-    return;
-  } else if (ec == boost::asio::error::operation_aborted) {
-    return;
-  } else if (ec) {
-    ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
-    return;
+  for (;;) {
+    boost::system::error_code ec;
+    l.acceptor.async_accept(l.socket, yield[ec]);
+
+    if (!l.acceptor.is_open()) {
+      return;
+    } else if (ec == boost::asio::error::operation_aborted) {
+      return;
+    } else if (ec) {
+      ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
+      return;
+    }
+
+    on_accept(l, std::move(l.socket));
   }
-  auto stream = std::move(l.socket);
+}
+
+void AsioFrontend::on_accept(Listener& l, tcp::socket stream)
+{
+  boost::system::error_code ec;
   stream.set_option(tcp::no_delay(l.use_nodelay), ec);
-  l.acceptor.async_accept(l.socket,
-                          [this, &l] (boost::system::error_code ec) {
-                            accept(l, ec);
-                          });
   
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
@@ -1085,6 +1102,8 @@ void AsioFrontend::stop()
   // close all listeners
   for (auto& listener : listeners) {
     listener.acceptor.close(ec);
+    // signal cancellation of accept()
+    listener.signal.emit(boost::asio::cancellation_type::terminal);
   }
   // close all connections
   connections.close(ec);
@@ -1106,6 +1125,8 @@ void AsioFrontend::pause()
   boost::system::error_code ec;
   for (auto& l : listeners) {
     l.acceptor.cancel(ec);
+    // signal cancellation of accept()
+    l.signal.emit(boost::asio::cancellation_type::terminal);
   }
 
   // pause and wait for outstanding requests to complete
@@ -1125,10 +1146,12 @@ void AsioFrontend::unpause()
 
   // start accepting connections again
   for (auto& l : listeners) {
-    l.acceptor.async_accept(l.socket,
-                            [this, &l] (boost::system::error_code ec) {
-                              accept(l, ec);
-                            });
+    boost::asio::spawn(context,
+      [this, &l] (boost::asio::yield_context yield) mutable {
+        accept(l, yield);
+      }, bind_cancellation_slot(l.signal.slot(),
+             bind_executor(context, boost::asio::detached)));
+
   }
 
   ldout(ctx(), 4) << "frontend unpaused" << dendl;