From 93de3367a7d7019fe46717ede418b415826651da Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 30 Mar 2018 23:31:49 -0400 Subject: [PATCH] rgw: beast frontend closes connections on stop the strategy for stop relies on the fact that process_request() is completely synchronous, so that io_context.stop() would still complete each request and clean up properly to tolerate an asynchronous process_request(), we instead need to drain all outstanding work on the io_context so that io_context.run() can return control natually to all of the worker threads. that would allow us to suspend our coroutine in the middle of process_request(), and still guarantee that process_request() will resume and run to completion before the worker threads exit each connected socket also counts as outstanding work, and needs to be closed in order to drain the io_context. each connection now adds itself to a connection list so that stop() can close its socket Signed-off-by: Casey Bodley --- src/rgw/rgw_asio_frontend.cc | 54 ++++++++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index 9525c2f8c7385..23a578c472c22 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -7,6 +7,7 @@ #include #include +#include #include "common/async/shared_mutex.h" @@ -98,10 +99,13 @@ void handle_connection(RGWProcessEnv& env, Stream& stream, // parse the header beast::http::async_read_header(stream, buffer, parser, yield[ec]); if (ec == boost::asio::error::connection_reset || + ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::operation_aborted || #ifdef WITH_RADOSGW_BEAST_OPENSSL ec == ssl::error::stream_truncated || #endif ec == beast::http::error::end_of_stream) { + ldout(cct, 20) << "failed to read header: " << ec.message() << dendl; return; } if (ec) { @@ -172,6 +176,44 @@ void handle_connection(RGWProcessEnv& env, Stream& stream, } } +struct Connection : boost::intrusive::list_base_hook<> { + tcp::socket& socket; + Connection(tcp::socket& socket) : socket(socket) {} +}; + +class ConnectionList { + using List = boost::intrusive::list; + List connections; + std::mutex mutex; + + void remove(Connection& c) { + std::lock_guard lock{mutex}; + if (c.is_linked()) { + connections.erase(List::s_iterator_to(c)); + } + } + public: + class Guard { + ConnectionList *list; + Connection *conn; + public: + Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {} + ~Guard() { list->remove(*conn); } + }; + [[nodiscard]] Guard add(Connection& conn) { + std::lock_guard lock{mutex}; + connections.push_back(conn); + return Guard{this, &conn}; + } + void close(boost::system::error_code& ec) { + std::lock_guard lock{mutex}; + for (auto& conn : connections) { + conn.socket.close(ec); + } + connections.clear(); + } +}; + class AsioFrontend { RGWProcessEnv env; RGWFrontendConfig* conf; @@ -193,6 +235,8 @@ class AsioFrontend { }; std::vector listeners; + ConnectionList connections; + // work guard to keep run() threads busy while listeners are paused using Executor = boost::asio::io_context::executor_type; std::optional> work; @@ -411,6 +455,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) if (l.use_ssl) { boost::asio::spawn(context, [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable { + Connection conn{s}; + auto c = connections.add(conn); // wrap the socket in an ssl stream ssl::stream stream{s, *ssl_context}; beast::flat_buffer buffer; @@ -436,6 +482,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) #endif // WITH_RADOSGW_BEAST_OPENSSL boost::asio::spawn(context, [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable { + Connection conn{s}; + auto c = connections.add(conn); beast::flat_buffer buffer; boost::system::error_code ec; handle_connection(env, s, buffer, false, pause_mutex, ec, yield); @@ -476,9 +524,9 @@ void AsioFrontend::stop() for (auto& listener : listeners) { listener.acceptor.close(ec); } - - // unblock the run() threads - context.stop(); // XXX: kill connections instead + // close all connections + connections.close(ec); + pause_mutex.cancel(); } void AsioFrontend::join() -- 2.39.5