using namespace rgw::asio;
-ClientIO::ClientIO(tcp::socket& socket,
- parser_type& parser,
- beast::flat_buffer& buffer)
- : socket(socket), parser(parser), buffer(buffer), txbuf(*this)
+ClientIO::ClientIO(parser_type& parser,
+ const endpoint_type& local_endpoint,
+ const endpoint_type& remote_endpoint)
+ : parser(parser),
+ local_endpoint(local_endpoint),
+ remote_endpoint(remote_endpoint),
+ txbuf(*this)
{
}
env.set("SCRIPT_URI", url.to_string()); /* FIXME */
char port_buf[16];
- snprintf(port_buf, sizeof(port_buf), "%d", socket.local_endpoint().port());
+ snprintf(port_buf, sizeof(port_buf), "%d", local_endpoint.port());
env.set("SERVER_PORT", port_buf);
- env.set("REMOTE_ADDR", socket.remote_endpoint().address().to_string());
+ env.set("REMOTE_ADDR", remote_endpoint.address().to_string());
// TODO: set SERVER_PORT_SECURE if using ssl
// TODO: set REMOTE_USER if authenticated
return 0;
}
-size_t ClientIO::write_data(const char* buf, size_t len)
-{
- boost::system::error_code ec;
- auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec);
- if (ec) {
- derr << "write_data failed: " << ec.message() << dendl;
- throw rgw::io::Exception(ec.value(), std::system_category());
- }
- /* According to the documentation of boost::asio::write if there is
- * no error (signalised by ec), then bytes == len. We don't need to
- * take care of partial writes in such situation. */
- return bytes;
-}
-
-size_t ClientIO::read_data(char* buf, size_t max)
-{
- auto& message = parser.get();
- auto& body_remaining = message.body();
- body_remaining.data = buf;
- body_remaining.size = max;
-
- dout(30) << this << " read_data for " << max << " with "
- << buffer.size() << " bytes buffered" << dendl;
-
- while (body_remaining.size && !parser.is_done()) {
- boost::system::error_code ec;
- beast::http::read_some(socket, buffer, parser, ec);
- if (ec == beast::http::error::partial_message ||
- ec == beast::http::error::need_buffer) {
- break;
- }
- if (ec) {
- derr << "failed to read body: " << ec.message() << dendl;
- throw rgw::io::Exception(ec.value(), std::system_category());
- }
- }
- return max - body_remaining.size;
-}
-
size_t ClientIO::complete_request()
{
perfcounter->inc(l_rgw_qlen, -1);
class ClientIO : public io::RestfulClient,
public io::BuffererSink {
- private:
- using tcp = boost::asio::ip::tcp;
- tcp::socket& socket;
+ protected:
parser_type& parser;
- beast::flat_buffer& buffer; //< parse buffer
+ private:
+ using endpoint_type = boost::asio::ip::tcp::endpoint;
+ endpoint_type local_endpoint;
+ endpoint_type remote_endpoint;
RGWEnv env;
rgw::io::StaticOutputBufferer<> txbuf;
- size_t write_data(const char *buf, size_t len) override;
- size_t read_data(char *buf, size_t max);
-
public:
- ClientIO(tcp::socket& socket, parser_type& parser,
- beast::flat_buffer& buffer);
+ ClientIO(parser_type& parser,
+ const endpoint_type& local_endpoint,
+ const endpoint_type& remote_endpoint);
~ClientIO() override;
int init_env(CephContext *cct) override;
size_t send_content_length(uint64_t len) override;
size_t complete_header() override;
- size_t recv_body(char* buf, size_t max) override {
- return read_data(buf, max);
- }
-
size_t send_body(const char* buf, size_t len) override {
return write_data(buf, len);
}
using tcp = boost::asio::ip::tcp;
namespace beast = boost::beast;
+class StreamIO : public rgw::asio::ClientIO {
+ tcp::socket& stream;
+ beast::flat_buffer& buffer;
+ public:
+ StreamIO(tcp::socket& stream, rgw::asio::parser_type& parser,
+ beast::flat_buffer& buffer,
+ const tcp::endpoint& local_endpoint,
+ const tcp::endpoint& remote_endpoint)
+ : ClientIO(parser, local_endpoint, remote_endpoint),
+ stream(stream), buffer(buffer)
+ {}
+
+ size_t write_data(const char* buf, size_t len) override {
+ boost::system::error_code ec;
+ auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
+ if (ec) {
+ derr << "write_data failed: " << ec.message() << dendl;
+ throw rgw::io::Exception(ec.value(), std::system_category());
+ }
+ return bytes;
+ }
+
+ size_t recv_body(char* buf, size_t max) override {
+ auto& message = parser.get();
+ auto& body_remaining = message.body();
+ body_remaining.data = buf;
+ body_remaining.size = max;
+
+ while (body_remaining.size && !parser.is_done()) {
+ boost::system::error_code ec;
+ beast::http::read_some(stream, buffer, parser, ec);
+ if (ec == beast::http::error::partial_message ||
+ ec == beast::http::error::need_buffer) {
+ break;
+ }
+ if (ec) {
+ derr << "failed to read body: " << ec.message() << dendl;
+ throw rgw::io::Exception(ec.value(), std::system_category());
+ }
+ }
+ return max - body_remaining.size;
+ }
+};
+
void handle_connection(RGWProcessEnv& env, tcp::socket& socket,
boost::asio::yield_context yield)
{
// process the request
RGWRequest req{env.store->get_new_req_id()};
- rgw::asio::ClientIO real_client{socket, parser, buffer};
+ StreamIO real_client{socket, parser, buffer,
+ socket.local_endpoint(),
+ socket.remote_endpoint()};
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(cct,