// vim: ts=8 sw=2 smarttab
#include <boost/algorithm/string/predicate.hpp>
-#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
-#include <beast/core/basic_streambuf.hpp>
-#include <beast/http/body_type.hpp>
-#include <beast/http/concepts.hpp>
-#include <beast/http/read.hpp>
-
#include "rgw_asio_client.h"
#define dout_subsys ceph_subsys_rgw
#define dout_prefix (*_dout << "asio: ")
-RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint)
- : socket(std::move(socket)), endpoint(std::move(endpoint))
+RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket, request_type&& request)
+ : socket(std::move(socket)), request(std::move(request))
{}
RGWAsioClientIO::~RGWAsioClientIO() = default;
void RGWAsioClientIO::init_env(CephContext *cct)
{
- beast::basic_streambuf<std::allocator<char>> buf; // XXX: not sure what this is for
- beast::http::read(socket, buf, req);
- body_iter = req.body.begin();
+ env.init(cct);
+ body_iter = request.body.begin();
- const auto& headers = req.headers;
+ const auto& headers = request.headers;
for (auto header = headers.begin(); header != headers.end(); ++header) {
const auto& name = header->name();
const auto& value = header->value();
env.set(buf, value);
}
- env.set("REQUEST_METHOD", req.method);
+ env.set("REQUEST_METHOD", request.method);
// split uri from query
- auto url = boost::string_ref{req.url};
+ auto url = boost::string_ref{request.url};
auto pos = url.find('?');
auto query = url.substr(pos + 1);
url = url.substr(0, pos);
#define RGW_ASIO_CLIENT_H
#include <boost/asio/ip/tcp.hpp>
+#include <beast/http/body_type.hpp>
+#include <beast/http/concepts.hpp>
#include <beast/http/message_v1.hpp>
#include "include/assert.h"
class RGWAsioClientIO : public RGWStreamIO {
using tcp = boost::asio::ip::tcp;
tcp::socket socket;
- tcp::endpoint endpoint;
using body_type = RGWBufferlistBody;
using request_type = beast::http::request_v1<body_type>;
- request_type req;
+ request_type request;
bufferlist::const_iterator body_iter;
int read_data(char *buf, int max) override;
public:
- RGWAsioClientIO(tcp::socket&& socket, tcp::endpoint&& endpoint);
+ RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
~RGWAsioClientIO();
int complete_request() override;
#include <boost/asio.hpp>
#include <boost/optional.hpp>
+#include <beast/core/placeholders.hpp>
+#include <beast/core/streambuf.hpp>
+#include <beast/http/empty_body.hpp>
+#include <beast/http/parse_error.hpp>
+#include <beast/http/read.hpp>
+#include <beast/http/write.hpp>
+
#include "rgw_asio_frontend.h"
#include "rgw_asio_client.h"
using tcp = boost::asio::ip::tcp;
+class AsioConnection : public std::enable_shared_from_this<AsioConnection> {
+ RGWProcessEnv& env;
+ boost::asio::io_service::strand strand;
+ tcp::socket socket;
+ tcp::endpoint endpoint;
+ beast::streambuf buf;
+ beast::http::request_v1<RGWBufferlistBody> request;
+
+ public:
+ void on_read(boost::system::error_code ec) {
+ auto cct = env.store->ctx();
+ if (ec) {
+ if (ec.category() == beast::http::get_parse_error_category()) {
+ ldout(cct, 1) << "parse failed with " << ec.message() << dendl;
+ } else {
+ ldout(cct, 1) << "read failed with " << ec.message() << dendl;
+ }
+ write_bad_request();
+ return;
+ }
+ RGWRequest req{env.store->get_new_req_id()};
+ RGWAsioClientIO client{std::move(socket), std::move(request)};
+ process_request(env.store, env.rest, &req, &client, env.olog);
+ }
+
+ void write_bad_request() {
+ beast::http::response_v1<beast::http::empty_body> response;
+ response.status = 400;
+ response.reason = "Bad Request";
+ response.version = request.version;
+ beast::http::prepare(response);
+ beast::http::async_write(socket, std::move(response),
+ std::bind(&AsioConnection::on_write,
+ shared_from_this(),
+ beast::asio::placeholders::error));
+ }
+
+ void on_write(boost::system::error_code ec) {
+ auto cct = env.store->ctx();
+ if (ec) {
+ ldout(cct, 1) << "write failed with " << ec.message() << dendl;
+ }
+ }
+
+ public:
+ AsioConnection(RGWProcessEnv& env, tcp::socket&& socket)
+ : env(env), strand(socket.get_io_service()), socket(std::move(socket))
+ {}
+
+ void read() {
+ beast::http::async_read(socket, buf, request, strand.wrap(
+ std::bind(&AsioConnection::on_read, shared_from_this(),
+ beast::asio::placeholders::error)));
+ }
+};
+
class AsioFrontend {
RGWProcessEnv env;
boost::asio::io_service service;
tcp::acceptor acceptor;
tcp::socket peer_socket;
- tcp::endpoint peer_endpoint;
std::vector<std::thread> threads;
Pauser pauser;
acceptor.set_option(tcp::acceptor::reuse_address(true));
acceptor.bind(ep);
acceptor.listen(boost::asio::socket_base::max_connections);
- acceptor.async_accept(peer_socket, peer_endpoint,
+ acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
throw ec;
}
auto socket = std::move(peer_socket);
- auto endpoint = std::move(peer_endpoint);
- acceptor.async_accept(peer_socket, peer_endpoint,
+ acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
- ldout(ctx(), 4) << "accept " << endpoint << dendl;
-
- RGWRequest req{env.store->get_new_req_id()};
- RGWAsioClientIO client{std::move(socket), std::move(endpoint)};
- process_request(env.store, env.rest, &req, &client, env.olog);
+ std::make_shared<AsioConnection>(env, std::move(socket))->read();
}
int AsioFrontend::run()
env.store = store;
ldout(ctx(), 4) << "frontend unpaused" << dendl;
service.reset();
- acceptor.async_accept(peer_socket, peer_endpoint,
+ acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});