]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: beast frontend uses callbacks instead of coroutines
authorCasey Bodley <cbodley@redhat.com>
Wed, 12 Jul 2017 15:39:26 +0000 (11:39 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 27 Oct 2017 15:28:40 +0000 (11:28 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_client.cc
src/rgw/rgw_asio_client.h
src/rgw/rgw_asio_frontend.cc

index b25e115e9f2c6d585c040bc0353786561995a9d1..d0174c9bbbf1413fd403b9da91f5c617f3addc56 100644 (file)
@@ -9,9 +9,6 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-#undef dout_prefix
-#define dout_prefix (*_dout << "asio: ")
-
 using namespace rgw::asio;
 
 ClientIO::ClientIO(tcp::socket& socket,
@@ -114,9 +111,7 @@ size_t ClientIO::read_data(char* buf, size_t max)
   while (body_remaining.size && !parser.is_done()) {
     boost::system::error_code ec;
     beast::http::read_some(socket, buffer, parser, ec);
-    if (ec == boost::asio::error::connection_reset ||
-        ec == boost::asio::error::eof ||
-        ec == beast::http::error::partial_message ||
+    if (ec == beast::http::error::partial_message ||
         ec == beast::http::error::need_buffer) {
       break;
     }
index 64a5bdcd1e81e6b4340642c5360daed126a51403..f193e17c496461ab8483bf0bcfcd8f5d6d59b2ca 100644 (file)
@@ -38,8 +38,6 @@ class ClientIO : public io::RestfulClient,
            beast::flat_buffer& buffer);
   ~ClientIO() override;
 
-  bool get_conn_close() const { return conn_close; }
-
   void init_env(CephContext *cct) override;
   size_t complete_request() override;
   void flush() override;
index cd753eb323c01fdc581286dad52c99beda5e6aac..45008656fa1eef4404368d5d826f326375002a5d 100644 (file)
@@ -1,22 +1,19 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include <atomic>
 #include <condition_variable>
 #include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
-#include <boost/asio/spawn.hpp>
 
 #include "rgw_asio_client.h"
 #include "rgw_asio_frontend.h"
 
 #define dout_subsys ceph_subsys_rgw
 
-//#undef dout_prefix
-//#define dout_prefix (*_dout << "asio: ")
-
 namespace {
 
 class Pauser {
@@ -65,67 +62,102 @@ void Pauser::wait()
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 
-// coroutine to handle a client connection to completion
-static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
-                              boost::asio::yield_context yield)
-{
-  auto cct = env.store->ctx();
-  boost::system::error_code ec;
+class Connection {
+  RGWProcessEnv& env;
+  boost::asio::strand strand;
+  tcp::socket socket;
 
-  // limit header to 4k, since we read it all into a single buffer
-  constexpr size_t header_limit = 4096;
+  // references are bound to callbacks for async operations. if a callback
+  // function returns without issuing another operation, the reference is
+  // dropped and the Connection is deleted/closed
+  std::atomic<int> nref{0};
+  using Ref = boost::intrusive_ptr<Connection>;
+
+  // limit header to 4k, since we read it all into a single flat_buffer
+  static constexpr size_t header_limit = 4096;
   // don't impose a limit on the body, since we read it in pieces
-  constexpr size_t body_limit = std::numeric_limits<size_t>::max();
+  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
 
   beast::flat_buffer buffer;
+  boost::optional<rgw::asio::parser_type> parser;
+
+  using bad_response_type = beast::http::response<beast::http::empty_body>;
+  boost::optional<bad_response_type> response;
+
+  CephContext* ctx() const { return env.store->ctx(); }
 
-  // read messages from the socket until eof
-  for (;;) {
+  void read_header() {
     // configure the parser
-    rgw::asio::parser_type parser;
-    parser.header_limit(header_limit);
-    parser.body_limit(body_limit);
+    parser.emplace();
+    parser->header_limit(header_limit);
+    parser->body_limit(body_limit);
 
     // parse the header
-    beast::http::async_read_header(socket, buffer, parser, yield[ec]);
+    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
+            std::bind(&Connection::on_header, Ref{this},
+                      std::placeholders::_1)));
+  }
+
+  void on_write_error(boost::system::error_code ec) {
+    if (ec) {
+      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
+    }
+  }
 
+  void on_header(boost::system::error_code ec) {
     if (ec == boost::asio::error::connection_reset ||
-        ec == boost::asio::error::eof) {
+        ec == beast::http::error::end_of_stream) {
       return;
     }
     if (ec) {
-      auto& message = parser.get();
-      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
-      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
-      beast::http::response<beast::http::string_body> response;
-      response.result(beast::http::status::bad_request);
-      response.reason("Bad Request");
-      response.version(message.version() == 10 ? 10 : 11);
-      response.prepare_payload();
-      beast::http::async_write(socket, response, yield[ec]);
-      // ignore ec
+      auto& message = parser->get();
+      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
+      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
+      response.emplace();
+      response->result(beast::http::status::bad_request);
+      response->version(message.version() == 10 ? 10 : 11);
+      response->prepare_payload();
+      beast::http::async_write(socket, *response, strand.wrap(
+            std::bind(&Connection::on_write_error, Ref{this},
+                      std::placeholders::_1)));
       return;
     }
 
     // process the request
     RGWRequest req{env.store->get_new_req_id()};
 
-    rgw::asio::ClientIO real_client{socket, parser, buffer};
+    rgw::asio::ClientIO real_client{socket, *parser, buffer};
 
     auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
+                            rgw::io::add_buffering(ctx(),
                               rgw::io::add_chunking(
                                 rgw::io::add_conlen_controlling(
                                   &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
+    RGWRestfulIO client(ctx(), &real_client_io);
     process_request(env.store, env.rest, &req, env.uri_prefix,
                     *env.auth_registry, &client, env.olog);
 
-    if (real_client.get_conn_close()) {
-      return;
+    if (!real_client.get_conn_close()) {
+      // read next header
+      read_header();
     }
   }
-}
+
+ public:
+  Connection(RGWProcessEnv& env, tcp::socket&& socket)
+    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
+
+  void on_connect() {
+    read_header();
+  }
+
+  void get() { ++nref; }
+  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }
+
+  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
+  friend void intrusive_ptr_release(Connection *c) { c->put(); }
+};
+
 
 class AsioFrontend {
   RGWProcessEnv env;
@@ -190,15 +222,14 @@ void AsioFrontend::accept(boost::system::error_code ec)
     throw ec;
   }
   auto socket = std::move(peer_socket);
-  // spawn a coroutine to handle the connection
-  boost::asio::spawn(service,
-                     [&] (boost::asio::yield_context yield) {
-                       handle_connection(env, std::move(socket), yield);
-                     });
   acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });
+
+  boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
+  conn->on_connect();
+  // reference drops here, but on_connect() takes another
 }
 
 int AsioFrontend::run()