#ifdef WITH_RADOSGW_BEAST_OPENSSL
#include <boost/asio/ssl.hpp>
+#include <boost/beast/ssl/ssl_stream.hpp>
#include "services/svc_config_key.h"
#include "services/svc_zone.h"
Stream& stream;
spawn::yield_context yield;
parse_buffer& buffer;
+ ceph::timespan request_timeout;
public:
StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
spawn::yield_context yield,
parse_buffer& buffer, bool is_ssl,
const tcp::endpoint& local_endpoint,
- const tcp::endpoint& remote_endpoint)
+ const tcp::endpoint& remote_endpoint,
+ ceph::timespan request_timeout)
: ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
- cct(cct), stream(stream), yield(yield), buffer(buffer)
+ cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
{}
size_t write_data(const char* buf, size_t len) override {
boost::system::error_code ec;
+ auto& timeout = get_lowest_layer(stream);
+ if (request_timeout.count()) {
+ timeout.expires_after(request_timeout);
+ }
auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
yield[ec]);
if (ec) {
ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
if (ec==boost::asio::error::broken_pipe) {
boost::system::error_code ec_ignored;
- stream.lowest_layer().shutdown(tcp::socket::shutdown_both, ec_ignored);
+ timeout.socket().shutdown(tcp::socket::shutdown_both, ec_ignored);
}
throw rgw::io::Exception(ec.value(), std::system_category());
}
}
size_t recv_body(char* buf, size_t max) override {
+ auto& timeout = get_lowest_layer(stream);
auto& message = parser.get();
auto& body_remaining = message.body();
body_remaining.data = buf;
while (body_remaining.size && !parser.is_done()) {
boost::system::error_code ec;
+ if (request_timeout.count()) {
+ timeout.expires_after(request_timeout);
+ }
http::async_read_some(stream, buffer, parser, yield[ec]);
if (ec == http::error::need_buffer) {
break;
SharedMutex& pause_mutex,
rgw::dmclock::Scheduler *scheduler,
boost::system::error_code& ec,
- spawn::yield_context yield)
+ spawn::yield_context yield,
+ ceph::timespan request_timeout)
{
// limit header to 4k, since we read it all into a single flat_buffer
static constexpr size_t header_limit = 4096;
rgw::asio::parser_type parser;
parser.header_limit(header_limit);
parser.body_limit(body_limit);
-
+ auto& timeout = get_lowest_layer(stream);
+ if (request_timeout.count()) {
+ timeout.expires_after(request_timeout);
+ }
// parse the header
http::async_read_header(stream, buffer, parser, yield[ec]);
if (ec == boost::asio::error::connection_reset ||
response.result(http::status::bad_request);
response.version(message.version() == 10 ? 10 : 11);
response.prepare_payload();
+ if (request_timeout.count()) {
+ timeout.expires_after(request_timeout);
+ }
http::async_write(stream, response, yield[ec]);
if (ec) {
ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
// process the request
RGWRequest req{env.store->getRados()->get_new_req_id()};
- auto& socket = stream.lowest_layer();
+ auto& socket = get_lowest_layer(stream).socket();
const auto& remote_endpoint = socket.remote_endpoint(ec);
if (ec) {
ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
socket.local_endpoint(),
- remote_endpoint};
+ remote_endpoint,request_timeout};
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(cct,
body.size = discard_buffer.size();
body.data = discard_buffer.data();
+ if (request_timeout.count()) {
+ timeout.expires_after(request_timeout);
+ }
http::async_read_some(stream, buffer, parser, yield[ec]);
if (ec == http::error::need_buffer) {
continue;
RGWProcessEnv env;
RGWFrontendConfig* conf;
boost::asio::io_context context;
+ ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
#ifdef WITH_RADOSGW_BEAST_OPENSSL
boost::optional<ssl::context> ssl_context;
int get_config_key_val(string name,
boost::system::error_code ec;
auto& config = conf->get_config_map();
+// Setting global timeout
+ auto timeout = config.find("request_timeout_ms");
+ if (timeout != config.end()) {
+ auto timeout_number = ceph::parse<uint64_t>(timeout->second.data());
+ if (timeout_number) {
+ request_timeout = std::chrono::milliseconds(*timeout_number);
+ } else {
+ lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
+ << timeout->second.data() << " setting it to the default value: "
+ << REQUEST_TIMEOUT << dendl;
+ }
+ }
#ifdef WITH_RADOSGW_BEAST_OPENSSL
int r = init_ssl();
if (r < 0) {
[this, &l] (boost::system::error_code ec) {
accept(l, ec);
});
-
+
+ boost::beast::tcp_stream stream(std::move(socket));
// spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
if (l.use_ssl) {
spawn::spawn(context,
- [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
- Connection conn{s};
+ [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ Connection conn{s.socket()};
auto c = connections.add(conn);
- // wrap the socket in an ssl stream
- ssl::stream<tcp::socket&> stream{s, *ssl_context};
+ // wrap the tcp_stream in an ssl stream
+ boost::beast::ssl_stream<boost::beast::tcp_stream&> stream{s, *ssl_context};
auto buffer = std::make_unique<parse_buffer>();
// do ssl handshake
boost::system::error_code ec;
+ if (request_timeout.count()) {
+ get_lowest_layer(stream).expires_after(request_timeout);
+ }
auto bytes = stream.async_handshake(ssl::stream_base::server,
buffer->data(), yield[ec]);
if (ec) {
}
buffer->consume(bytes);
handle_connection(context, env, stream, *buffer, true, pause_mutex,
- scheduler.get(), ec, yield);
+ scheduler.get(), ec, yield, request_timeout);
if (!ec) {
// ssl shutdown (ignoring errors)
stream.async_shutdown(yield[ec]);
}
- s.shutdown(tcp::socket::shutdown_both, ec);
+ s.socket().shutdown(tcp::socket::shutdown_both, ec);
}, make_stack_allocator());
} else {
#else
{
#endif // WITH_RADOSGW_BEAST_OPENSSL
spawn::spawn(context,
- [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
- Connection conn{s};
+ [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ Connection conn{s.socket()};
auto c = connections.add(conn);
auto buffer = std::make_unique<parse_buffer>();
boost::system::error_code ec;
handle_connection(context, env, s, *buffer, false, pause_mutex,
- scheduler.get(), ec, yield);
- s.shutdown(tcp::socket::shutdown_both, ec);
+ scheduler.get(), ec, yield, request_timeout);
+ s.socket().shutdown(tcp::socket::shutdown_both, ec);
}, make_stack_allocator());
}
}