]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: update Beast for streaming reads in asio frontend
authorCasey Bodley <cbodley@redhat.com>
Sat, 18 Feb 2017 23:00:05 +0000 (18:00 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 5 May 2017 14:12:35 +0000 (10:12 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
CMakeLists.txt
src/rgw/CMakeLists.txt
src/rgw/rgw_asio_client.cc
src/rgw/rgw_asio_client.h
src/rgw/rgw_asio_frontend.cc

index 4338e39fa27ee14fd4cdf182cc108eabfe919bdb..587ad337a4ca54ce2a4a6daeaf829463dd58be90 100644 (file)
@@ -508,7 +508,7 @@ endif()
 option(WITH_SYSTEM_BOOST "require and build with system Boost" OFF)
 
 set(BOOST_COMPONENTS
-       container thread system regex random program_options date_time iostreams)
+       container thread system regex random program_options date_time iostreams coroutine context)
 if(WITH_MGR)
        list(APPEND BOOST_COMPONENTS python)
 endif()
index fabf4b50f884c89d8705e420e95516920aa53a04..b95a4f2b043d2eb9b8715341fd5b3d98ff513789 100644 (file)
@@ -116,11 +116,12 @@ add_dependencies(rgw_a civetweb_h)
 target_include_directories(rgw_a PUBLIC
   "../Beast/include"
   ${FCGI_INCLUDE_DIR})
+target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING)
 
 target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client
   cls_log_client cls_statelog_client cls_timeindex_client cls_version_client
   cls_replica_log_client cls_user_client ceph-common common_utf8 global
-  ${CURL_LIBRARIES}
+  ${CURL_LIBRARIES} ${Boost_LIBRARIES}
   ${EXPAT_LIBRARIES}
   ${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS})
 
index 219a36b68374ba0ff56988dbf295e78a0b57c11a..63de2d27e757802aed3856f2badedd6d27107868 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/asio/write.hpp>
+#include <beast/http/read.hpp>
 
 #include "rgw_asio_client.h"
 
 #undef dout_prefix
 #define dout_prefix (*_dout << "asio: ")
 
+using namespace rgw::asio;
 
-RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket,
-                                 request_type&& request)
-  : socket(std::move(socket)),
-    request(std::move(request)),
-    txbuf(*this) {
+ClientIO::ClientIO(tcp::socket& socket,
+                   parser_type& parser,
+                   beast::flat_streambuf& buffer)
+  : socket(socket), parser(parser), buffer(buffer), txbuf(*this)
+{
 }
 
-RGWAsioClientIO::~RGWAsioClientIO() = default;
+ClientIO::~ClientIO() = default;
 
-void RGWAsioClientIO::init_env(CephContext *cct)
+void ClientIO::init_env(CephContext *cct)
 {
   env.init(cct);
-  body_iter = request.body.begin();
 
-  const auto& headers = request.headers;
+  const auto& request = parser.get();
+  const auto& headers = request.fields;
   for (auto header = headers.begin(); header != headers.end(); ++header) {
     const auto& name = header->name();
     const auto& value = header->value();
@@ -80,42 +82,58 @@ void RGWAsioClientIO::init_env(CephContext *cct)
   // TODO: set REMOTE_USER if authenticated
 }
 
-size_t RGWAsioClientIO::write_data(const char* const buf,
-                                   const size_t len)
+size_t ClientIO::write_data(const char* buf, size_t len)
 {
   boost::system::error_code ec;
   auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec);
   if (ec) {
     derr << "write_data failed: " << ec.message() << dendl;
     throw rgw::io::Exception(ec.value(), std::system_category());
-  } else {
-    /* According to the documentation of boost::asio::write if there is
-     * no error (signalised by ec), then bytes == len. We don't need to
-     * take care of partial writes in such situation. */
-    return bytes;
   }
+  /* According to the documentation of boost::asio::write if there is
+   * no error (signalised by ec), then bytes == len. We don't need to
+   * take care of partial writes in such situation. */
+  return bytes;
 }
 
-size_t RGWAsioClientIO::read_data(char* const buf, const size_t max)
+size_t ClientIO::read_data(char* buf, size_t max)
 {
-  // read data from the body's bufferlist
-  auto bytes = std::min<unsigned>(max, body_iter.get_remaining());
-  body_iter.copy(bytes, buf);
-  return bytes;
+  auto& message = parser.get();
+  auto& body_remaining = message.body;
+  body_remaining = boost::asio::mutable_buffer{buf, max};
+
+  boost::system::error_code ec;
+
+  dout(30) << this << " read_data for " << max << " with "
+      << buffer.size() << " bytes buffered" << dendl;
+
+  while (boost::asio::buffer_size(body_remaining) && !parser.is_complete()) {
+    auto bytes = beast::http::read_some(socket, buffer, parser, ec);
+    buffer.consume(bytes);
+    if (ec == boost::asio::error::connection_reset ||
+        ec == boost::asio::error::eof ||
+        ec == beast::http::error::partial_message) {
+      break;
+    }
+    if (ec) {
+      derr << "failed to read body: " << ec.message() << dendl;
+      throw rgw::io::Exception(ec.value(), std::system_category());
+    }
+  }
+  return max - boost::asio::buffer_size(body_remaining);
 }
 
-size_t RGWAsioClientIO::complete_request()
+size_t ClientIO::complete_request()
 {
   return 0;
 }
 
-void RGWAsioClientIO::flush()
+void ClientIO::flush()
 {
   txbuf.pubsync();
 }
 
-size_t RGWAsioClientIO::send_status(const int status,
-                                    const char* const status_name)
+size_t ClientIO::send_status(int status, const char* status_name)
 {
   static constexpr size_t STATUS_BUF_SIZE = 128;
 
@@ -126,7 +144,7 @@ size_t RGWAsioClientIO::send_status(const int status,
   return txbuf.sputn(statusbuf, statuslen);
 }
 
-size_t RGWAsioClientIO::send_100_continue()
+size_t ClientIO::send_100_continue()
 {
   const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
   const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
@@ -148,7 +166,7 @@ static size_t dump_date_header(char (&timestr)[TIME_BUF_SIZE])
                   "Date: %a, %d %b %Y %H:%M:%S %Z\r\n", tmp);
 }
 
-size_t RGWAsioClientIO::complete_header()
+size_t ClientIO::complete_header()
 {
   size_t sent = 0;
 
@@ -172,8 +190,8 @@ size_t RGWAsioClientIO::complete_header()
   return sent;
 }
 
-size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
-                                    const boost::string_ref& value)
+size_t ClientIO::send_header(const boost::string_ref& name,
+                             const boost::string_ref& value)
 {
   static constexpr char HEADER_SEP[] = ": ";
   static constexpr char HEADER_END[] = "\r\n";
@@ -188,7 +206,7 @@ size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
   return sent;
 }
 
-size_t RGWAsioClientIO::send_content_length(const uint64_t len)
+size_t ClientIO::send_content_length(uint64_t len)
 {
   static constexpr size_t CONLEN_BUF_SIZE = 128;
 
index c87fd5f720b3404e392c69badccdac7772be4aae..513a3ef0ca20bc9e667ace39a015b871145e8123 100644 (file)
@@ -4,36 +4,58 @@
 #define RGW_ASIO_CLIENT_H
 
 #include <boost/asio/ip/tcp.hpp>
-#include <beast/http/body_type.hpp>
-#include <beast/http/concepts.hpp>
-#include <beast/http/message_v1.hpp>
+#include <beast/http/message.hpp>
+#include <beast/http/message_parser.hpp>
+#include <beast/core/flat_streambuf.hpp>
 #include "include/assert.h"
 
 #include "rgw_client_io.h"
 
-// bufferlist to represent the message body
-class RGWBufferlistBody {
- public:
-  using value_type = ceph::bufferlist;
+namespace rgw {
+namespace asio {
 
-  class reader;
-  class writer;
+/// streaming message body interface
+struct streaming_body {
+  using value_type = boost::asio::mutable_buffer;
 
-  template <bool isRequest, typename Headers>
-  using message_type = beast::http::message<isRequest, RGWBufferlistBody,
-                                            Headers>;
-};
+  class reader {
+    value_type& buffer;
+   public:
+    using mutable_buffers_type = boost::asio::mutable_buffers_1;
 
-class RGWAsioClientIO : public rgw::io::RestfulClient,
-                        public rgw::io::BuffererSink {
-  using tcp = boost::asio::ip::tcp;
-  tcp::socket socket;
+    static const bool is_direct{true}; // reads directly into user buffer
 
-  using body_type = RGWBufferlistBody;
-  using request_type = beast::http::request_v1<body_type>;
-  request_type request;
+    template<bool isRequest, class Fields>
+    explicit reader(beast::http::message<isRequest, streaming_body, Fields>& m)
+      : buffer(m.body)
+    {}
+
+    void init() {}
+    void init(uint64_t content_length) {}
+    void finish() {}
+
+    mutable_buffers_type prepare(size_t n) {
+      n = std::min(n, boost::asio::buffer_size(buffer));
+      auto position = boost::asio::buffer_cast<char*>(buffer);
+      return {position, n};
+    }
 
-  bufferlist::const_iterator body_iter;
+    void commit(size_t n) {
+      buffer = buffer + n;
+    }
+  };
+};
+
+using header_type = beast::http::fields;
+using parser_type = beast::http::message_parser<true, streaming_body, header_type>;
+
+class ClientIO : public io::RestfulClient,
+                 public io::BuffererSink {
+ private:
+  using tcp = boost::asio::ip::tcp;
+  tcp::socket& socket;
+  parser_type& parser;
+  beast::flat_streambuf& buffer; //< parse buffer
 
   bool conn_keepalive{false};
   bool conn_close{false};
@@ -45,8 +67,11 @@ class RGWAsioClientIO : public rgw::io::RestfulClient,
   size_t read_data(char *buf, size_t max);
 
  public:
-  RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
-  ~RGWAsioClientIO() override;
+  ClientIO(tcp::socket& socket, parser_type& parser,
+           beast::flat_streambuf& buffer);
+  ~ClientIO() override;
+
+  bool get_conn_close() const { return conn_close; }
 
   void init_env(CephContext *cct) override;
   size_t complete_request() override;
@@ -71,45 +96,7 @@ class RGWAsioClientIO : public rgw::io::RestfulClient,
   }
 };
 
-// used by beast::http::read() to read the body into a bufferlist
-class RGWBufferlistBody::reader {
-  value_type& bl;
- public:
-  template<bool isRequest, typename Headers>
-  explicit reader(message_type<isRequest, Headers>& m) : bl(m.body) {}
-
-  void write(const char* data, size_t size, boost::system::error_code&) {
-    bl.append(data, size);
-  }
-};
-
-// used by beast::http::write() to write the buffered body
-class RGWBufferlistBody::writer {
-  const value_type& bl;
- public:
-  template<bool isRequest, typename Headers>
-  explicit writer(const message_type<isRequest, Headers>& msg)
-    : bl(msg.body) {}
-
-  void init(boost::system::error_code& ec) {}
-  uint64_t content_length() const { return bl.length(); }
-
-  template<typename Write>
-  boost::tribool operator()(beast::http::resume_context&&,
-                            boost::system::error_code&, Write&& write) {
-    // translate from bufferlist to a ConstBufferSequence for beast
-    std::vector<boost::asio::const_buffer> buffers;
-    buffers.reserve(bl.get_num_buffers());
-    for (auto& ptr : bl.buffers()) {
-      buffers.emplace_back(ptr.c_str(), ptr.length());
-    }
-    write(buffers);
-    return true;
-  }
-};
-static_assert(beast::http::is_ReadableBody<RGWBufferlistBody>{},
-              "RGWBufferlistBody does not satisfy ReadableBody");
-static_assert(beast::http::is_WritableBody<RGWBufferlistBody>{},
-              "RGWBufferlistBody does not satisfy WritableBody");
+} // namespace asio
+} // namespace rgw
 
 #endif // RGW_ASIO_CLIENT_H
index ff2d78067594b9c03c2f9fc30e0fb1b88afcf715..5fc1deec42e4b323d7281e2e14db2dc43647386d 100644 (file)
@@ -7,13 +7,11 @@
 #include <vector>
 
 #include <boost/asio.hpp>
-#include <boost/optional.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include <beast/core/placeholders.hpp>
-#include <beast/core/streambuf.hpp>
-#include <beast/http/empty_body.hpp>
-#include <beast/http/parse_error.hpp>
 #include <beast/http/read.hpp>
+#include <beast/http/string_body.hpp>
 #include <beast/http/write.hpp>
 
 #include "rgw_asio_frontend.h"
@@ -71,28 +69,47 @@ void Pauser::wait()
 
 using tcp = boost::asio::ip::tcp;
 
-class AsioConnection : public std::enable_shared_from_this<AsioConnection> {
-  RGWProcessEnv& env;
-  boost::asio::io_service::strand strand;
-  tcp::socket socket;
-  tcp::endpoint endpoint;
-  beast::streambuf buf;
-  beast::http::request_v1<RGWBufferlistBody> request;
+// coroutine to handle a client connection to completion
+static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
+                              boost::asio::yield_context yield)
+{
+  auto cct = env.store->ctx();
+  boost::system::error_code ec;
 
- public:
-  void on_read(boost::system::error_code ec) {
-    auto cct = env.store->ctx();
+  beast::flat_streambuf buffer{1024};
+
+  // read messages from the socket until eof
+  for (;;) {
+    // parse the header
+    rgw::asio::parser_type parser;
+    do {
+      auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
+      buffer.consume(bytes);
+    } while (!ec && !parser.got_header());
+
+    if (ec == boost::asio::error::connection_reset ||
+        ec == boost::asio::error::eof) {
+      return;
+    }
     if (ec) {
-      if (ec.category() == beast::http::get_parse_error_category()) {
-        ldout(cct, 1) << "parse failed: " << ec.message() << dendl;
-      } else {
-        ldout(cct, 1) << "read failed: " << ec.message() << dendl;
-      }
-      write_bad_request();
+      auto& message = parser.get();
+      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
+      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
+      beast::http::response<beast::http::string_body> response;
+      response.status = 400;
+      response.reason = "Bad Request";
+      response.version = message.version == 10 ? 10 : 11;
+      beast::http::prepare(response);
+      beast::http::async_write(socket, std::move(response), yield[ec]);
+      // ignore ec
       return;
     }
+
+    // process the request
     RGWRequest req{env.store->get_new_req_id()};
-    RGWAsioClientIO real_client{std::move(socket), std::move(request)};
+
+    rgw::asio::ClientIO real_client{socket, parser, buffer};
+
     auto real_client_io = rgw::io::add_reordering(
                             rgw::io::add_buffering(
                               rgw::io::add_chunking(
@@ -101,40 +118,12 @@ class AsioConnection : public std::enable_shared_from_this<AsioConnection> {
     RGWRestfulIO client(&real_client_io);
     process_request(env.store, env.rest, &req, env.uri_prefix,
                     *env.auth_registry, &client, env.olog);
-  }
-
-  void write_bad_request() {
-    beast::http::response_v1<beast::http::empty_body> response;
-    response.status = 400;
-    response.reason = "Bad Request";
-    /* If the request is so terribly malformed that we can't extract even
-     * the protocol version, we will use HTTP/1.1 as a fallback. */
-    response.version = request.version ? request.version : 11;
-    beast::http::prepare(response);
-    beast::http::async_write(socket, std::move(response),
-                             std::bind(&AsioConnection::on_write,
-                                       shared_from_this(),
-                                       beast::asio::placeholders::error));
-  }
 
-  void on_write(boost::system::error_code ec) {
-    auto cct = env.store->ctx();
-    if (ec) {
-      ldout(cct, 1) << "write failed: " << ec.message() << dendl;
+    if (real_client.get_conn_close()) {
+      return;
     }
   }
-
- public:
-  AsioConnection(RGWProcessEnv& env, tcp::socket&& socket)
-    : env(env), strand(socket.get_io_service()), socket(std::move(socket))
-  {}
-
-  void read() {
-    beast::http::async_read(socket, buf, request, strand.wrap(
-            std::bind(&AsioConnection::on_read, shared_from_this(),
-                      beast::asio::placeholders::error)));
-  }
-};
+}
 
 class AsioFrontend {
   RGWProcessEnv env;
@@ -168,9 +157,19 @@ int AsioFrontend::init()
   auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
   ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
 
-  acceptor.open(ep.protocol());
+  boost::system::error_code ec;
+  acceptor.open(ep.protocol(), ec);
+  if (ec) {
+    lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
+    return -ec.value();
+  }
   acceptor.set_option(tcp::acceptor::reuse_address(true));
-  acceptor.bind(ep);
+  acceptor.bind(ep, ec);
+  if (ec) {
+    lderr(ctx()) << "failed to bind address " << ep <<
+        ": " << ec.message() << dendl;
+    return -ec.value();
+  }
   acceptor.listen(boost::asio::socket_base::max_connections);
   acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
@@ -189,13 +188,15 @@ void AsioFrontend::accept(boost::system::error_code ec)
     throw ec;
   }
   auto socket = std::move(peer_socket);
-
+  // spawn a coroutine to handle the connection
+  boost::asio::spawn(service,
+                     [&] (boost::asio::yield_context yield) {
+                       handle_connection(env, std::move(socket), yield);
+                     });
   acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });
-
-  std::make_shared<AsioConnection>(env, std::move(socket))->read();
 }
 
 int AsioFrontend::run()