]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: switch beast frontend back to stackful coroutine 20449/head
authorCasey Bodley <cbodley@redhat.com>
Thu, 9 Nov 2017 03:19:12 +0000 (22:19 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 6 Feb 2018 21:23:22 +0000 (16:23 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_asio_frontend.cc

index f01dff917ed24572ca6e32d45d5a823ebdb7f0b4..967cbdfa72964d115616638210c7f9a390068d4c 100644 (file)
@@ -142,11 +142,7 @@ add_library(rgw_a STATIC ${rgw_a_srcs})
 
 add_dependencies(rgw_a civetweb_h)
 
-target_include_directories(rgw_a SYSTEM PUBLIC
-  ${FCGI_INCLUDE_DIR}
-  "../rapidjson/include"
-  )
-target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING)
+target_include_directories(rgw_a SYSTEM PUBLIC "../rapidjson/include")
 
 target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client
   cls_log_client cls_statelog_client cls_timeindex_client cls_version_client
@@ -155,6 +151,11 @@ target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount
   ${EXPAT_LIBRARIES}
   ${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS})
 
+if (WITH_RADOSGW_BEAST_FRONTEND)
+  target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING)
+  target_link_libraries(rgw_a Boost::coroutine Boost::context)
+endif()
+
 set(radosgw_srcs
   rgw_loadgen_process.cc
   rgw_civetweb.cc
index d4a44d39447ea8f610136f1c72b742bb764f3d48..faf7806fccb64ccfdcdbacc99d6317dd53c63c45 100644 (file)
@@ -8,6 +8,7 @@
 #include <vector>
 
 #include <boost/asio.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include "rgw_asio_client.h"
 #include "rgw_asio_frontend.h"
@@ -62,134 +63,85 @@ void Pauser::wait()
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 
-class Connection {
-  RGWProcessEnv& env;
-  boost::asio::io_service::strand strand;
-  tcp::socket socket;
-
-  // references are bound to callbacks for async operations. if a callback
-  // function returns without issuing another operation, the reference is
-  // dropped and the Connection is deleted/closed
-  std::atomic<int> nref{0};
-  using Ref = boost::intrusive_ptr<Connection>;
-
+void handle_connection(RGWProcessEnv& env, tcp::socket& socket,
+                       boost::asio::yield_context yield)
+{
   // limit header to 4k, since we read it all into a single flat_buffer
   static constexpr size_t header_limit = 4096;
   // don't impose a limit on the body, since we read it in pieces
   static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
 
+  auto cct = env.store->ctx();
+  boost::system::error_code ec;
   beast::flat_buffer buffer;
-  boost::optional<rgw::asio::parser_type> parser;
-
-  using bad_response_type = beast::http::response<beast::http::empty_body>;
-  boost::optional<bad_response_type> response;
 
-  CephContext* ctx() const { return env.store->ctx(); }
-
-  void read_header() {
+  // read messages from the socket until eof
+  for (;;) {
     // configure the parser
-    parser.emplace();
-    parser->header_limit(header_limit);
-    parser->body_limit(body_limit);
+    rgw::asio::parser_type parser;
+    parser.header_limit(header_limit);
+    parser.body_limit(body_limit);
 
     // parse the header
-    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
-            std::bind(&Connection::on_header, Ref{this},
-                      std::placeholders::_1)));
-  }
-
-  void discard_unread_message() {
-    if (parser->is_done()) {
-      // nothing left to discard, start reading the next message
-      read_header();
-      return;
-    }
-
-    // read the rest of the request into a static buffer. multiple clients could
-    // write at the same time, but this is okay because we never read it back
-    static std::array<char, 1024> discard_buffer;
-
-    auto& body = parser->get().body();
-    body.size = discard_buffer.size();
-    body.data = discard_buffer.data();
-
-    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
-            std::bind(&Connection::on_discard_unread, Ref{this},
-                      std::placeholders::_1)));
-  }
-
-  void on_discard_unread(boost::system::error_code ec) {
-    if (ec == boost::asio::error::connection_reset) {
-      return;
-    }
-    if (ec) {
-      ldout(ctx(), 5) << "discard_unread_message failed: "
-          << ec.message() << dendl;
-      return;
-    }
-    discard_unread_message();
-  }
-
-  void on_write_error(boost::system::error_code ec) {
-    if (ec) {
-      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
-    }
-  }
-
-  void on_header(boost::system::error_code ec) {
+    beast::http::async_read_header(socket, buffer, parser, yield[ec]);
     if (ec == boost::asio::error::connection_reset ||
         ec == beast::http::error::end_of_stream) {
       return;
     }
     if (ec) {
-      auto& message = parser->get();
-      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
-      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
-      response.emplace();
-      response->result(beast::http::status::bad_request);
-      response->version(message.version() == 10 ? 10 : 11);
-      response->prepare_payload();
-      beast::http::async_write(socket, *response, strand.wrap(
-            std::bind(&Connection::on_write_error, Ref{this},
-                      std::placeholders::_1)));
+      ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
+      auto& message = parser.get();
+      beast::http::response<beast::http::empty_body> response;
+      response.result(beast::http::status::bad_request);
+      response.version(message.version() == 10 ? 10 : 11);
+      response.prepare_payload();
+      beast::http::async_write(socket, response, yield[ec]);
+      if (ec) {
+        ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
+      }
+      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
       return;
     }
 
     // process the request
     RGWRequest req{env.store->get_new_req_id()};
 
-    rgw::asio::ClientIO real_client{socket, *parser, buffer};
+    rgw::asio::ClientIO real_client{socket, parser, buffer};
 
     auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(ctx(),
+                            rgw::io::add_buffering(cct,
                               rgw::io::add_chunking(
                                 rgw::io::add_conlen_controlling(
                                   &real_client))));
-    RGWRestfulIO client(ctx(), &real_client_io);
+    RGWRestfulIO client(cct, &real_client_io);
     process_request(env.store, env.rest, &req, env.uri_prefix,
                     *env.auth_registry, &client, env.olog);
 
-    if (parser->keep_alive()) {
-      // parse any unread bytes from the previous message (in case we replied
-      // before reading the entire body) before reading the next
-      discard_unread_message();
+    if (!parser.keep_alive()) {
+      return;
     }
-  }
 
- public:
-  Connection(RGWProcessEnv& env, tcp::socket&& socket)
-    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
-
-  void on_connect() {
-    read_header();
-  }
+    // if we failed before reading the entire message, discard any remaining
+    // bytes before reading the next
+    while (!parser.is_done()) {
+      static std::array<char, 1024> discard_buffer;
 
-  void get() { ++nref; }
-  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }
+      auto& body = parser.get().body();
+      body.size = discard_buffer.size();
+      body.data = discard_buffer.data();
 
-  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
-  friend void intrusive_ptr_release(Connection *c) { c->put(); }
-};
+      beast::http::async_read_some(socket, buffer, parser, yield[ec]);
+      if (ec == boost::asio::error::connection_reset) {
+        return;
+      }
+      if (ec) {
+        ldout(cct, 5) << "failed to discard unread message: "
+            << ec.message() << dendl;
+        return;
+      }
+    }
+  }
+}
 
 class AsioFrontend {
   RGWProcessEnv env;
@@ -325,9 +277,11 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
                             accept(l, ec);
                           });
 
-  boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
-  conn->on_connect();
-  // reference drops here, but on_connect() takes another
+  // spawn a coroutine to handle the connection
+  boost::asio::spawn(service,
+    [this, socket=std::move(socket)] (boost::asio::yield_context yield) mutable {
+      handle_connection(env, socket, yield);
+    });
 }
 
 int AsioFrontend::run()