From: Casey Bodley Date: Wed, 12 Jul 2017 15:39:26 +0000 (-0400) Subject: rgw: beast frontend uses callbacks instead of coroutines X-Git-Tag: v12.2.3~126^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c897636ff13750acd446cd7a37b4b5c83e4d4dde;p=ceph.git rgw: beast frontend uses callbacks instead of coroutines Signed-off-by: Casey Bodley (cherry picked from commit 7767d8d88aecac8e88ed4c87a187c7a2ed67cf48) --- diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc index b25e115e9f2c..d0174c9bbbf1 100644 --- a/src/rgw/rgw_asio_client.cc +++ b/src/rgw/rgw_asio_client.cc @@ -9,9 +9,6 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw -#undef dout_prefix -#define dout_prefix (*_dout << "asio: ") - using namespace rgw::asio; ClientIO::ClientIO(tcp::socket& socket, @@ -114,9 +111,7 @@ size_t ClientIO::read_data(char* buf, size_t max) while (body_remaining.size && !parser.is_done()) { boost::system::error_code ec; beast::http::read_some(socket, buffer, parser, ec); - if (ec == boost::asio::error::connection_reset || - ec == boost::asio::error::eof || - ec == beast::http::error::partial_message || + if (ec == beast::http::error::partial_message || ec == beast::http::error::need_buffer) { break; } diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index 64a5bdcd1e81..f193e17c4964 100644 --- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -38,8 +38,6 @@ class ClientIO : public io::RestfulClient, beast::flat_buffer& buffer); ~ClientIO() override; - bool get_conn_close() const { return conn_close; } - void init_env(CephContext *cct) override; size_t complete_request() override; void flush() override; diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index ddc5faebfc0e..9a1f01d84a36 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -1,22 +1,19 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include #include #include #include #include #include -#include #include "rgw_asio_client.h" #include "rgw_asio_frontend.h" #define dout_subsys ceph_subsys_rgw -//#undef dout_prefix -//#define dout_prefix (*_dout << "asio: ") - namespace { class Pauser { @@ -65,67 +62,102 @@ void Pauser::wait() using tcp = boost::asio::ip::tcp; namespace beast = boost::beast; -// coroutine to handle a client connection to completion -static void handle_connection(RGWProcessEnv& env, tcp::socket socket, - boost::asio::yield_context yield) -{ - auto cct = env.store->ctx(); - boost::system::error_code ec; +class Connection { + RGWProcessEnv& env; + boost::asio::strand strand; + tcp::socket socket; - // limit header to 4k, since we read it all into a single buffer - constexpr size_t header_limit = 4096; + // references are bound to callbacks for async operations. if a callback + // function returns without issuing another operation, the reference is + // dropped and the Connection is deleted/closed + std::atomic nref{0}; + using Ref = boost::intrusive_ptr; + + // limit header to 4k, since we read it all into a single flat_buffer + static constexpr size_t header_limit = 4096; // don't impose a limit on the body, since we read it in pieces - constexpr size_t body_limit = std::numeric_limits::max(); + static constexpr size_t body_limit = std::numeric_limits::max(); beast::flat_buffer buffer; + boost::optional parser; + + using bad_response_type = beast::http::response; + boost::optional response; + + CephContext* ctx() const { return env.store->ctx(); } - // read messages from the socket until eof - for (;;) { + void read_header() { // configure the parser - rgw::asio::parser_type parser; - parser.header_limit(header_limit); - parser.body_limit(body_limit); + parser.emplace(); + parser->header_limit(header_limit); + parser->body_limit(body_limit); // parse the header - beast::http::async_read_header(socket, buffer, parser, yield[ec]); + beast::http::async_read_header(socket, buffer, *parser, strand.wrap( + std::bind(&Connection::on_header, Ref{this}, + std::placeholders::_1))); + } + + void on_write_error(boost::system::error_code ec) { + if (ec) { + ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl; + } + } + void on_header(boost::system::error_code ec) { if (ec == boost::asio::error::connection_reset || - ec == boost::asio::error::eof) { + ec == beast::http::error::end_of_stream) { return; } if (ec) { - auto& message = parser.get(); - ldout(cct, 1) << "read failed: " << ec.message() << dendl; - ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; - beast::http::response response; - response.result(beast::http::status::bad_request); - response.reason("Bad Request"); - response.version(message.version() == 10 ? 10 : 11); - response.prepare_payload(); - beast::http::async_write(socket, response, yield[ec]); - // ignore ec + auto& message = parser->get(); + ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl; + ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl; + response.emplace(); + response->result(beast::http::status::bad_request); + response->version(message.version() == 10 ? 10 : 11); + response->prepare_payload(); + beast::http::async_write(socket, *response, strand.wrap( + std::bind(&Connection::on_write_error, Ref{this}, + std::placeholders::_1))); return; } // process the request RGWRequest req{env.store->get_new_req_id()}; - rgw::asio::ClientIO real_client{socket, parser, buffer}; + rgw::asio::ClientIO real_client{socket, *parser, buffer}; auto real_client_io = rgw::io::add_reordering( - rgw::io::add_buffering(cct, + rgw::io::add_buffering(ctx(), rgw::io::add_chunking( rgw::io::add_conlen_controlling( &real_client)))); - RGWRestfulIO client(cct, &real_client_io); + RGWRestfulIO client(ctx(), &real_client_io); process_request(env.store, env.rest, &req, env.uri_prefix, *env.auth_registry, &client, env.olog); - if (real_client.get_conn_close()) { - return; + if (!real_client.get_conn_close()) { + // read next header + read_header(); } } -} + + public: + Connection(RGWProcessEnv& env, tcp::socket&& socket) + : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {} + + void on_connect() { + read_header(); + } + + void get() { ++nref; } + void put() { if (nref.fetch_sub(1) == 1) { delete this; } } + + friend void intrusive_ptr_add_ref(Connection *c) { c->get(); } + friend void intrusive_ptr_release(Connection *c) { c->put(); } +}; + class AsioFrontend { RGWProcessEnv env; @@ -190,15 +222,14 @@ void AsioFrontend::accept(boost::system::error_code ec) throw ec; } auto socket = std::move(peer_socket); - // spawn a coroutine to handle the connection - boost::asio::spawn(service, - [&] (boost::asio::yield_context yield) { - handle_connection(env, std::move(socket), yield); - }); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); + + boost::intrusive_ptr conn{new Connection(env, std::move(socket))}; + conn->on_connect(); + // reference drops here, but on_connect() takes another } int AsioFrontend::run()