]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement buffered HTTP header composition in front-ends. 10767/head
author Radoslaw Zarzynski <rzarzynski@mirantis.com>
Mon, 24 Oct 2016 18:57:06 +0000 (20:57 +0200)
committer Radoslaw Zarzynski <rzarzynski@mirantis.com>
Tue, 25 Oct 2016 16:28:26 +0000 (18:28 +0200)
Without the patch front-ends issued a lot of small IO operations
leading to increased overhead on syscalls and to the fragmentation
of an HTTP message across multiple TCP segments. The latter was
occurring when Nagle's algorithm hadn't been able to form
a single TCP segment (usually when running on extremely fast
network interfaces like loopback).

Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_asio_client.cc
src/rgw/rgw_asio_client.h
src/rgw/rgw_civetweb.cc
src/rgw/rgw_civetweb.h
src/rgw/rgw_civetweb_frontend.cc
src/rgw/rgw_client_io.h
src/rgw/rgw_fcgi.cc
src/rgw/rgw_fcgi.h
src/rgw/rgw_fcgi_process.cc

index 48381c310bbefaecb6c99d92fb331c8723cc44d1..282281a4142eda12f4d542fa1e53e31f7cdf8d50 100644 (file)
@@ -15,8 +15,9 @@
 RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket,
                                  request_type&& request)
   : socket(std::move(socket)),
-    request(std::move(request))
-{}
+    request(std::move(request)),
+    txbuf(*this) {
+}
 
 RGWAsioClientIO::~RGWAsioClientIO() = default;
 
@@ -109,7 +110,7 @@ size_t RGWAsioClientIO::complete_request()
 
 void RGWAsioClientIO::flush()
 {
-  return;
+  txbuf.pubsync();
 }
 
 size_t RGWAsioClientIO::send_status(const int status,
@@ -121,13 +122,16 @@ size_t RGWAsioClientIO::send_status(const int status,
   const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
                                   "HTTP/1.1 %d %s\r\n", status, status_name);
 
-  return write_data(statusbuf, statuslen);
+  return txbuf.sputn(statusbuf, statuslen);
 }
 
 size_t RGWAsioClientIO::send_100_continue()
 {
   const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
-  return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1);
+  const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
+                                  sizeof(HTTTP_100_CONTINUE) - 1);
+  flush();
+  return sent;
 }
 
 static constexpr size_t TIME_BUF_SIZE = 128;
@@ -149,31 +153,38 @@ size_t RGWAsioClientIO::complete_header()
 
   char timestr[TIME_BUF_SIZE];
   if (dump_date_header(timestr)) {
-    sent += write_data(timestr, strlen(timestr));
+    sent += txbuf.sputn(timestr, strlen(timestr));
   }
 
   if (conn_keepalive) {
     constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n";
-    sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
+    sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
   } else if (conn_close) {
     constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n";
-    sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
+    sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
   }
 
   constexpr char HEADER_END[] = "\r\n";
-  return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1);
+  sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  flush();
+  return sent;
 }
 
 size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
                                     const boost::string_ref& value)
 {
-  char hdrbuf[name.size() + 2 + value.size() + 2 + 1];
-  const auto hdrlen = snprintf(hdrbuf, sizeof(hdrbuf), "%.*s: %.*s\r\n",
-                               static_cast<int>(name.length()),
-                               name.data(),
-                               static_cast<int>(value.length()),
-                               value.data());
-  return write_data(hdrbuf, hdrlen);
+  static constexpr char HEADER_SEP[] = ": ";
+  static constexpr char HEADER_END[] = "\r\n";
+
+  size_t sent = 0;
+
+  sent += txbuf.sputn(name.data(), name.length());
+  sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+  sent += txbuf.sputn(value.data(), value.length());
+  sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  return sent;
 }
 
 size_t RGWAsioClientIO::send_content_length(const uint64_t len)
@@ -184,5 +195,5 @@ size_t RGWAsioClientIO::send_content_length(const uint64_t len)
   const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
                                 "Content-Length: %" PRIu64 "\r\n", len);
 
-  return write_data(sizebuf, sizelen);
+  return txbuf.sputn(sizebuf, sizelen);
 }
index c451839d3ab666ccf8bd338dfabf943ecd4d2688..cf65fb90f70136bea0d7274343acbd4c9d20f838 100644 (file)
@@ -24,7 +24,8 @@ class RGWBufferlistBody {
                                             Headers>;
 };
 
-class RGWAsioClientIO : public rgw::io::RestfulClient {
+class RGWAsioClientIO : public rgw::io::RestfulClient,
+                        public rgw::io::BuffererSink {
   using tcp = boost::asio::ip::tcp;
   tcp::socket socket;
 
@@ -38,7 +39,9 @@ class RGWAsioClientIO : public rgw::io::RestfulClient {
   bool conn_close{false};
   RGWEnv env;
 
-  size_t write_data(const char *buf, size_t len);
+  rgw::io::StaticOutputBufferer<> txbuf;
+
+  size_t write_data(const char *buf, size_t len) override;
   size_t read_data(char *buf, size_t max);
 
  public:
index eab43ab4f80c371ced1391fa18c8a78034b1dd43..c92eb843a830953194ff83fee8a8ce583a9d804c 100644 (file)
@@ -33,7 +33,8 @@ RGWCivetWeb::RGWCivetWeb(mg_connection* const conn, const int port)
   : conn(conn),
     port(port),
     explicit_keepalive(false),
-    explicit_conn_close(false)
+    explicit_conn_close(false),
+    txbuf(*this)
 {
 }
 
@@ -48,6 +49,7 @@ size_t RGWCivetWeb::read_data(char *buf, size_t len)
 
 void RGWCivetWeb::flush()
 {
+  txbuf.pubsync();
 }
 
 size_t RGWCivetWeb::complete_request()
@@ -120,38 +122,42 @@ void RGWCivetWeb::init_env(CephContext *cct)
   }
 }
 
-template <class... Args>
-static inline size_t safe_mg_printf(Args&&... args)
-{
-  const int ret = mg_printf(std::forward<Args>(args)...);
-  if (ret == 0) {
-    /* didn't send anything, error out */
-    throw rgw::io::Exception(EIO, std::system_category());
-  } else if (ret < 0) {
-    throw rgw::io::Exception(-ret, std::system_category());
-  }
-  return static_cast<size_t>(ret);
-}
-
 size_t RGWCivetWeb::send_status(int status, const char *status_name)
 {
   mg_set_http_status(conn, status);
 
-  return safe_mg_printf(conn, "HTTP/1.1 %d %s\r\n", status,
-                        status_name ? status_name : "");
+  static constexpr size_t STATUS_BUF_SIZE = 128;
+
+  char statusbuf[STATUS_BUF_SIZE];
+  const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
+                                  "HTTP/1.1 %d %s\r\n", status, status_name);
+
+  return txbuf.sputn(statusbuf, statuslen);
 }
 
 size_t RGWCivetWeb::send_100_continue()
 {
   const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
-  return write_data(HTTTP_100_CONTINUE, sizeof(HTTTP_100_CONTINUE) - 1);
+  const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
+                                  sizeof(HTTTP_100_CONTINUE) - 1);
+  flush();
+  return sent;
 }
 
 size_t RGWCivetWeb::send_header(const boost::string_ref& name,
                                 const boost::string_ref& value)
 {
-  return safe_mg_printf(conn, "%.*s: %.*s\r\n", name.length(), name.data(),
-                        value.length(), value.data());
+  static constexpr char HEADER_SEP[] = ": ";
+  static constexpr char HEADER_END[] = "\r\n";
+
+  size_t sent = 0;
+
+  sent += txbuf.sputn(name.data(), name.length());
+  sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+  sent += txbuf.sputn(value.data(), value.length());
+  sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  return sent;
 }
 
 size_t RGWCivetWeb::dump_date_header()
@@ -171,7 +177,7 @@ size_t RGWCivetWeb::dump_date_header()
     return 0;
   }
 
-  return write_data(timestr, strlen(timestr));
+  return txbuf.sputn(timestr, strlen(timestr));
 }
 
 size_t RGWCivetWeb::complete_header()
@@ -180,17 +186,25 @@ size_t RGWCivetWeb::complete_header()
 
   if (explicit_keepalive) {
     constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n";
-    sent += write_data(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
+    sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
   } else if (explicit_conn_close) {
     constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n";
-    sent += write_data(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
+    sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
   }
 
-  constexpr char HEADER_END[] = "\r\n";
-  return sent + write_data(HEADER_END, sizeof(HEADER_END) - 1);
+  static constexpr char HEADER_END[] = "\r\n";
+  sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  flush();
+  return sent;
 }
 
 size_t RGWCivetWeb::send_content_length(uint64_t len)
 {
-  return safe_mg_printf(conn, "Content-Length: %" PRIu64 "\r\n", len);
+  static constexpr size_t CONLEN_BUF_SIZE = 128;
+
+  char sizebuf[CONLEN_BUF_SIZE];
+  const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
+                                "Content-Length: %" PRIu64 "\r\n", len);
+  return txbuf.sputn(sizebuf, sizelen);
 }
index f3ec6bb7e1480b19ffce5681352acce5c07bd9fc..a6c5ea2567c4da1e6a2d0513161e0f0c9d432959 100644 (file)
@@ -10,8 +10,8 @@
 
 struct mg_connection;
 
-class RGWCivetWeb : public rgw::io::RestfulClient
-{
+class RGWCivetWeb : public rgw::io::RestfulClient,
+                    public rgw::io::BuffererSink {
   RGWEnv env;
   mg_connection *conn;
 
@@ -20,7 +20,9 @@ class RGWCivetWeb : public rgw::io::RestfulClient
   bool explicit_keepalive;
   bool explicit_conn_close;
 
-  size_t write_data(const char *buf, size_t len);
+  rgw::io::StaticOutputBufferer<> txbuf;
+
+  size_t write_data(const char *buf, size_t len) override;
   size_t read_data(char *buf, size_t len);
   size_t dump_date_header();
 
index 27f1518a591ea31462a0663fe415cde49b018176..de9ea6526911360824608c8d169c6027b5098dce 100644 (file)
@@ -22,14 +22,15 @@ int RGWCivetWebFrontend::process(struct mg_connection*  const conn)
   /* Hold a read lock over access to env.store for reconfiguration. */
   RWLock::RLocker lock(env.mutex);
 
-  RGWRequest req(env.store->get_new_req_id());
+  RGWCivetWeb cw_client(conn, env.port);
   auto real_client_io = rgw::io::add_reordering(
                           rgw::io::add_buffering(
                             rgw::io::add_chunking(
                               rgw::io::add_conlen_controlling(
-                                RGWCivetWeb(conn, env.port)))));
+                                &cw_client))));
   RGWRestfulIO client_io(&real_client_io);
 
+  RGWRequest req(env.store->get_new_req_id());
   int ret = process_request(env.store, env.rest, &req, env.uri_prefix,
                             &client_io, env.olog);
   if (ret < 0) {
index 1d549f5f0b5b7506beb4b97694995aad24f87e28..b468c7f9c35470f11a41f4363c0828f5a7afcee0 100644 (file)
@@ -239,6 +239,67 @@ public:
   }
 } /* rgw::io::DecoratedRestfulClient */;
 
+
+/* Interface that should be provided by a front-end class wanting to use
+ * the low-level buffering offered by e.g. StaticOutputBufferer. */
+class BuffererSink {
+public:
+  virtual ~BuffererSink() = default;
+
+  /* Send exactly @len bytes from the memory location pointed by @buf.
+   * On success returns @len. On failure throws rgw::io::Exception. */
+  virtual size_t write_data(const char *buf, size_t len) = 0;
+};
+
+/* Utility class providing RestfulClient's implementations with facilities
+ * for low-level buffering without relying on dynamic memory allocations.
+ * The buffer is carried entirely on stack. This narrows down applicability
+ * to these situations where buffers are relatively small. This perfectly
+ * fits the needs of composing an HTTP header. Without that a front-end
+ * might need to issue a lot of small IO operations leading to increased
+ * overhead on syscalls and fragmentation of a message if the Nagle's
+ * algorithm won't be able to form a single TCP segment (usually when
+ * running on extremely fast network interfaces like the loopback). */
+template <size_t BufferSizeV = 4096>
+class StaticOutputBufferer : public std::streambuf {
+  static_assert(BufferSizeV >= sizeof(std::streambuf::char_type),
+                "Buffer size must be bigger than a single char_type.");
+
+  using std::streambuf::int_type;
+
+  int_type overflow(const int_type c) override {
+    *pptr() = c;
+    pbump(sizeof(std::streambuf::char_type));
+
+    if (! sync()) {
+      /* No error, the buffer has been successfully synchronized. */
+      return c;
+     } else {
+      return std::streambuf::traits_type::eof();
+    }
+  }
+
+  int sync() override {
+    const auto len = static_cast<size_t>(std::streambuf::pptr() -
+                                         std::streambuf::pbase());
+    std::streambuf::pbump(-len);
+    sink.write_data(std::streambuf::pbase(), len);
+    /* Always return success here. In case of failure write_data() will throw
+     * rgw::io::Exception. */
+    return 0;
+  }
+
+  BuffererSink& sink;
+  std::streambuf::char_type buffer[BufferSizeV];
+
+public:
+  StaticOutputBufferer(BuffererSink& sink)
+    : sink(sink) {
+    constexpr size_t len = sizeof(buffer) - sizeof(std::streambuf::char_type);
+    std::streambuf::setp(buffer, buffer + len);
+  }
+};
+
 } /* namespace rgw */
 } /* namespace io */
 
index 90074b5c2a06a99ef6d70c43e745bd5a75f70f6b..68eee4760333a76e479db19657648488c7b9437e 100644 (file)
@@ -26,6 +26,7 @@ size_t RGWFCGX::read_data(char* const buf, const size_t len)
 
 void RGWFCGX::flush()
 {
+  txbuf.pubsync();
   FCGX_FFlush(fcgx->out);
 }
 
@@ -42,7 +43,7 @@ size_t RGWFCGX::send_status(const int status, const char* const status_name)
   const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
                                   "Status: %d %s\r\n", status, status_name);
 
-  return write_data(statusbuf, statuslen);
+  return txbuf.sputn(statusbuf, statuslen);
 }
 
 size_t RGWFCGX::send_100_continue()
@@ -55,15 +56,17 @@ size_t RGWFCGX::send_100_continue()
 size_t RGWFCGX::send_header(const boost::string_ref& name,
                             const boost::string_ref& value)
 {
-  char hdrbuf[name.size() + 2 + value.size() + 2 + 1];
-  const auto hdrlen = snprintf(hdrbuf, sizeof(hdrbuf),
-                               "%.*s: %.*s\r\n",
-                               static_cast<int>(name.length()),
-                               name.data(),
-                               static_cast<int>(value.length()),
-                               value.data());
-
-  return write_data(hdrbuf, hdrlen);
+  static constexpr char HEADER_SEP[] = ": ";
+  static constexpr char HEADER_END[] = "\r\n";
+
+  size_t sent = 0;
+
+  sent += txbuf.sputn(name.data(), name.length());
+  sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
+  sent += txbuf.sputn(value.data(), value.length());
+  sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  return sent;
 }
 
 size_t RGWFCGX::send_content_length(const uint64_t len)
@@ -74,11 +77,14 @@ size_t RGWFCGX::send_content_length(const uint64_t len)
   const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
                                 "Content-Length: %" PRIu64 "\r\n", len);
 
-  return write_data(sizebuf, sizelen);
+  return txbuf.sputn(sizebuf, sizelen);
 }
 
 size_t RGWFCGX::complete_header()
 {
   static constexpr char HEADER_END[] = "\r\n";
-  return write_data(HEADER_END, sizeof(HEADER_END) - 1);
+  const size_t sent = txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);
+
+  flush();
+  return sent;
 }
index 8562c296f69f8b652087b1f17ee06ccce666cfb4..52d7cacabcfa1f043375f7f3f381347b7b79c35f 100644 (file)
 
 struct FCGX_Request;
 
-class RGWFCGX : public rgw::io::RestfulClient
-{
+class RGWFCGX : public rgw::io::RestfulClient,
+                public rgw::io::BuffererSink {
   FCGX_Request *fcgx;
   RGWEnv env;
 
+  rgw::io::StaticOutputBufferer<> txbuf;
+
   size_t read_data(char* buf, size_t len);
-  size_t write_data(const char* buf, size_t len);
+  size_t write_data(const char* buf, size_t len) override;
 
 public:
   explicit RGWFCGX(FCGX_Request* const fcgx)
-    : fcgx(fcgx) {
+    : fcgx(fcgx),
+      txbuf(*this) {
   }
 
   void init_env(CephContext* cct) override;
@@ -40,7 +43,7 @@ public:
     return write_data(buf, len);
   }
 
-  void flush();
+  void flush() override;
 
   RGWEnv& get_env() noexcept override {
     return env;
index b585db2272c6b0a59b47f1235d651dc55e5fad61..fb918b646c21d87fa86b9320427d4ae275f08776 100644 (file)
@@ -114,12 +114,13 @@ void RGWFCGXProcess::run()
 
 void RGWFCGXProcess::handle_request(RGWRequest* r)
 {
-  RGWFCGXRequest* req = static_cast<RGWFCGXRequest*>(r);
-  FCGX_Request* fcgx = req->fcgx;
+  RGWFCGXRequest* const req = static_cast<RGWFCGXRequest*>(r);
+
+  RGWFCGX fcgxfe(req->fcgx);
   auto real_client_io = rgw::io::add_reordering(
                           rgw::io::add_buffering(
                             rgw::io::add_chunking(
-                              RGWFCGX(fcgx))));
+                              &fcgxfe)));
   RGWRestfulIO client_io(&real_client_io);
 
  
@@ -129,7 +130,7 @@ void RGWFCGXProcess::handle_request(RGWRequest* r)
     dout(20) << "process_request() returned " << ret << dendl;
   }
 
-  FCGX_Finish_r(fcgx);
+  FCGX_Finish_r(req->fcgx);
 
   delete req;
 } /* RGWFCGXProcess::handle_request */