#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
+#include <boost/intrusive/list.hpp>
#include "common/async/shared_mutex.h"
// parse the header
beast::http::async_read_header(stream, buffer, parser, yield[ec]);
if (ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::bad_descriptor ||
+ ec == boost::asio::error::operation_aborted ||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
ec == ssl::error::stream_truncated ||
#endif
ec == beast::http::error::end_of_stream) {
+ ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
return;
}
if (ec) {
}
}
+struct Connection : boost::intrusive::list_base_hook<> {
+ tcp::socket& socket;
+ Connection(tcp::socket& socket) : socket(socket) {}
+};
+
+class ConnectionList {
+ using List = boost::intrusive::list<Connection>;
+ List connections;
+ std::mutex mutex;
+
+ void remove(Connection& c) {
+ std::lock_guard lock{mutex};
+ if (c.is_linked()) {
+ connections.erase(List::s_iterator_to(c));
+ }
+ }
+ public:
+ class Guard {
+ ConnectionList *list;
+ Connection *conn;
+ public:
+ Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
+ ~Guard() { list->remove(*conn); }
+ };
+ [[nodiscard]] Guard add(Connection& conn) {
+ std::lock_guard lock{mutex};
+ connections.push_back(conn);
+ return Guard{this, &conn};
+ }
+ void close(boost::system::error_code& ec) {
+ std::lock_guard lock{mutex};
+ for (auto& conn : connections) {
+ conn.socket.close(ec);
+ }
+ connections.clear();
+ }
+};
+
class AsioFrontend {
RGWProcessEnv env;
RGWFrontendConfig* conf;
};
std::vector<Listener> listeners;
+ ConnectionList connections;
+
// work guard to keep run() threads busy while listeners are paused
using Executor = boost::asio::io_context::executor_type;
std::optional<boost::asio::executor_work_guard<Executor>> work;
if (l.use_ssl) {
boost::asio::spawn(context,
[this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+ Connection conn{s};
+ auto c = connections.add(conn);
// wrap the socket in an ssl stream
ssl::stream<tcp::socket&> stream{s, *ssl_context};
beast::flat_buffer buffer;
#endif // WITH_RADOSGW_BEAST_OPENSSL
boost::asio::spawn(context,
[this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+ Connection conn{s};
+ auto c = connections.add(conn);
beast::flat_buffer buffer;
boost::system::error_code ec;
handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
for (auto& listener : listeners) {
listener.acceptor.close(ec);
}
-
- // unblock the run() threads
- context.stop(); // XXX: kill connections instead
+ // close all connections
+ connections.close(ec);
+ pause_mutex.cancel();
}
void AsioFrontend::join()