#include <atomic>
#include <ctime>
+#include <list>
#include <memory>
-#include <vector>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/detached.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
tcp::endpoint endpoint;
tcp::acceptor acceptor;
tcp::socket socket;
+ boost::asio::cancellation_signal signal;
bool use_ssl = false;
bool use_nodelay = false;
explicit Listener(boost::asio::io_context& context)
: acceptor(context), socket(context) {}
};
- std::vector<Listener> listeners;
+ std::list<Listener> listeners;
ConnectionList connections;
CephContext* ctx() const { return cct.get(); }
std::optional<dmc::ClientCounters> client_counters;
std::unique_ptr<dmc::ClientConfig> client_config;
- void accept(Listener& listener, boost::system::error_code ec);
+
+ void accept(Listener& listener, boost::asio::yield_context yield);
+ void on_accept(Listener& listener, tcp::socket stream);
public:
AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
}
}
l.acceptor.listen(max_connection_backlog);
- l.acceptor.async_accept(l.socket,
- [this, &l] (boost::system::error_code ec) {
- accept(l, ec);
- });
+
+ // spawn a cancellable coroutine to the run the accept loop
+ boost::asio::spawn(context,
+ [this, &l] (boost::asio::yield_context yield) mutable {
+ accept(l, yield);
+ }, bind_cancellation_slot(l.signal.slot(),
+ bind_executor(context, boost::asio::detached)));
ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
socket_bound = true;
}
#endif // WITH_RADOSGW_BEAST_OPENSSL
-void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
+void AsioFrontend::accept(Listener& l, boost::asio::yield_context yield)
{
- if (!l.acceptor.is_open()) {
- return;
- } else if (ec == boost::asio::error::operation_aborted) {
- return;
- } else if (ec) {
- ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
- return;
+ for (;;) {
+ boost::system::error_code ec;
+ l.acceptor.async_accept(l.socket, yield[ec]);
+
+ if (!l.acceptor.is_open()) {
+ return;
+ } else if (ec == boost::asio::error::operation_aborted) {
+ return;
+ } else if (ec) {
+ ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
+ return;
+ }
+
+ on_accept(l, std::move(l.socket));
}
- auto stream = std::move(l.socket);
+}
+
+void AsioFrontend::on_accept(Listener& l, tcp::socket stream)
+{
+ boost::system::error_code ec;
stream.set_option(tcp::no_delay(l.use_nodelay), ec);
- l.acceptor.async_accept(l.socket,
- [this, &l] (boost::system::error_code ec) {
- accept(l, ec);
- });
// spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
// close all listeners
for (auto& listener : listeners) {
listener.acceptor.close(ec);
+ // signal cancellation of accept()
+ listener.signal.emit(boost::asio::cancellation_type::terminal);
}
// close all connections
connections.close(ec);
boost::system::error_code ec;
for (auto& l : listeners) {
l.acceptor.cancel(ec);
+ // signal cancellation of accept()
+ l.signal.emit(boost::asio::cancellation_type::terminal);
}
// pause and wait for outstanding requests to complete
// start accepting connections again
for (auto& l : listeners) {
- l.acceptor.async_accept(l.socket,
- [this, &l] (boost::system::error_code ec) {
- accept(l, ec);
- });
+ boost::asio::spawn(context,
+ [this, &l] (boost::asio::yield_context yield) mutable {
+ accept(l, yield);
+ }, bind_cancellation_slot(l.signal.slot(),
+ bind_executor(context, boost::asio::detached)));
+
}
ldout(ctx(), 4) << "frontend unpaused" << dendl;