From: Casey Bodley Date: Mon, 20 Jun 2016 03:29:20 +0000 (-0400) Subject: rgw: asio frontend uses async_read X-Git-Tag: v11.1.0~454^2~68 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8340439b0975749b414e882195ab369f04fc2edb;p=ceph.git rgw: asio frontend uses async_read Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc index 653ade56598d..8b584799a74f 100644 --- a/src/rgw/rgw_asio_client.cc +++ b/src/rgw/rgw_asio_client.cc @@ -2,14 +2,8 @@ // vim: ts=8 sw=2 smarttab #include -#include #include -#include -#include -#include -#include - #include "rgw_asio_client.h" #define dout_subsys ceph_subsys_rgw @@ -18,19 +12,18 @@ #define dout_prefix (*_dout << "asio: ") -RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint) - : socket(std::move(socket)), endpoint(std::move(endpoint)) +RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, request_type&& request) + : socket(std::move(socket)), request(std::move(request)) {} RGWAsioClientIO::~RGWAsioClientIO() = default; void RGWAsioClientIO::init_env(CephContext *cct) { - beast::basic_streambuf> buf; // XXX: not sure what this is for - beast::http::read(socket, buf, req); - body_iter = req.body.begin(); + env.init(cct); + body_iter = request.body.begin(); - const auto& headers = req.headers; + const auto& headers = request.headers; for (auto header = headers.begin(); header != headers.end(); ++header) { const auto& name = header->name(); const auto& value = header->value(); @@ -64,10 +57,10 @@ void RGWAsioClientIO::init_env(CephContext *cct) env.set(buf, value); } - env.set("REQUEST_METHOD", req.method); + env.set("REQUEST_METHOD", request.method); // split uri from query - auto url = boost::string_ref{req.url}; + auto url = boost::string_ref{request.url}; auto pos = url.find('?'); auto query = url.substr(pos + 1); url = url.substr(0, pos); diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index 43239716e680..d6426b78a473 100644 --- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -4,6 +4,8 @@ #define RGW_ASIO_CLIENT_H #include +#include +#include #include #include "include/assert.h" @@ -25,11 +27,10 @@ class RGWBufferlistBody { class RGWAsioClientIO : public RGWStreamIO { using tcp = boost::asio::ip::tcp; tcp::socket socket; - tcp::endpoint endpoint; using body_type = RGWBufferlistBody; using request_type = beast::http::request_v1; - request_type req; + request_type request; bufferlist::const_iterator body_iter; @@ -41,7 +42,7 @@ class RGWAsioClientIO : public RGWStreamIO { int read_data(char *buf, int max) override; public: - RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint); + RGWAsioClientIO(tcp::socket&& socket, request_type&& request); ~RGWAsioClientIO(); int complete_request() override; diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index 22a92da0fa7a..a89ec7711dad 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -9,6 +9,13 @@ #include #include +#include +#include +#include +#include +#include +#include + #include "rgw_asio_frontend.h" #include "rgw_asio_client.h" @@ -64,13 +71,68 @@ void Pauser::wait() using tcp = boost::asio::ip::tcp; +class AsioConnection : public std::enable_shared_from_this { + RGWProcessEnv& env; + boost::asio::io_service::strand strand; + tcp::socket socket; + tcp::endpoint endpoint; + beast::streambuf buf; + beast::http::request_v1 request; + + public: + void on_read(boost::system::error_code ec) { + auto cct = env.store->ctx(); + if (ec) { + if (ec.category() == beast::http::get_parse_error_category()) { + ldout(cct, 1) << "parse failed with " << ec.message() << dendl; + } else { + ldout(cct, 1) << "read failed with " << ec.message() << dendl; + } + write_bad_request(); + return; + } + RGWRequest req{env.store->get_new_req_id()}; + RGWAsioClientIO client{std::move(socket), std::move(request)}; + process_request(env.store, env.rest, &req, &client, env.olog); + } + + void write_bad_request() { + beast::http::response_v1 response; + response.status = 400; + response.reason = "Bad Request"; + response.version = request.version; + beast::http::prepare(response); + beast::http::async_write(socket, std::move(response), + std::bind(&AsioConnection::on_write, + shared_from_this(), + beast::asio::placeholders::error)); + } + + void on_write(boost::system::error_code ec) { + auto cct = env.store->ctx(); + if (ec) { + ldout(cct, 1) << "write failed with " << ec.message() << dendl; + } + } + + public: + AsioConnection(RGWProcessEnv& env, tcp::socket&& socket) + : env(env), strand(socket.get_io_service()), socket(std::move(socket)) + {} + + void read() { + beast::http::async_read(socket, buf, request, strand.wrap( + std::bind(&AsioConnection::on_read, shared_from_this(), + beast::asio::placeholders::error))); + } +}; + class AsioFrontend { RGWProcessEnv env; boost::asio::io_service service; tcp::acceptor acceptor; tcp::socket peer_socket; - tcp::endpoint peer_endpoint; std::vector threads; Pauser pauser; @@ -101,7 +163,7 @@ int AsioFrontend::init() acceptor.set_option(tcp::acceptor::reuse_address(true)); acceptor.bind(ep); acceptor.listen(boost::asio::socket_base::max_connections); - acceptor.async_accept(peer_socket, peer_endpoint, + acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); @@ -118,18 +180,13 @@ void AsioFrontend::accept(boost::system::error_code ec) throw ec; } auto socket = std::move(peer_socket); - auto endpoint = std::move(peer_endpoint); - acceptor.async_accept(peer_socket, peer_endpoint, + acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); - ldout(ctx(), 4) << "accept " << endpoint << dendl; - - RGWRequest req{env.store->get_new_req_id()}; - RGWAsioClientIO client{std::move(socket), std::move(endpoint)}; - process_request(env.store, env.rest, &req, &client, env.olog); + std::make_shared(env, std::move(socket))->read(); } int AsioFrontend::run() @@ -192,7 +249,7 @@ void AsioFrontend::unpause(RGWRados *store) env.store = store; ldout(ctx(), 4) << "frontend unpaused" << dendl; service.reset(); - acceptor.async_accept(peer_socket, peer_endpoint, + acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); });