option(WITH_SYSTEM_BOOST "require and build with system Boost" OFF)
set(BOOST_COMPONENTS
- container thread system regex random program_options date_time iostreams)
+ container thread system regex random program_options date_time iostreams coroutine context)
if(WITH_MGR)
list(APPEND BOOST_COMPONENTS python)
endif()
target_include_directories(rgw_a PUBLIC
"../Beast/include"
${FCGI_INCLUDE_DIR})
+target_compile_definitions(rgw_a PUBLIC BOOST_COROUTINES_NO_DEPRECATION_WARNING)
target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client cls_version_client
cls_replica_log_client cls_user_client ceph-common common_utf8 global
- ${CURL_LIBRARIES}
+ ${CURL_LIBRARIES} ${Boost_LIBRARIES}
${EXPAT_LIBRARIES}
${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS})
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/write.hpp>
+#include <beast/http/read.hpp>
#include "rgw_asio_client.h"
#undef dout_prefix
#define dout_prefix (*_dout << "asio: ")
+using namespace rgw::asio;
-RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket,
- request_type&& request)
- : socket(std::move(socket)),
- request(std::move(request)),
- txbuf(*this) {
+ClientIO::ClientIO(tcp::socket& socket,
+ parser_type& parser,
+ beast::flat_streambuf& buffer)
+ : socket(socket), parser(parser), buffer(buffer), txbuf(*this)
+{
}
-RGWAsioClientIO::~RGWAsioClientIO() = default;
+ClientIO::~ClientIO() = default;
-void RGWAsioClientIO::init_env(CephContext *cct)
+void ClientIO::init_env(CephContext *cct)
{
env.init(cct);
- body_iter = request.body.begin();
- const auto& headers = request.headers;
+ const auto& request = parser.get();
+ const auto& headers = request.fields;
for (auto header = headers.begin(); header != headers.end(); ++header) {
const auto& name = header->name();
const auto& value = header->value();
// TODO: set REMOTE_USER if authenticated
}
-size_t RGWAsioClientIO::write_data(const char* const buf,
- const size_t len)
+size_t ClientIO::write_data(const char* buf, size_t len)
{
boost::system::error_code ec;
auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec);
if (ec) {
derr << "write_data failed: " << ec.message() << dendl;
throw rgw::io::Exception(ec.value(), std::system_category());
- } else {
- /* According to the documentation of boost::asio::write if there is
- * no error (signalised by ec), then bytes == len. We don't need to
- * take care of partial writes in such situation. */
- return bytes;
}
+ /* According to the documentation of boost::asio::write if there is
+ * no error (signalised by ec), then bytes == len. We don't need to
+ * take care of partial writes in such situation. */
+ return bytes;
}
-size_t RGWAsioClientIO::read_data(char* const buf, const size_t max)
+size_t ClientIO::read_data(char* buf, size_t max)
{
- // read data from the body's bufferlist
- auto bytes = std::min<unsigned>(max, body_iter.get_remaining());
- body_iter.copy(bytes, buf);
- return bytes;
+ auto& message = parser.get();
+ auto& body_remaining = message.body;
+ body_remaining = boost::asio::mutable_buffer{buf, max};
+
+ boost::system::error_code ec;
+
+ dout(30) << this << " read_data for " << max << " with "
+ << buffer.size() << " bytes buffered" << dendl;
+
+ while (boost::asio::buffer_size(body_remaining) && !parser.is_complete()) {
+ auto bytes = beast::http::read_some(socket, buffer, parser, ec);
+ buffer.consume(bytes);
+ if (ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::eof ||
+ ec == beast::http::error::partial_message) {
+ break;
+ }
+ if (ec) {
+ derr << "failed to read body: " << ec.message() << dendl;
+ throw rgw::io::Exception(ec.value(), std::system_category());
+ }
+ }
+ return max - boost::asio::buffer_size(body_remaining);
}
-size_t RGWAsioClientIO::complete_request()
+size_t ClientIO::complete_request()
{
return 0;
}
-void RGWAsioClientIO::flush()
+void ClientIO::flush()
{
txbuf.pubsync();
}
-size_t RGWAsioClientIO::send_status(const int status,
- const char* const status_name)
+size_t ClientIO::send_status(int status, const char* status_name)
{
static constexpr size_t STATUS_BUF_SIZE = 128;
return txbuf.sputn(statusbuf, statuslen);
}
-size_t RGWAsioClientIO::send_100_continue()
+size_t ClientIO::send_100_continue()
{
const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
"Date: %a, %d %b %Y %H:%M:%S %Z\r\n", tmp);
}
-size_t RGWAsioClientIO::complete_header()
+size_t ClientIO::complete_header()
{
size_t sent = 0;
return sent;
}
-size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
- const boost::string_ref& value)
+size_t ClientIO::send_header(const boost::string_ref& name,
+ const boost::string_ref& value)
{
static constexpr char HEADER_SEP[] = ": ";
static constexpr char HEADER_END[] = "\r\n";
return sent;
}
-size_t RGWAsioClientIO::send_content_length(const uint64_t len)
+size_t ClientIO::send_content_length(uint64_t len)
{
static constexpr size_t CONLEN_BUF_SIZE = 128;
#define RGW_ASIO_CLIENT_H
#include <boost/asio/ip/tcp.hpp>
-#include <beast/http/body_type.hpp>
-#include <beast/http/concepts.hpp>
-#include <beast/http/message_v1.hpp>
+#include <beast/http/message.hpp>
+#include <beast/http/message_parser.hpp>
+#include <beast/core/flat_streambuf.hpp>
#include "include/assert.h"
#include "rgw_client_io.h"
-// bufferlist to represent the message body
-class RGWBufferlistBody {
- public:
- using value_type = ceph::bufferlist;
+namespace rgw {
+namespace asio {
- class reader;
- class writer;
+/// streaming message body interface
+struct streaming_body {
+ using value_type = boost::asio::mutable_buffer;
- template <bool isRequest, typename Headers>
- using message_type = beast::http::message<isRequest, RGWBufferlistBody,
- Headers>;
-};
+ class reader {
+ value_type& buffer;
+ public:
+ using mutable_buffers_type = boost::asio::mutable_buffers_1;
-class RGWAsioClientIO : public rgw::io::RestfulClient,
- public rgw::io::BuffererSink {
- using tcp = boost::asio::ip::tcp;
- tcp::socket socket;
+ static const bool is_direct{true}; // reads directly into user buffer
- using body_type = RGWBufferlistBody;
- using request_type = beast::http::request_v1<body_type>;
- request_type request;
+ template<bool isRequest, class Fields>
+ explicit reader(beast::http::message<isRequest, streaming_body, Fields>& m)
+ : buffer(m.body)
+ {}
+
+ void init() {}
+ void init(uint64_t content_length) {}
+ void finish() {}
+
+ mutable_buffers_type prepare(size_t n) {
+ n = std::min(n, boost::asio::buffer_size(buffer));
+ auto position = boost::asio::buffer_cast<char*>(buffer);
+ return {position, n};
+ }
- bufferlist::const_iterator body_iter;
+ void commit(size_t n) {
+ buffer = buffer + n;
+ }
+ };
+};
+
+using header_type = beast::http::fields;
+using parser_type = beast::http::message_parser<true, streaming_body, header_type>;
+
+class ClientIO : public io::RestfulClient,
+ public io::BuffererSink {
+ private:
+ using tcp = boost::asio::ip::tcp;
+ tcp::socket& socket;
+ parser_type& parser;
+ beast::flat_streambuf& buffer; //< parse buffer
bool conn_keepalive{false};
bool conn_close{false};
size_t read_data(char *buf, size_t max);
public:
- RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
- ~RGWAsioClientIO() override;
+ ClientIO(tcp::socket& socket, parser_type& parser,
+ beast::flat_streambuf& buffer);
+ ~ClientIO() override;
+
+ bool get_conn_close() const { return conn_close; }
void init_env(CephContext *cct) override;
size_t complete_request() override;
}
};
-// used by beast::http::read() to read the body into a bufferlist
-class RGWBufferlistBody::reader {
- value_type& bl;
- public:
- template<bool isRequest, typename Headers>
- explicit reader(message_type<isRequest, Headers>& m) : bl(m.body) {}
-
- void write(const char* data, size_t size, boost::system::error_code&) {
- bl.append(data, size);
- }
-};
-
-// used by beast::http::write() to write the buffered body
-class RGWBufferlistBody::writer {
- const value_type& bl;
- public:
- template<bool isRequest, typename Headers>
- explicit writer(const message_type<isRequest, Headers>& msg)
- : bl(msg.body) {}
-
- void init(boost::system::error_code& ec) {}
- uint64_t content_length() const { return bl.length(); }
-
- template<typename Write>
- boost::tribool operator()(beast::http::resume_context&&,
- boost::system::error_code&, Write&& write) {
- // translate from bufferlist to a ConstBufferSequence for beast
- std::vector<boost::asio::const_buffer> buffers;
- buffers.reserve(bl.get_num_buffers());
- for (auto& ptr : bl.buffers()) {
- buffers.emplace_back(ptr.c_str(), ptr.length());
- }
- write(buffers);
- return true;
- }
-};
-static_assert(beast::http::is_ReadableBody<RGWBufferlistBody>{},
- "RGWBufferlistBody does not satisfy ReadableBody");
-static_assert(beast::http::is_WritableBody<RGWBufferlistBody>{},
- "RGWBufferlistBody does not satisfy WritableBody");
+} // namespace asio
+} // namespace rgw
#endif // RGW_ASIO_CLIENT_H
#include <vector>
#include <boost/asio.hpp>
-#include <boost/optional.hpp>
+#include <boost/asio/spawn.hpp>
#include <beast/core/placeholders.hpp>
-#include <beast/core/streambuf.hpp>
-#include <beast/http/empty_body.hpp>
-#include <beast/http/parse_error.hpp>
#include <beast/http/read.hpp>
+#include <beast/http/string_body.hpp>
#include <beast/http/write.hpp>
#include "rgw_asio_frontend.h"
using tcp = boost::asio::ip::tcp;
-class AsioConnection : public std::enable_shared_from_this<AsioConnection> {
- RGWProcessEnv& env;
- boost::asio::io_service::strand strand;
- tcp::socket socket;
- tcp::endpoint endpoint;
- beast::streambuf buf;
- beast::http::request_v1<RGWBufferlistBody> request;
+// coroutine to handle a client connection to completion
+static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
+ boost::asio::yield_context yield)
+{
+ auto cct = env.store->ctx();
+ boost::system::error_code ec;
- public:
- void on_read(boost::system::error_code ec) {
- auto cct = env.store->ctx();
+ beast::flat_streambuf buffer{1024};
+
+ // read messages from the socket until eof
+ for (;;) {
+ // parse the header
+ rgw::asio::parser_type parser;
+ do {
+ auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
+ buffer.consume(bytes);
+ } while (!ec && !parser.got_header());
+
+ if (ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::eof) {
+ return;
+ }
if (ec) {
- if (ec.category() == beast::http::get_parse_error_category()) {
- ldout(cct, 1) << "parse failed: " << ec.message() << dendl;
- } else {
- ldout(cct, 1) << "read failed: " << ec.message() << dendl;
- }
- write_bad_request();
+ auto& message = parser.get();
+ ldout(cct, 1) << "read failed: " << ec.message() << dendl;
+ ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
+ beast::http::response<beast::http::string_body> response;
+ response.status = 400;
+ response.reason = "Bad Request";
+ response.version = message.version == 10 ? 10 : 11;
+ beast::http::prepare(response);
+ beast::http::async_write(socket, std::move(response), yield[ec]);
+ // ignore ec
return;
}
+
+ // process the request
RGWRequest req{env.store->get_new_req_id()};
- RGWAsioClientIO real_client{std::move(socket), std::move(request)};
+
+ rgw::asio::ClientIO real_client{socket, parser, buffer};
+
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_chunking(
RGWRestfulIO client(&real_client_io);
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog);
- }
-
- void write_bad_request() {
- beast::http::response_v1<beast::http::empty_body> response;
- response.status = 400;
- response.reason = "Bad Request";
- /* If the request is so terribly malformed that we can't extract even
- * the protocol version, we will use HTTP/1.1 as a fallback. */
- response.version = request.version ? request.version : 11;
- beast::http::prepare(response);
- beast::http::async_write(socket, std::move(response),
- std::bind(&AsioConnection::on_write,
- shared_from_this(),
- beast::asio::placeholders::error));
- }
- void on_write(boost::system::error_code ec) {
- auto cct = env.store->ctx();
- if (ec) {
- ldout(cct, 1) << "write failed: " << ec.message() << dendl;
+ if (real_client.get_conn_close()) {
+ return;
}
}
-
- public:
- AsioConnection(RGWProcessEnv& env, tcp::socket&& socket)
- : env(env), strand(socket.get_io_service()), socket(std::move(socket))
- {}
-
- void read() {
- beast::http::async_read(socket, buf, request, strand.wrap(
- std::bind(&AsioConnection::on_read, shared_from_this(),
- beast::asio::placeholders::error)));
- }
-};
+}
class AsioFrontend {
RGWProcessEnv env;
auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
- acceptor.open(ep.protocol());
+ boost::system::error_code ec;
+ acceptor.open(ep.protocol(), ec);
+ if (ec) {
+ lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
+ return -ec.value();
+ }
acceptor.set_option(tcp::acceptor::reuse_address(true));
- acceptor.bind(ep);
+ acceptor.bind(ep, ec);
+ if (ec) {
+ lderr(ctx()) << "failed to bind address " << ep <<
+ ": " << ec.message() << dendl;
+ return -ec.value();
+ }
acceptor.listen(boost::asio::socket_base::max_connections);
acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
throw ec;
}
auto socket = std::move(peer_socket);
-
+ // spawn a coroutine to handle the connection
+ boost::asio::spawn(service,
+ [&] (boost::asio::yield_context yield) {
+ handle_connection(env, std::move(socket), yield);
+ });
acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
-
- std::make_shared<AsioConnection>(env, std::move(socket))->read();
}
int AsioFrontend::run()