#include <vector>
#include <boost/asio.hpp>
+#include <boost/asio/spawn.hpp>
#include "rgw_asio_client.h"
#include "rgw_asio_frontend.h"
using tcp = boost::asio::ip::tcp;
namespace beast = boost::beast;
-class Connection {
- RGWProcessEnv& env;
- boost::asio::io_service::strand strand;
- tcp::socket socket;
-
- // references are bound to callbacks for async operations. if a callback
- // function returns without issuing another operation, the reference is
- // dropped and the Connection is deleted/closed
- std::atomic<int> nref{0};
- using Ref = boost::intrusive_ptr<Connection>;
-
+void handle_connection(RGWProcessEnv& env, tcp::socket& socket,
+ boost::asio::yield_context yield)
+{
// limit header to 4k, since we read it all into a single flat_buffer
static constexpr size_t header_limit = 4096;
// don't impose a limit on the body, since we read it in pieces
static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
+ auto cct = env.store->ctx();
+ boost::system::error_code ec;
beast::flat_buffer buffer;
- boost::optional<rgw::asio::parser_type> parser;
-
- using bad_response_type = beast::http::response<beast::http::empty_body>;
- boost::optional<bad_response_type> response;
- CephContext* ctx() const { return env.store->ctx(); }
-
- void read_header() {
+ // read messages from the socket until eof
+ for (;;) {
// configure the parser
- parser.emplace();
- parser->header_limit(header_limit);
- parser->body_limit(body_limit);
+ rgw::asio::parser_type parser;
+ parser.header_limit(header_limit);
+ parser.body_limit(body_limit);
// parse the header
- beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
- std::bind(&Connection::on_header, Ref{this},
- std::placeholders::_1)));
- }
-
- void discard_unread_message() {
- if (parser->is_done()) {
- // nothing left to discard, start reading the next message
- read_header();
- return;
- }
-
- // read the rest of the request into a static buffer. multiple clients could
- // write at the same time, but this is okay because we never read it back
- static std::array<char, 1024> discard_buffer;
-
- auto& body = parser->get().body();
- body.size = discard_buffer.size();
- body.data = discard_buffer.data();
-
- beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
- std::bind(&Connection::on_discard_unread, Ref{this},
- std::placeholders::_1)));
- }
-
- void on_discard_unread(boost::system::error_code ec) {
- if (ec == boost::asio::error::connection_reset) {
- return;
- }
- if (ec) {
- ldout(ctx(), 5) << "discard_unread_message failed: "
- << ec.message() << dendl;
- return;
- }
- discard_unread_message();
- }
-
- void on_write_error(boost::system::error_code ec) {
- if (ec) {
- ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
- }
- }
-
- void on_header(boost::system::error_code ec) {
+ beast::http::async_read_header(socket, buffer, parser, yield[ec]);
if (ec == boost::asio::error::connection_reset ||
ec == beast::http::error::end_of_stream) {
return;
}
if (ec) {
- auto& message = parser->get();
- ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
- ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
- response.emplace();
- response->result(beast::http::status::bad_request);
- response->version(message.version() == 10 ? 10 : 11);
- response->prepare_payload();
- beast::http::async_write(socket, *response, strand.wrap(
- std::bind(&Connection::on_write_error, Ref{this},
- std::placeholders::_1)));
+ ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
+ auto& message = parser.get();
+ beast::http::response<beast::http::empty_body> response;
+ response.result(beast::http::status::bad_request);
+ response.version(message.version() == 10 ? 10 : 11);
+ response.prepare_payload();
+ beast::http::async_write(socket, response, yield[ec]);
+ if (ec) {
+ ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
+ }
+ ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
return;
}
// process the request
RGWRequest req{env.store->get_new_req_id()};
- rgw::asio::ClientIO real_client{socket, *parser, buffer};
+ rgw::asio::ClientIO real_client{socket, parser, buffer};
auto real_client_io = rgw::io::add_reordering(
- rgw::io::add_buffering(ctx(),
+ rgw::io::add_buffering(cct,
rgw::io::add_chunking(
rgw::io::add_conlen_controlling(
&real_client))));
- RGWRestfulIO client(ctx(), &real_client_io);
+ RGWRestfulIO client(cct, &real_client_io);
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog);
- if (parser->keep_alive()) {
- // parse any unread bytes from the previous message (in case we replied
- // before reading the entire body) before reading the next
- discard_unread_message();
+ if (!parser.keep_alive()) {
+ return;
}
- }
- public:
- Connection(RGWProcessEnv& env, tcp::socket&& socket)
- : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
-
- void on_connect() {
- read_header();
- }
+ // if we failed before reading the entire message, discard any remaining
+ // bytes before reading the next
+ while (!parser.is_done()) {
+ static std::array<char, 1024> discard_buffer;
- void get() { ++nref; }
- void put() { if (nref.fetch_sub(1) == 1) { delete this; } }
+ auto& body = parser.get().body();
+ body.size = discard_buffer.size();
+ body.data = discard_buffer.data();
- friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
- friend void intrusive_ptr_release(Connection *c) { c->put(); }
-};
+ beast::http::async_read_some(socket, buffer, parser, yield[ec]);
+ if (ec == boost::asio::error::connection_reset) {
+ return;
+ }
+ if (ec) {
+ ldout(cct, 5) << "failed to discard unread message: "
+ << ec.message() << dendl;
+ return;
+ }
+ }
+ }
+}
class AsioFrontend {
RGWProcessEnv env;
accept(l, ec);
});
- boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
- conn->on_connect();
- // reference drops here, but on_connect() takes another
+ // spawn a coroutine to handle the connection
+ boost::asio::spawn(service,
+ [this, socket=std::move(socket)] (boost::asio::yield_context yield) mutable {
+ handle_connection(env, socket, yield);
+ });
}
int AsioFrontend::run()