From 5b41fb55e40833de55e1d93e0be07c6da319a8bb Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sat, 18 Feb 2017 18:00:05 -0500 Subject: [PATCH] rgw: update Beast for streaming reads in asio frontend Signed-off-by: Casey Bodley --- CMakeLists.txt | 2 +- src/rgw/CMakeLists.txt | 3 +- src/rgw/rgw_asio_client.cc | 78 +++++++++++++++--------- src/rgw/rgw_asio_client.h | 113 +++++++++++++++------------------- src/rgw/rgw_asio_frontend.cc | 115 ++++++++++++++++++----------------- 5 files changed, 159 insertions(+), 152 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4338e39fa27..587ad337a4c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -508,7 +508,7 @@ endif() option(WITH_SYSTEM_BOOST "require and build with system Boost" OFF) set(BOOST_COMPONENTS - container thread system regex random program_options date_time iostreams) + container thread system regex random program_options date_time iostreams coroutine context) if(WITH_MGR) list(APPEND BOOST_COMPONENTS python) endif() diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index fabf4b50f88..b95a4f2b043 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -116,11 +116,12 @@ add_dependencies(rgw_a civetweb_h) target_include_directories(rgw_a PUBLIC "../Beast/include" ${FCGI_INCLUDE_DIR}) +target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING) target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client cls_log_client cls_statelog_client cls_timeindex_client cls_version_client cls_replica_log_client cls_user_client ceph-common common_utf8 global - ${CURL_LIBRARIES} + ${CURL_LIBRARIES} ${Boost_LIBRARIES} ${EXPAT_LIBRARIES} ${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS}) diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc index 219a36b6837..63de2d27e75 100644 --- a/src/rgw/rgw_asio_client.cc +++ b/src/rgw/rgw_asio_client.cc @@ -3,6 +3,7 @@ #include #include +#include #include "rgw_asio_client.h" @@ -12,22 +13,23 @@ #undef dout_prefix #define dout_prefix (*_dout << "asio: ") +using namespace rgw::asio; -RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, - request_type&& request) - : socket(std::move(socket)), - request(std::move(request)), - txbuf(*this) { +ClientIO::ClientIO(tcp::socket& socket, + parser_type& parser, + beast::flat_streambuf& buffer) + : socket(socket), parser(parser), buffer(buffer), txbuf(*this) +{ } -RGWAsioClientIO::~RGWAsioClientIO() = default; +ClientIO::~ClientIO() = default; -void RGWAsioClientIO::init_env(CephContext *cct) +void ClientIO::init_env(CephContext *cct) { env.init(cct); - body_iter = request.body.begin(); - const auto& headers = request.headers; + const auto& request = parser.get(); + const auto& headers = request.fields; for (auto header = headers.begin(); header != headers.end(); ++header) { const auto& name = header->name(); const auto& value = header->value(); @@ -80,42 +82,58 @@ void RGWAsioClientIO::init_env(CephContext *cct) // TODO: set REMOTE_USER if authenticated } -size_t RGWAsioClientIO::write_data(const char* const buf, - const size_t len) +size_t ClientIO::write_data(const char* buf, size_t len) { boost::system::error_code ec; auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec); if (ec) { derr << "write_data failed: " << ec.message() << dendl; throw rgw::io::Exception(ec.value(), std::system_category()); - } else { - /* According to the documentation of boost::asio::write if there is - * no error (signalised by ec), then bytes == len. We don't need to - * take care of partial writes in such situation. */ - return bytes; } + /* According to the documentation of boost::asio::write if there is + * no error (signalised by ec), then bytes == len. We don't need to + * take care of partial writes in such situation. */ + return bytes; } -size_t RGWAsioClientIO::read_data(char* const buf, const size_t max) +size_t ClientIO::read_data(char* buf, size_t max) { - // read data from the body's bufferlist - auto bytes = std::min(max, body_iter.get_remaining()); - body_iter.copy(bytes, buf); - return bytes; + auto& message = parser.get(); + auto& body_remaining = message.body; + body_remaining = boost::asio::mutable_buffer{buf, max}; + + boost::system::error_code ec; + + dout(30) << this << " read_data for " << max << " with " + << buffer.size() << " bytes buffered" << dendl; + + while (boost::asio::buffer_size(body_remaining) && !parser.is_complete()) { + auto bytes = beast::http::read_some(socket, buffer, parser, ec); + buffer.consume(bytes); + if (ec == boost::asio::error::connection_reset || + ec == boost::asio::error::eof || + ec == beast::http::error::partial_message) { + break; + } + if (ec) { + derr << "failed to read body: " << ec.message() << dendl; + throw rgw::io::Exception(ec.value(), std::system_category()); + } + } + return max - boost::asio::buffer_size(body_remaining); } -size_t RGWAsioClientIO::complete_request() +size_t ClientIO::complete_request() { return 0; } -void RGWAsioClientIO::flush() +void ClientIO::flush() { txbuf.pubsync(); } -size_t RGWAsioClientIO::send_status(const int status, - const char* const status_name) +size_t ClientIO::send_status(int status, const char* status_name) { static constexpr size_t STATUS_BUF_SIZE = 128; @@ -126,7 +144,7 @@ size_t RGWAsioClientIO::send_status(const int status, return txbuf.sputn(statusbuf, statuslen); } -size_t RGWAsioClientIO::send_100_continue() +size_t ClientIO::send_100_continue() { const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n"; const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE, @@ -148,7 +166,7 @@ static size_t dump_date_header(char (×tr)[TIME_BUF_SIZE]) "Date: %a, %d %b %Y %H:%M:%S %Z\r\n", tmp); } -size_t RGWAsioClientIO::complete_header() +size_t ClientIO::complete_header() { size_t sent = 0; @@ -172,8 +190,8 @@ size_t RGWAsioClientIO::complete_header() return sent; } -size_t RGWAsioClientIO::send_header(const boost::string_ref& name, - const boost::string_ref& value) +size_t ClientIO::send_header(const boost::string_ref& name, + const boost::string_ref& value) { static constexpr char HEADER_SEP[] = ": "; static constexpr char HEADER_END[] = "\r\n"; @@ -188,7 +206,7 @@ size_t RGWAsioClientIO::send_header(const boost::string_ref& name, return sent; } -size_t RGWAsioClientIO::send_content_length(const uint64_t len) +size_t ClientIO::send_content_length(uint64_t len) { static constexpr size_t CONLEN_BUF_SIZE = 128; diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index c87fd5f720b..513a3ef0ca2 100644 --- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -4,36 +4,58 @@ #define RGW_ASIO_CLIENT_H #include -#include -#include -#include +#include +#include +#include #include "include/assert.h" #include "rgw_client_io.h" -// bufferlist to represent the message body -class RGWBufferlistBody { - public: - using value_type = ceph::bufferlist; +namespace rgw { +namespace asio { - class reader; - class writer; +/// streaming message body interface +struct streaming_body { + using value_type = boost::asio::mutable_buffer; - template - using message_type = beast::http::message; -}; + class reader { + value_type& buffer; + public: + using mutable_buffers_type = boost::asio::mutable_buffers_1; -class RGWAsioClientIO : public rgw::io::RestfulClient, - public rgw::io::BuffererSink { - using tcp = boost::asio::ip::tcp; - tcp::socket socket; + static const bool is_direct{true}; // reads directly into user buffer - using body_type = RGWBufferlistBody; - using request_type = beast::http::request_v1; - request_type request; + template + explicit reader(beast::http::message& m) + : buffer(m.body) + {} + + void init() {} + void init(uint64_t content_length) {} + void finish() {} + + mutable_buffers_type prepare(size_t n) { + n = std::min(n, boost::asio::buffer_size(buffer)); + auto position = boost::asio::buffer_cast(buffer); + return {position, n}; + } - bufferlist::const_iterator body_iter; + void commit(size_t n) { + buffer = buffer + n; + } + }; +}; + +using header_type = beast::http::fields; +using parser_type = beast::http::message_parser; + +class ClientIO : public io::RestfulClient, + public io::BuffererSink { + private: + using tcp = boost::asio::ip::tcp; + tcp::socket& socket; + parser_type& parser; + beast::flat_streambuf& buffer; //< parse buffer bool conn_keepalive{false}; bool conn_close{false}; @@ -45,8 +67,11 @@ class RGWAsioClientIO : public rgw::io::RestfulClient, size_t read_data(char *buf, size_t max); public: - RGWAsioClientIO(tcp::socket&& socket, request_type&& request); - ~RGWAsioClientIO() override; + ClientIO(tcp::socket& socket, parser_type& parser, + beast::flat_streambuf& buffer); + ~ClientIO() override; + + bool get_conn_close() const { return conn_close; } void init_env(CephContext *cct) override; size_t complete_request() override; @@ -71,45 +96,7 @@ class RGWAsioClientIO : public rgw::io::RestfulClient, } }; -// used by beast::http::read() to read the body into a bufferlist -class RGWBufferlistBody::reader { - value_type& bl; - public: - template - explicit reader(message_type& m) : bl(m.body) {} - - void write(const char* data, size_t size, boost::system::error_code&) { - bl.append(data, size); - } -}; - -// used by beast::http::write() to write the buffered body -class RGWBufferlistBody::writer { - const value_type& bl; - public: - template - explicit writer(const message_type& msg) - : bl(msg.body) {} - - void init(boost::system::error_code& ec) {} - uint64_t content_length() const { return bl.length(); } - - template - boost::tribool operator()(beast::http::resume_context&&, - boost::system::error_code&, Write&& write) { - // translate from bufferlist to a ConstBufferSequence for beast - std::vector buffers; - buffers.reserve(bl.get_num_buffers()); - for (auto& ptr : bl.buffers()) { - buffers.emplace_back(ptr.c_str(), ptr.length()); - } - write(buffers); - return true; - } -}; -static_assert(beast::http::is_ReadableBody{}, - "RGWBufferlistBody does not satisfy ReadableBody"); -static_assert(beast::http::is_WritableBody{}, - "RGWBufferlistBody does not satisfy WritableBody"); +} // namespace asio +} // namespace rgw #endif // RGW_ASIO_CLIENT_H diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index ff2d7806759..5fc1deec42e 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -7,13 +7,11 @@ #include #include -#include +#include #include -#include -#include -#include #include +#include #include #include "rgw_asio_frontend.h" @@ -71,28 +69,47 @@ void Pauser::wait() using tcp = boost::asio::ip::tcp; -class AsioConnection : public std::enable_shared_from_this { - RGWProcessEnv& env; - boost::asio::io_service::strand strand; - tcp::socket socket; - tcp::endpoint endpoint; - beast::streambuf buf; - beast::http::request_v1 request; +// coroutine to handle a client connection to completion +static void handle_connection(RGWProcessEnv& env, tcp::socket socket, + boost::asio::yield_context yield) +{ + auto cct = env.store->ctx(); + boost::system::error_code ec; - public: - void on_read(boost::system::error_code ec) { - auto cct = env.store->ctx(); + beast::flat_streambuf buffer{1024}; + + // read messages from the socket until eof + for (;;) { + // parse the header + rgw::asio::parser_type parser; + do { + auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]); + buffer.consume(bytes); + } while (!ec && !parser.got_header()); + + if (ec == boost::asio::error::connection_reset || + ec == boost::asio::error::eof) { + return; + } if (ec) { - if (ec.category() == beast::http::get_parse_error_category()) { - ldout(cct, 1) << "parse failed: " << ec.message() << dendl; - } else { - ldout(cct, 1) << "read failed: " << ec.message() << dendl; - } - write_bad_request(); + auto& message = parser.get(); + ldout(cct, 1) << "read failed: " << ec.message() << dendl; + ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; + beast::http::response response; + response.status = 400; + response.reason = "Bad Request"; + response.version = message.version == 10 ? 10 : 11; + beast::http::prepare(response); + beast::http::async_write(socket, std::move(response), yield[ec]); + // ignore ec return; } + + // process the request RGWRequest req{env.store->get_new_req_id()}; - RGWAsioClientIO real_client{std::move(socket), std::move(request)}; + + rgw::asio::ClientIO real_client{socket, parser, buffer}; + auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering( rgw::io::add_chunking( @@ -101,40 +118,12 @@ class AsioConnection : public std::enable_shared_from_this { RGWRestfulIO client(&real_client_io); process_request(env.store, env.rest, &req, env.uri_prefix, *env.auth_registry, &client, env.olog); - } - - void write_bad_request() { - beast::http::response_v1 response; - response.status = 400; - response.reason = "Bad Request"; - /* If the request is so terribly malformed that we can't extract even - * the protocol version, we will use HTTP/1.1 as a fallback. */ - response.version = request.version ? request.version : 11; - beast::http::prepare(response); - beast::http::async_write(socket, std::move(response), - std::bind(&AsioConnection::on_write, - shared_from_this(), - beast::asio::placeholders::error)); - } - void on_write(boost::system::error_code ec) { - auto cct = env.store->ctx(); - if (ec) { - ldout(cct, 1) << "write failed: " << ec.message() << dendl; + if (real_client.get_conn_close()) { + return; } } - - public: - AsioConnection(RGWProcessEnv& env, tcp::socket&& socket) - : env(env), strand(socket.get_io_service()), socket(std::move(socket)) - {} - - void read() { - beast::http::async_read(socket, buf, request, strand.wrap( - std::bind(&AsioConnection::on_read, shared_from_this(), - beast::asio::placeholders::error))); - } -}; +} class AsioFrontend { RGWProcessEnv env; @@ -168,9 +157,19 @@ int AsioFrontend::init() auto ep = tcp::endpoint{tcp::v4(), static_cast(env.port)}; ldout(ctx(), 4) << "frontend listening on " << ep << dendl; - acceptor.open(ep.protocol()); + boost::system::error_code ec; + acceptor.open(ep.protocol(), ec); + if (ec) { + lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; + return -ec.value(); + } acceptor.set_option(tcp::acceptor::reuse_address(true)); - acceptor.bind(ep); + acceptor.bind(ep, ec); + if (ec) { + lderr(ctx()) << "failed to bind address " << ep << + ": " << ec.message() << dendl; + return -ec.value(); + } acceptor.listen(boost::asio::socket_base::max_connections); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { @@ -189,13 +188,15 @@ void AsioFrontend::accept(boost::system::error_code ec) throw ec; } auto socket = std::move(peer_socket); - + // spawn a coroutine to handle the connection + boost::asio::spawn(service, + [&] (boost::asio::yield_context yield) { + handle_connection(env, std::move(socket), yield); + }); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); - - std::make_shared(env, std::move(socket))->read(); } int AsioFrontend::run() -- 2.39.5