From fb32cb833117bf5bc73f0e4b2dd4ab2888232609 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 6 Aug 2024 12:07:18 -0400 Subject: [PATCH] rgw/beast: spawn a cancellable coroutine for the accept loop Signed-off-by: Casey Bodley (cherry picked from commit 100d1b63bd5ab856598b58560463a70cba3ed1c8) --- src/rgw/rgw_asio_frontend.cc | 71 ++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index ace3b7aff49..86029e7f634 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -3,9 +3,13 @@ #include #include +#include #include -#include +#include +#include +#include +#include #include #include #include @@ -423,13 +427,14 @@ class AsioFrontend { tcp::endpoint endpoint; tcp::acceptor acceptor; tcp::socket socket; + boost::asio::cancellation_signal signal; bool use_ssl = false; bool use_nodelay = false; explicit Listener(boost::asio::io_context& context) : acceptor(context), socket(context) {} }; - std::vector listeners; + std::list listeners; ConnectionList connections; @@ -438,7 +443,9 @@ class AsioFrontend { CephContext* ctx() const { return cct.get(); } std::optional client_counters; std::unique_ptr client_config; - void accept(Listener& listener, boost::system::error_code ec); + + void accept(Listener& listener, boost::asio::yield_context yield); + void on_accept(Listener& listener, tcp::socket stream); public: AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf, @@ -682,10 +689,13 @@ int AsioFrontend::init() } } l.acceptor.listen(max_connection_backlog); - l.acceptor.async_accept(l.socket, - [this, &l] (boost::system::error_code ec) { - accept(l, ec); - }); + + // spawn a cancellable coroutine to the run the accept loop + boost::asio::spawn(context, + [this, &l] (boost::asio::yield_context yield) mutable { + accept(l, yield); + }, bind_cancellation_slot(l.signal.slot(), + bind_executor(context, boost::asio::detached))); ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl; socket_bound = true; @@ -1002,22 +1012,29 @@ int AsioFrontend::init_ssl() } #endif // WITH_RADOSGW_BEAST_OPENSSL -void AsioFrontend::accept(Listener& l, boost::system::error_code ec) +void AsioFrontend::accept(Listener& l, boost::asio::yield_context yield) { - if (!l.acceptor.is_open()) { - return; - } else if (ec == boost::asio::error::operation_aborted) { - return; - } else if (ec) { - ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl; - return; + for (;;) { + boost::system::error_code ec; + l.acceptor.async_accept(l.socket, yield[ec]); + + if (!l.acceptor.is_open()) { + return; + } else if (ec == boost::asio::error::operation_aborted) { + return; + } else if (ec) { + ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl; + return; + } + + on_accept(l, std::move(l.socket)); } - auto stream = std::move(l.socket); +} + +void AsioFrontend::on_accept(Listener& l, tcp::socket stream) +{ + boost::system::error_code ec; stream.set_option(tcp::no_delay(l.use_nodelay), ec); - l.acceptor.async_accept(l.socket, - [this, &l] (boost::system::error_code ec) { - accept(l, ec); - }); // spawn a coroutine to handle the connection #ifdef WITH_RADOSGW_BEAST_OPENSSL @@ -1085,6 +1102,8 @@ void AsioFrontend::stop() // close all listeners for (auto& listener : listeners) { listener.acceptor.close(ec); + // signal cancellation of accept() + listener.signal.emit(boost::asio::cancellation_type::terminal); } // close all connections connections.close(ec); @@ -1106,6 +1125,8 @@ void AsioFrontend::pause() boost::system::error_code ec; for (auto& l : listeners) { l.acceptor.cancel(ec); + // signal cancellation of accept() + l.signal.emit(boost::asio::cancellation_type::terminal); } // pause and wait for outstanding requests to complete @@ -1125,10 +1146,12 @@ void AsioFrontend::unpause() // start accepting connections again for (auto& l : listeners) { - l.acceptor.async_accept(l.socket, - [this, &l] (boost::system::error_code ec) { - accept(l, ec); - }); + boost::asio::spawn(context, + [this, &l] (boost::asio::yield_context yield) mutable { + accept(l, yield); + }, bind_cancellation_slot(l.signal.slot(), + bind_executor(context, boost::asio::detached))); + } ldout(ctx(), 4) << "frontend unpaused" << dendl; -- 2.39.5