From 0753410bdebf0c66583b423eda7231db464ba0a7 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 22 Jan 2018 22:24:23 -0500 Subject: [PATCH] rgw: factor the tcp::socket out of ClientIO remove ClientIO's dependency on a concrete socket type by moving it into a derived StreamIO class in rgw_asio_frontend.cc Signed-off-by: Casey Bodley --- src/rgw/rgw_asio_client.cc | 54 ++++++------------------------------ src/rgw/rgw_asio_client.h | 21 ++++++-------- src/rgw/rgw_asio_frontend.cc | 48 +++++++++++++++++++++++++++++++- 3 files changed, 64 insertions(+), 59 deletions(-) diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc index 0ad801667b2f8..29db397df56fc 100644 --- a/src/rgw/rgw_asio_client.cc +++ b/src/rgw/rgw_asio_client.cc @@ -11,10 +11,13 @@ using namespace rgw::asio; -ClientIO::ClientIO(tcp::socket& socket, - parser_type& parser, - beast::flat_buffer& buffer) - : socket(socket), parser(parser), buffer(buffer), txbuf(*this) +ClientIO::ClientIO(parser_type& parser, + const endpoint_type& local_endpoint, + const endpoint_type& remote_endpoint) + : parser(parser), + local_endpoint(local_endpoint), + remote_endpoint(remote_endpoint), + txbuf(*this) { } @@ -77,53 +80,14 @@ int ClientIO::init_env(CephContext *cct) env.set("SCRIPT_URI", url.to_string()); /* FIXME */ char port_buf[16]; - snprintf(port_buf, sizeof(port_buf), "%d", socket.local_endpoint().port()); + snprintf(port_buf, sizeof(port_buf), "%d", local_endpoint.port()); env.set("SERVER_PORT", port_buf); - env.set("REMOTE_ADDR", socket.remote_endpoint().address().to_string()); + env.set("REMOTE_ADDR", remote_endpoint.address().to_string()); // TODO: set SERVER_PORT_SECURE if using ssl // TODO: set REMOTE_USER if authenticated return 0; } -size_t ClientIO::write_data(const char* buf, size_t len) -{ - boost::system::error_code ec; - auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec); - if (ec) { - derr << "write_data failed: " << ec.message() << dendl; - throw rgw::io::Exception(ec.value(), std::system_category()); - } - /* According to the documentation of boost::asio::write if there is - * no error (signalised by ec), then bytes == len. We don't need to - * take care of partial writes in such situation. */ - return bytes; -} - -size_t ClientIO::read_data(char* buf, size_t max) -{ - auto& message = parser.get(); - auto& body_remaining = message.body(); - body_remaining.data = buf; - body_remaining.size = max; - - dout(30) << this << " read_data for " << max << " with " - << buffer.size() << " bytes buffered" << dendl; - - while (body_remaining.size && !parser.is_done()) { - boost::system::error_code ec; - beast::http::read_some(socket, buffer, parser, ec); - if (ec == beast::http::error::partial_message || - ec == beast::http::error::need_buffer) { - break; - } - if (ec) { - derr << "failed to read body: " << ec.message() << dendl; - throw rgw::io::Exception(ec.value(), std::system_category()); - } - } - return max - body_remaining.size; -} - size_t ClientIO::complete_request() { perfcounter->inc(l_rgw_qlen, -1); diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index eeed4ee8051e2..2dac89b41775f 100644 --- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -18,22 +18,21 @@ using parser_type = beast::http::request_parser; class ClientIO : public io::RestfulClient, public io::BuffererSink { - private: - using tcp = boost::asio::ip::tcp; - tcp::socket& socket; + protected: parser_type& parser; - beast::flat_buffer& buffer; //< parse buffer + private: + using endpoint_type = boost::asio::ip::tcp::endpoint; + endpoint_type local_endpoint; + endpoint_type remote_endpoint; RGWEnv env; rgw::io::StaticOutputBufferer<> txbuf; - size_t write_data(const char *buf, size_t len) override; - size_t read_data(char *buf, size_t max); - public: - ClientIO(tcp::socket& socket, parser_type& parser, - beast::flat_buffer& buffer); + ClientIO(parser_type& parser, + const endpoint_type& local_endpoint, + const endpoint_type& remote_endpoint); ~ClientIO() override; int init_env(CephContext *cct) override; @@ -46,10 +45,6 @@ class ClientIO : public io::RestfulClient, size_t send_content_length(uint64_t len) override; size_t complete_header() override; - size_t recv_body(char* buf, size_t max) override { - return read_data(buf, max); - } - size_t send_body(const char* buf, size_t len) override { return write_data(buf, len); } diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index faf7806fccb64..73c6fe2b497ba 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -63,6 +63,50 @@ void Pauser::wait() using tcp = boost::asio::ip::tcp; namespace beast = boost::beast; +class StreamIO : public rgw::asio::ClientIO { + tcp::socket& stream; + beast::flat_buffer& buffer; + public: + StreamIO(tcp::socket& stream, rgw::asio::parser_type& parser, + beast::flat_buffer& buffer, + const tcp::endpoint& local_endpoint, + const tcp::endpoint& remote_endpoint) + : ClientIO(parser, local_endpoint, remote_endpoint), + stream(stream), buffer(buffer) + {} + + size_t write_data(const char* buf, size_t len) override { + boost::system::error_code ec; + auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec); + if (ec) { + derr << "write_data failed: " << ec.message() << dendl; + throw rgw::io::Exception(ec.value(), std::system_category()); + } + return bytes; + } + + size_t recv_body(char* buf, size_t max) override { + auto& message = parser.get(); + auto& body_remaining = message.body(); + body_remaining.data = buf; + body_remaining.size = max; + + while (body_remaining.size && !parser.is_done()) { + boost::system::error_code ec; + beast::http::read_some(stream, buffer, parser, ec); + if (ec == beast::http::error::partial_message || + ec == beast::http::error::need_buffer) { + break; + } + if (ec) { + derr << "failed to read body: " << ec.message() << dendl; + throw rgw::io::Exception(ec.value(), std::system_category()); + } + } + return max - body_remaining.size; + } +}; + void handle_connection(RGWProcessEnv& env, tcp::socket& socket, boost::asio::yield_context yield) { @@ -106,7 +150,9 @@ void handle_connection(RGWProcessEnv& env, tcp::socket& socket, // process the request RGWRequest req{env.store->get_new_req_id()}; - rgw::asio::ClientIO real_client{socket, parser, buffer}; + StreamIO real_client{socket, parser, buffer, + socket.local_endpoint(), + socket.remote_endpoint()}; auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering(cct, -- 2.39.5