]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: asio frontend uses async_read
authorCasey Bodley <cbodley@redhat.com>
Mon, 20 Jun 2016 03:29:20 +0000 (23:29 -0400)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Fri, 21 Oct 2016 20:57:17 +0000 (22:57 +0200)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_client.cc
src/rgw/rgw_asio_client.h
src/rgw/rgw_asio_frontend.cc

index 653ade56598d9a1e440b584c2702637a3aa7d634..8b584799a74f6ee0dace465bab574e16391da704 100644 (file)
@@ -2,14 +2,8 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <boost/asio/read.hpp>
 #include <boost/asio/write.hpp>
 
-#include <beast/core/basic_streambuf.hpp>
-#include <beast/http/body_type.hpp>
-#include <beast/http/concepts.hpp>
-#include <beast/http/read.hpp>
-
 #include "rgw_asio_client.h"
 
 #define dout_subsys ceph_subsys_rgw
 #define dout_prefix (*_dout << "asio: ")
 
 
-RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint)
-  : socket(std::move(socket)), endpoint(std::move(endpoint))
+RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, request_type&& request)
+  : socket(std::move(socket)), request(std::move(request))
 {}
 
 RGWAsioClientIO::~RGWAsioClientIO() = default;
 
 void RGWAsioClientIO::init_env(CephContext *cct)
 {
-  beast::basic_streambuf<std::allocator<char>> buf; // XXX: not sure what this is for
-  beast::http::read(socket, buf, req);
-  body_iter = req.body.begin();
+  env.init(cct);
+  body_iter = request.body.begin();
 
-  const auto& headers = req.headers;
+  const auto& headers = request.headers;
   for (auto header = headers.begin(); header != headers.end(); ++header) {
     const auto& name = header->name();
     const auto& value = header->value();
@@ -64,10 +57,10 @@ void RGWAsioClientIO::init_env(CephContext *cct)
     env.set(buf, value);
   }
 
-  env.set("REQUEST_METHOD", req.method);
+  env.set("REQUEST_METHOD", request.method);
 
   // split uri from query
-  auto url = boost::string_ref{req.url};
+  auto url = boost::string_ref{request.url};
   auto pos = url.find('?');
   auto query = url.substr(pos + 1);
   url = url.substr(0, pos);
index 43239716e6800de192a79a9b06b2d516b7786fb5..d6426b78a4731354ea952024b2bcf9c78d761c80 100644 (file)
@@ -4,6 +4,8 @@
 #define RGW_ASIO_CLIENT_H
 
 #include <boost/asio/ip/tcp.hpp>
+#include <beast/http/body_type.hpp>
+#include <beast/http/concepts.hpp>
 #include <beast/http/message_v1.hpp>
 #include "include/assert.h"
 
@@ -25,11 +27,10 @@ class RGWBufferlistBody {
 class RGWAsioClientIO : public RGWStreamIO {
   using tcp = boost::asio::ip::tcp;
   tcp::socket socket;
-  tcp::endpoint endpoint;
 
   using body_type = RGWBufferlistBody;
   using request_type = beast::http::request_v1<body_type>;
-  request_type req;
+  request_type request;
 
   bufferlist::const_iterator body_iter;
 
@@ -41,7 +42,7 @@ class RGWAsioClientIO : public RGWStreamIO {
   int read_data(char *buf, int max) override;
 
  public:
-  RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint);
+  RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
   ~RGWAsioClientIO();
 
   int complete_request() override;
index 22a92da0fa7a8d45507c4240f82cc26035e8f052..a89ec7711dad73403574109b88bf94f7eba6f786 100644 (file)
@@ -9,6 +9,13 @@
 #include <boost/asio.hpp>
 #include <boost/optional.hpp>
 
+#include <beast/core/placeholders.hpp>
+#include <beast/core/streambuf.hpp>
+#include <beast/http/empty_body.hpp>
+#include <beast/http/parse_error.hpp>
+#include <beast/http/read.hpp>
+#include <beast/http/write.hpp>
+
 #include "rgw_asio_frontend.h"
 #include "rgw_asio_client.h"
 
@@ -64,13 +71,68 @@ void Pauser::wait()
 
 using tcp = boost::asio::ip::tcp;
 
+class AsioConnection : public std::enable_shared_from_this<AsioConnection> {
+  RGWProcessEnv& env;
+  boost::asio::io_service::strand strand;
+  tcp::socket socket;
+  tcp::endpoint endpoint;
+  beast::streambuf buf;
+  beast::http::request_v1<RGWBufferlistBody> request;
+
+ public:
+  void on_read(boost::system::error_code ec) {
+    auto cct = env.store->ctx();
+    if (ec) {
+      if (ec.category() == beast::http::get_parse_error_category()) {
+        ldout(cct, 1) << "parse failed with " << ec.message() << dendl;
+      } else {
+        ldout(cct, 1) << "read failed with " << ec.message() << dendl;
+      }
+      write_bad_request();
+      return;
+    }
+    RGWRequest req{env.store->get_new_req_id()};
+    RGWAsioClientIO client{std::move(socket), std::move(request)};
+    process_request(env.store, env.rest, &req, &client, env.olog);
+  }
+
+  void write_bad_request() {
+    beast::http::response_v1<beast::http::empty_body> response;
+    response.status = 400;
+    response.reason = "Bad Request";
+    response.version = request.version;
+    beast::http::prepare(response);
+    beast::http::async_write(socket, std::move(response),
+                             std::bind(&AsioConnection::on_write,
+                                       shared_from_this(),
+                                       beast::asio::placeholders::error));
+  }
+
+  void on_write(boost::system::error_code ec) {
+    auto cct = env.store->ctx();
+    if (ec) {
+      ldout(cct, 1) << "write failed with " << ec.message() << dendl;
+    }
+  }
+
+ public:
+  AsioConnection(RGWProcessEnv& env, tcp::socket&& socket)
+    : env(env), strand(socket.get_io_service()), socket(std::move(socket))
+  {}
+
+  void read() {
+    beast::http::async_read(socket, buf, request, strand.wrap(
+            std::bind(&AsioConnection::on_read, shared_from_this(),
+                      beast::asio::placeholders::error)));
+  }
+};
+
 class AsioFrontend {
   RGWProcessEnv env;
   boost::asio::io_service service;
 
   tcp::acceptor acceptor;
   tcp::socket peer_socket;
-  tcp::endpoint peer_endpoint;
 
   std::vector<std::thread> threads;
   Pauser pauser;
@@ -101,7 +163,7 @@ int AsioFrontend::init()
   acceptor.set_option(tcp::acceptor::reuse_address(true));
   acceptor.bind(ep);
   acceptor.listen(boost::asio::socket_base::max_connections);
-  acceptor.async_accept(peer_socket, peer_endpoint,
+  acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });
@@ -118,18 +180,13 @@ void AsioFrontend::accept(boost::system::error_code ec)
     throw ec;
   }
   auto socket = std::move(peer_socket);
-  auto endpoint = std::move(peer_endpoint);
 
-  acceptor.async_accept(peer_socket, peer_endpoint,
+  acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });
 
-  ldout(ctx(), 4) << "accept " << endpoint << dendl;
-
-  RGWRequest req{env.store->get_new_req_id()};
-  RGWAsioClientIO client{std::move(socket), std::move(endpoint)};
-  process_request(env.store, env.rest, &req, &client, env.olog);
+  std::make_shared<AsioConnection>(env, std::move(socket))->read();
 }
 
 int AsioFrontend::run()
@@ -192,7 +249,7 @@ void AsioFrontend::unpause(RGWRados *store)
   env.store = store;
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
   service.reset();
-  acceptor.async_accept(peer_socket, peer_endpoint,
+  acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });