From: Casey Bodley Date: Thu, 9 Nov 2017 03:19:12 +0000 (-0500) Subject: rgw: switch beast frontend back to stackful coroutine X-Git-Tag: v13.0.2~122^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=299cbdb397a1ee0df91c93766001394a487e7d35;p=ceph.git rgw: switch beast frontend back to stackful coroutine Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index f01dff917ed2..967cbdfa7296 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -142,11 +142,7 @@ add_library(rgw_a STATIC ${rgw_a_srcs}) add_dependencies(rgw_a civetweb_h) -target_include_directories(rgw_a SYSTEM PUBLIC - ${FCGI_INCLUDE_DIR} - "../rapidjson/include" - ) -target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING) +target_include_directories(rgw_a SYSTEM PUBLIC "../rapidjson/include") target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client cls_log_client cls_statelog_client cls_timeindex_client cls_version_client @@ -155,6 +151,11 @@ target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount ${EXPAT_LIBRARIES} ${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS}) +if (WITH_RADOSGW_BEAST_FRONTEND) + target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING) + target_link_libraries(rgw_a Boost::coroutine Boost::context) +endif() + set(radosgw_srcs rgw_loadgen_process.cc rgw_civetweb.cc diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index d4a44d39447e..faf7806fccb6 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -8,6 +8,7 @@ #include #include +#include #include "rgw_asio_client.h" #include "rgw_asio_frontend.h" @@ -62,134 +63,85 @@ void Pauser::wait() using tcp = boost::asio::ip::tcp; namespace beast = boost::beast; -class Connection { - RGWProcessEnv& env; - boost::asio::io_service::strand strand; - tcp::socket socket; - - // references are bound to callbacks for async operations. if a callback - // function returns without issuing another operation, the reference is - // dropped and the Connection is deleted/closed - std::atomic nref{0}; - using Ref = boost::intrusive_ptr; - +void handle_connection(RGWProcessEnv& env, tcp::socket& socket, + boost::asio::yield_context yield) +{ // limit header to 4k, since we read it all into a single flat_buffer static constexpr size_t header_limit = 4096; // don't impose a limit on the body, since we read it in pieces static constexpr size_t body_limit = std::numeric_limits::max(); + auto cct = env.store->ctx(); + boost::system::error_code ec; beast::flat_buffer buffer; - boost::optional parser; - - using bad_response_type = beast::http::response; - boost::optional response; - CephContext* ctx() const { return env.store->ctx(); } - - void read_header() { + // read messages from the socket until eof + for (;;) { // configure the parser - parser.emplace(); - parser->header_limit(header_limit); - parser->body_limit(body_limit); + rgw::asio::parser_type parser; + parser.header_limit(header_limit); + parser.body_limit(body_limit); // parse the header - beast::http::async_read_header(socket, buffer, *parser, strand.wrap( - std::bind(&Connection::on_header, Ref{this}, - std::placeholders::_1))); - } - - void discard_unread_message() { - if (parser->is_done()) { - // nothing left to discard, start reading the next message - read_header(); - return; - } - - // read the rest of the request into a static buffer. multiple clients could - // write at the same time, but this is okay because we never read it back - static std::array discard_buffer; - - auto& body = parser->get().body(); - body.size = discard_buffer.size(); - body.data = discard_buffer.data(); - - beast::http::async_read_some(socket, buffer, *parser, strand.wrap( - std::bind(&Connection::on_discard_unread, Ref{this}, - std::placeholders::_1))); - } - - void on_discard_unread(boost::system::error_code ec) { - if (ec == boost::asio::error::connection_reset) { - return; - } - if (ec) { - ldout(ctx(), 5) << "discard_unread_message failed: " - << ec.message() << dendl; - return; - } - discard_unread_message(); - } - - void on_write_error(boost::system::error_code ec) { - if (ec) { - ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl; - } - } - - void on_header(boost::system::error_code ec) { + beast::http::async_read_header(socket, buffer, parser, yield[ec]); if (ec == boost::asio::error::connection_reset || ec == beast::http::error::end_of_stream) { return; } if (ec) { - auto& message = parser->get(); - ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl; - ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl; - response.emplace(); - response->result(beast::http::status::bad_request); - response->version(message.version() == 10 ? 10 : 11); - response->prepare_payload(); - beast::http::async_write(socket, *response, strand.wrap( - std::bind(&Connection::on_write_error, Ref{this}, - std::placeholders::_1))); + ldout(cct, 1) << "failed to read header: " << ec.message() << dendl; + auto& message = parser.get(); + beast::http::response response; + response.result(beast::http::status::bad_request); + response.version(message.version() == 10 ? 10 : 11); + response.prepare_payload(); + beast::http::async_write(socket, response, yield[ec]); + if (ec) { + ldout(cct, 5) << "failed to write response: " << ec.message() << dendl; + } + ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; return; } // process the request RGWRequest req{env.store->get_new_req_id()}; - rgw::asio::ClientIO real_client{socket, *parser, buffer}; + rgw::asio::ClientIO real_client{socket, parser, buffer}; auto real_client_io = rgw::io::add_reordering( - rgw::io::add_buffering(ctx(), + rgw::io::add_buffering(cct, rgw::io::add_chunking( rgw::io::add_conlen_controlling( &real_client)))); - RGWRestfulIO client(ctx(), &real_client_io); + RGWRestfulIO client(cct, &real_client_io); process_request(env.store, env.rest, &req, env.uri_prefix, *env.auth_registry, &client, env.olog); - if (parser->keep_alive()) { - // parse any unread bytes from the previous message (in case we replied - // before reading the entire body) before reading the next - discard_unread_message(); + if (!parser.keep_alive()) { + return; } - } - public: - Connection(RGWProcessEnv& env, tcp::socket&& socket) - : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {} - - void on_connect() { - read_header(); - } + // if we failed before reading the entire message, discard any remaining + // bytes before reading the next + while (!parser.is_done()) { + static std::array discard_buffer; - void get() { ++nref; } - void put() { if (nref.fetch_sub(1) == 1) { delete this; } } + auto& body = parser.get().body(); + body.size = discard_buffer.size(); + body.data = discard_buffer.data(); - friend void intrusive_ptr_add_ref(Connection *c) { c->get(); } - friend void intrusive_ptr_release(Connection *c) { c->put(); } -}; + beast::http::async_read_some(socket, buffer, parser, yield[ec]); + if (ec == boost::asio::error::connection_reset) { + return; + } + if (ec) { + ldout(cct, 5) << "failed to discard unread message: " + << ec.message() << dendl; + return; + } + } + } +} class AsioFrontend { RGWProcessEnv env; @@ -325,9 +277,11 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) accept(l, ec); }); - boost::intrusive_ptr conn{new Connection(env, std::move(socket))}; - conn->on_connect(); - // reference drops here, but on_connect() takes another + // spawn a coroutine to handle the connection + boost::asio::spawn(service, + [this, socket=std::move(socket)] (boost::asio::yield_context yield) mutable { + handle_connection(env, socket, yield); + }); } int AsioFrontend::run()