From: Radoslaw Zarzynski Date: Mon, 24 Oct 2016 18:57:06 +0000 (+0200) Subject: rgw: implement buffered HTTP header composition in front-ends. X-Git-Tag: v11.1.0~454^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=573c564c0b7fc35c195449284b1812756471b779;p=ceph.git rgw: implement buffered HTTP header composition in front-ends. Without the patch front-ends issued a lot of small IO operations leading to increased overhead on syscalls and to the fragmentation of an HTTP message across multiple TCP segments. The latter was occurring when Nagle's algorithm hadn't been able to form a single TCP segment (usually when running on extremely fast network interfaces like loopback). Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/rgw/rgw_asio_client.cc b/src/rgw/rgw_asio_client.cc index 48381c310bb..282281a4142 100644 --- a/src/rgw/rgw_asio_client.cc +++ b/src/rgw/rgw_asio_client.cc @@ -15,8 +15,9 @@ RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, request_type&& request) : socket(std::move(socket)), - request(std::move(request)) -{} + request(std::move(request)), + txbuf(*this) { +} RGWAsioClientIO::~RGWAsioClientIO() = default; @@ -109,7 +110,7 @@ size_t RGWAsioClientIO::complete_request() void RGWAsioClientIO::flush() { - return; + txbuf.pubsync(); } size_t RGWAsioClientIO::send_status(const int status, @@ -121,13 +122,16 @@ size_t RGWAsioClientIO::send_status(const int status, const auto statuslen = snprintf(statusbuf, sizeof(statusbuf), "HTTP/1.1 %d %s\r\n", status, status_name); - return write_data(statusbuf, statuslen); + return txbuf.sputn(statusbuf, statuslen); } size_t RGWAsioClientIO::send_100_continue() { const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n"; - return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1); + const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE, + sizeof(HTTTP_100_CONTINUE) - 1); + flush(); + return sent; } static constexpr size_t TIME_BUF_SIZE = 128; @@ -149,31
+153,38 @@ size_t RGWAsioClientIO::complete_header() char timestr[TIME_BUF_SIZE]; if (dump_date_header(timestr)) { - sent += write_data(timestr, strlen(timestr)); + sent += txbuf.sputn(timestr, strlen(timestr)); } if (conn_keepalive) { constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n"; - sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1); + sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1); } else if (conn_close) { constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n"; - sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1); + sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1); } constexpr char HEADER_END[] = "\r\n"; - return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1); + sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + flush(); + return sent; } size_t RGWAsioClientIO::send_header(const boost::string_ref& name, const boost::string_ref& value) { - char hdrbuf[name.size() + 2 + value.size() + 2 + 1]; - const auto hdrlen = snprintf(hdrbuf, sizeof(hdrbuf), "%.*s: %.*s\r\n", - static_cast(name.length()), - name.data(), - static_cast(value.length()), - value.data()); - return write_data(hdrbuf, hdrlen); + static constexpr char HEADER_SEP[] = ": "; + static constexpr char HEADER_END[] = "\r\n"; + + size_t sent = 0; + + sent += txbuf.sputn(name.data(), name.length()); + sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1); + sent += txbuf.sputn(value.data(), value.length()); + sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + return sent; } size_t RGWAsioClientIO::send_content_length(const uint64_t len) @@ -184,5 +195,5 @@ size_t RGWAsioClientIO::send_content_length(const uint64_t len) const auto sizelen = snprintf(sizebuf, sizeof(sizebuf), "Content-Length: %" PRIu64 "\r\n", len); - return write_data(sizebuf, sizelen); + return txbuf.sputn(sizebuf, sizelen); } diff --git a/src/rgw/rgw_asio_client.h b/src/rgw/rgw_asio_client.h index c451839d3ab..cf65fb90f70 100644 
--- a/src/rgw/rgw_asio_client.h +++ b/src/rgw/rgw_asio_client.h @@ -24,7 +24,8 @@ class RGWBufferlistBody { Headers>; }; -class RGWAsioClientIO : public rgw::io::RestfulClient { +class RGWAsioClientIO : public rgw::io::RestfulClient, + public rgw::io::BuffererSink { using tcp = boost::asio::ip::tcp; tcp::socket socket; @@ -38,7 +39,9 @@ class RGWAsioClientIO : public rgw::io::RestfulClient { bool conn_close{false}; RGWEnv env; - size_t write_data(const char *buf, size_t len); + rgw::io::StaticOutputBufferer<> txbuf; + + size_t write_data(const char *buf, size_t len) override; size_t read_data(char *buf, size_t max); public: diff --git a/src/rgw/rgw_civetweb.cc b/src/rgw/rgw_civetweb.cc index eab43ab4f80..c92eb843a83 100644 --- a/src/rgw/rgw_civetweb.cc +++ b/src/rgw/rgw_civetweb.cc @@ -33,7 +33,8 @@ RGWCivetWeb::RGWCivetWeb(mg_connection* const conn, const int port) : conn(conn), port(port), explicit_keepalive(false), - explicit_conn_close(false) + explicit_conn_close(false), + txbuf(*this) { } @@ -48,6 +49,7 @@ size_t RGWCivetWeb::read_data(char *buf, size_t len) void RGWCivetWeb::flush() { + txbuf.pubsync(); } size_t RGWCivetWeb::complete_request() @@ -120,38 +122,42 @@ void RGWCivetWeb::init_env(CephContext *cct) } } -template -static inline size_t safe_mg_printf(Args&&... args) -{ - const int ret = mg_printf(std::forward(args)...); - if (ret == 0) { - /* didn't send anything, error out */ - throw rgw::io::Exception(EIO, std::system_category()); - } else if (ret < 0) { - throw rgw::io::Exception(-ret, std::system_category()); - } - return static_cast(ret); -} - size_t RGWCivetWeb::send_status(int status, const char *status_name) { mg_set_http_status(conn, status); - return safe_mg_printf(conn, "HTTP/1.1 %d %s\r\n", status, - status_name ? 
status_name : ""); + static constexpr size_t STATUS_BUF_SIZE = 128; + + char statusbuf[STATUS_BUF_SIZE]; + const auto statuslen = snprintf(statusbuf, sizeof(statusbuf), + "HTTP/1.1 %d %s\r\n", status, status_name); + + return txbuf.sputn(statusbuf, statuslen); } size_t RGWCivetWeb::send_100_continue() { const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n"; - return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1); + const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE, + sizeof(HTTTP_100_CONTINUE) - 1); + flush(); + return sent; } size_t RGWCivetWeb::send_header(const boost::string_ref& name, const boost::string_ref& value) { - return safe_mg_printf(conn, "%.*s: %.*s\r\n", name.length(), name.data(), - value.length(), value.data()); + static constexpr char HEADER_SEP[] = ": "; + static constexpr char HEADER_END[] = "\r\n"; + + size_t sent = 0; + + sent += txbuf.sputn(name.data(), name.length()); + sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1); + sent += txbuf.sputn(value.data(), value.length()); + sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + return sent; } size_t RGWCivetWeb::dump_date_header() @@ -171,7 +177,7 @@ size_t RGWCivetWeb::dump_date_header() return 0; } - return write_data(timestr, strlen(timestr)); + return txbuf.sputn(timestr, strlen(timestr)); } size_t RGWCivetWeb::complete_header() @@ -180,17 +186,25 @@ size_t RGWCivetWeb::complete_header() if (explicit_keepalive) { constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n"; - sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1); + sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1); } else if (explicit_conn_close) { constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n"; - sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1); + sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1); } - constexpr char HEADER_END[] = "\r\n"; - return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1); + 
static constexpr char HEADER_END[] = "\r\n"; + sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + flush(); + return sent; } size_t RGWCivetWeb::send_content_length(uint64_t len) { - return safe_mg_printf(conn, "Content-Length: %" PRIu64 "\r\n", len); + static constexpr size_t CONLEN_BUF_SIZE = 128; + + char sizebuf[CONLEN_BUF_SIZE]; + const auto sizelen = snprintf(sizebuf, sizeof(sizebuf), + "Content-Length: %" PRIu64 "\r\n", len); + return txbuf.sputn(sizebuf, sizelen); } diff --git a/src/rgw/rgw_civetweb.h b/src/rgw/rgw_civetweb.h index f3ec6bb7e14..a6c5ea2567c 100644 --- a/src/rgw/rgw_civetweb.h +++ b/src/rgw/rgw_civetweb.h @@ -10,8 +10,8 @@ struct mg_connection; -class RGWCivetWeb : public rgw::io::RestfulClient -{ +class RGWCivetWeb : public rgw::io::RestfulClient, + public rgw::io::BuffererSink { RGWEnv env; mg_connection *conn; @@ -20,7 +20,9 @@ class RGWCivetWeb : public rgw::io::RestfulClient bool explicit_keepalive; bool explicit_conn_close; - size_t write_data(const char *buf, size_t len); + rgw::io::StaticOutputBufferer<> txbuf; + + size_t write_data(const char *buf, size_t len) override; size_t read_data(char *buf, size_t len); size_t dump_date_header(); diff --git a/src/rgw/rgw_civetweb_frontend.cc b/src/rgw/rgw_civetweb_frontend.cc index 27f1518a591..de9ea652691 100644 --- a/src/rgw/rgw_civetweb_frontend.cc +++ b/src/rgw/rgw_civetweb_frontend.cc @@ -22,14 +22,15 @@ int RGWCivetWebFrontend::process(struct mg_connection* const conn) /* Hold a read lock over access to env.store for reconfiguration. 
*/ RWLock::RLocker lock(env.mutex); - RGWRequest req(env.store->get_new_req_id()); + RGWCivetWeb cw_client(conn, env.port); auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering( rgw::io::add_chunking( rgw::io::add_conlen_controlling( - RGWCivetWeb(conn, env.port))))); + &cw_client)))); RGWRestfulIO client_io(&real_client_io); + RGWRequest req(env.store->get_new_req_id()); int ret = process_request(env.store, env.rest, &req, env.uri_prefix, &client_io, env.olog); if (ret < 0) { diff --git a/src/rgw/rgw_client_io.h b/src/rgw/rgw_client_io.h index 1d549f5f0b5..b468c7f9c35 100644 --- a/src/rgw/rgw_client_io.h +++ b/src/rgw/rgw_client_io.h @@ -239,6 +239,67 @@ public: } } /* rgw::io::DecoratedRestfulClient */; + +/* Interface that should be provided by a front-end class wanting to use + * the low-level buffering offered by e.g. StaticOutputBufferer. */ +class BuffererSink { +public: + virtual ~BuffererSink() = default; + + /* Send exactly @len bytes from the memory location pointed to by @buf. + * On success returns @len. On failure throws rgw::io::Exception. */ + virtual size_t write_data(const char *buf, size_t len) = 0; +}; + +/* Utility class providing RestfulClient's implementations with facilities + * for low-level buffering without relying on dynamic memory allocations. + * The buffer is carried entirely on stack. This narrows down applicability + * to those situations where buffers are relatively small. This perfectly + * fits the needs of composing an HTTP header. Without that a front-end + * might need to issue a lot of small IO operations leading to increased + * overhead on syscalls and fragmentation of a message if Nagle's + * algorithm won't be able to form a single TCP segment (usually when + * running on extremely fast network interfaces like the loopback).
*/ +template +class StaticOutputBufferer : public std::streambuf { + static_assert(BufferSizeV >= sizeof(std::streambuf::char_type), + "Buffer size must be bigger than a single char_type."); + + using std::streambuf::int_type; + + int_type overflow(const int_type c) override { + *pptr() = c; + pbump(sizeof(std::streambuf::char_type)); + + if (! sync()) { + /* No error, the buffer has been successfully synchronized. */ + return c; + } else { + return std::streambuf::traits_type::eof(); + } + } + + int sync() override { + const auto len = static_cast(std::streambuf::pptr() - + std::streambuf::pbase()); + std::streambuf::pbump(-len); + sink.write_data(std::streambuf::pbase(), len); + /* Always return success here. In case of failure write_data() will throw + * rgw::io::Exception. */ + return 0; + } + + BuffererSink& sink; + std::streambuf::char_type buffer[BufferSizeV]; + +public: + StaticOutputBufferer(BuffererSink& sink) + : sink(sink) { + constexpr size_t len = sizeof(buffer) - sizeof(std::streambuf::char_type); + std::streambuf::setp(buffer, buffer + len); + } +}; + } /* namespace rgw */ } /* namespace io */ diff --git a/src/rgw/rgw_fcgi.cc b/src/rgw/rgw_fcgi.cc index 90074b5c2a0..68eee476033 100644 --- a/src/rgw/rgw_fcgi.cc +++ b/src/rgw/rgw_fcgi.cc @@ -26,6 +26,7 @@ size_t RGWFCGX::read_data(char* const buf, const size_t len) void RGWFCGX::flush() { + txbuf.pubsync(); FCGX_FFlush(fcgx->out); } @@ -42,7 +43,7 @@ size_t RGWFCGX::send_status(const int status, const char* const status_name) const auto statuslen = snprintf(statusbuf, sizeof(statusbuf), "Status: %d %s\r\n", status, status_name); - return write_data(statusbuf, statuslen); + return txbuf.sputn(statusbuf, statuslen); } size_t RGWFCGX::send_100_continue() @@ -55,15 +56,17 @@ size_t RGWFCGX::send_100_continue() size_t RGWFCGX::send_header(const boost::string_ref& name, const boost::string_ref& value) { - char hdrbuf[name.size() + 2 + value.size() + 2 + 1]; - const auto hdrlen = snprintf(hdrbuf, 
sizeof(hdrbuf), - "%.*s: %.*s\r\n", - static_cast(name.length()), - name.data(), - static_cast(value.length()), - value.data()); - - return write_data(hdrbuf, hdrlen); + static constexpr char HEADER_SEP[] = ": "; + static constexpr char HEADER_END[] = "\r\n"; + + size_t sent = 0; + + sent += txbuf.sputn(name.data(), name.length()); + sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1); + sent += txbuf.sputn(value.data(), value.length()); + sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + return sent; } size_t RGWFCGX::send_content_length(const uint64_t len) @@ -74,11 +77,14 @@ size_t RGWFCGX::send_content_length(const uint64_t len) const auto sizelen = snprintf(sizebuf, sizeof(sizebuf), "Content-Length: %" PRIu64 "\r\n", len); - return write_data(sizebuf, sizelen); + return txbuf.sputn(sizebuf, sizelen); } size_t RGWFCGX::complete_header() { static constexpr char HEADER_END[] = "\r\n"; - return write_data(HEADER_END, sizeof(HEADER_END) - 1); + const size_t sent = txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1); + + flush(); + return sent; } diff --git a/src/rgw/rgw_fcgi.h b/src/rgw/rgw_fcgi.h index 8562c296f69..52d7cacabcf 100644 --- a/src/rgw/rgw_fcgi.h +++ b/src/rgw/rgw_fcgi.h @@ -11,17 +11,20 @@ struct FCGX_Request; -class RGWFCGX : public rgw::io::RestfulClient -{ +class RGWFCGX : public rgw::io::RestfulClient, + public rgw::io::BuffererSink { FCGX_Request *fcgx; RGWEnv env; + rgw::io::StaticOutputBufferer<> txbuf; + size_t read_data(char* buf, size_t len); - size_t write_data(const char* buf, size_t len); + size_t write_data(const char* buf, size_t len) override; public: explicit RGWFCGX(FCGX_Request* const fcgx) - : fcgx(fcgx) { + : fcgx(fcgx), + txbuf(*this) { } void init_env(CephContext* cct) override; @@ -40,7 +43,7 @@ public: return write_data(buf, len); } - void flush(); + void flush() override; RGWEnv& get_env() noexcept override { return env; diff --git a/src/rgw/rgw_fcgi_process.cc b/src/rgw/rgw_fcgi_process.cc index 
b585db2272c..fb918b646c2 100644 --- a/src/rgw/rgw_fcgi_process.cc +++ b/src/rgw/rgw_fcgi_process.cc @@ -114,12 +114,13 @@ void RGWFCGXProcess::run() void RGWFCGXProcess::handle_request(RGWRequest* r) { - RGWFCGXRequest* req = static_cast(r); - FCGX_Request* fcgx = req->fcgx; + RGWFCGXRequest* const req = static_cast(r); + + RGWFCGX fcgxfe(req->fcgx); auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering( rgw::io::add_chunking( - RGWFCGX(fcgx)))); + &fcgxfe))); RGWRestfulIO client_io(&real_client_io); @@ -129,7 +130,7 @@ void RGWFCGXProcess::handle_request(RGWRequest* r) dout(20) << "process_request() returned " << ret << dendl; } - FCGX_Finish_r(fcgx); + FCGX_Finish_r(req->fcgx); delete req; } /* RGWFCGXProcess::handle_request */