]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/beast: replace beast::tcp_stream with manual timeouts
authorCasey Bodley <cbodley@redhat.com>
Sat, 30 Oct 2021 23:47:02 +0000 (19:47 -0400)
committerCory Snyder <csnyder@iland.com>
Tue, 16 Nov 2021 14:59:02 +0000 (09:59 -0500)
remove the beast::tcp_stream wrapper from the socket, and track timeouts
manually with a timeout_timer. this timer uses ceph's coarse_mono_clock
which is cheaper to sample than std::chrono::steady_clock

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_asio_frontend_timer.h [new file with mode: 0644]

index 1cf8bf74d94295a80ef58896c855fc16fe9e9411..cd2950da6152de890d0460486e78420c82b4a054 100644 (file)
@@ -20,7 +20,6 @@
 
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
 #include <boost/asio/ssl.hpp>
-#include <boost/beast/ssl/ssl_stream.hpp>
 #endif
 
 #include "common/split.h"
@@ -30,6 +29,7 @@
 
 #include "rgw_zone.h"
 
+#include "rgw_asio_frontend_timer.h"
 #include "rgw_dmclock_async_scheduler.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -48,6 +48,9 @@ using executor_type = boost::asio::io_context::executor_type;
 using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
 using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
 
+using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
+      executor_type>;
+
 using parse_buffer = boost::beast::flat_static_buffer<65536>;
 
 // use mmap/mprotect to allocate 512k coroutine stacks
@@ -59,32 +62,35 @@ template <typename Stream>
 class StreamIO : public rgw::asio::ClientIO {
   CephContext* const cct;
   Stream& stream;
+  timeout_timer& timeout;
   yield_context yield;
   parse_buffer& buffer;
   ceph::timespan request_timeout;
  public:
-  StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
-           yield_context yield, parse_buffer& buffer, bool is_ssl,
+  StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
+           rgw::asio::parser_type& parser, yield_context yield,
+           parse_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
            const tcp::endpoint& remote_endpoint,
            ceph::timespan request_timeout)
       : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
-        cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
+        cct(cct), stream(stream), timeout(timeout), yield(yield),
+        buffer(buffer), request_timeout(request_timeout)
   {}
 
   size_t write_data(const char* buf, size_t len) override {
     boost::system::error_code ec;
-    auto& timeout = get_lowest_layer(stream);
     if (request_timeout.count()) {
-      timeout.expires_after(request_timeout);
+      timeout.expires_after(stream.lowest_layer(), request_timeout);
     }
     auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
                                           yield[ec]);
+    timeout.cancel();
     if (ec) {
       ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
-      if (ec==boost::asio::error::broken_pipe) {
+      if (ec == boost::asio::error::broken_pipe) {
         boost::system::error_code ec_ignored;
-        timeout.socket().shutdown(tcp_socket::shutdown_both, ec_ignored);
+        stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
       }
       throw rgw::io::Exception(ec.value(), std::system_category());
     }
@@ -92,7 +98,6 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 
   size_t recv_body(char* buf, size_t max) override {
-    auto& timeout = get_lowest_layer(stream);
     auto& message = parser.get();
     auto& body_remaining = message.body();
     body_remaining.data = buf;
@@ -101,9 +106,10 @@ class StreamIO : public rgw::asio::ClientIO {
     while (body_remaining.size && !parser.is_done()) {
       boost::system::error_code ec;
       if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
+        timeout.expires_after(stream.lowest_layer(), request_timeout);
       }
       http::async_read_some(stream, buffer, parser, yield[ec]);
+      timeout.cancel();
       if (ec == http::error::need_buffer) {
         break;
       }
@@ -149,6 +155,7 @@ using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_t
 template <typename Stream>
 void handle_connection(boost::asio::io_context& context,
                        RGWProcessEnv& env, Stream& stream,
+                       timeout_timer& timeout,
                        parse_buffer& buffer, bool is_ssl,
                        SharedMutex& pause_mutex,
                        rgw::dmclock::Scheduler *scheduler,
@@ -169,12 +176,12 @@ void handle_connection(boost::asio::io_context& context,
     rgw::asio::parser_type parser;
     parser.header_limit(header_limit);
     parser.body_limit(body_limit);
-    auto& timeout = get_lowest_layer(stream);
     if (request_timeout.count()) {
-      timeout.expires_after(request_timeout);
+      timeout.expires_after(stream.lowest_layer(), request_timeout);
     }
     // parse the header
     http::async_read_header(stream, buffer, parser, yield[ec]);
+    timeout.cancel();
     if (ec == boost::asio::error::connection_reset ||
         ec == boost::asio::error::bad_descriptor ||
         ec == boost::asio::error::operation_aborted ||
@@ -193,9 +200,10 @@ void handle_connection(boost::asio::io_context& context,
       response.version(message.version() == 10 ? 10 : 11);
       response.prepare_payload();
       if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
+        timeout.expires_after(stream.lowest_layer(), request_timeout);
       }
       http::async_write(stream, response, yield[ec]);
+      timeout.cancel();
       if (ec) {
         ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
       }
@@ -215,16 +223,16 @@ void handle_connection(boost::asio::io_context& context,
       // process the request
       RGWRequest req{env.store->getRados()->get_new_req_id()};
 
-      auto& socket = get_lowest_layer(stream).socket();
+      auto& socket = stream.lowest_layer();
       const auto& remote_endpoint = socket.remote_endpoint(ec);
       if (ec) {
         ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
         return;
       }
 
-      StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
-                           socket.local_endpoint(),
-                           remote_endpoint,request_timeout};
+      StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
+                           is_ssl, socket.local_endpoint(),
+                           remote_endpoint, request_timeout};
 
       auto real_client_io = rgw::io::add_reordering(
                               rgw::io::add_buffering(cct,
@@ -267,9 +275,10 @@ void handle_connection(boost::asio::io_context& context,
       body.data = discard_buffer.data();
 
       if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
+        timeout.expires_after(stream.lowest_layer(), request_timeout);
       }
       http::async_read_some(stream, buffer, parser, yield[ec]);
+      timeout.cancel();
       if (ec == http::error::need_buffer) {
         continue;
       }
@@ -915,44 +924,44 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
     ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
     return;
   }
-  auto socket = std::move(l.socket);
-  tcp::no_delay options(l.use_nodelay);
-  socket.set_option(options,ec);
+  auto stream = std::move(l.socket);
+  stream.set_option(tcp::no_delay(l.use_nodelay), ec);
   l.acceptor.async_accept(l.socket,
                           [this, &l] (boost::system::error_code ec) {
                             accept(l, ec);
                           });
   
-  tcp_stream stream(std::move(socket));
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
     spawn::spawn(context,
       [this, s=std::move(stream)] (yield_context yield) mutable {
-        Connection conn{s.socket()};
+        Connection conn{s};
         auto c = connections.add(conn);
-        // wrap the tcp_stream in an ssl stream
-        boost::beast::ssl_stream<tcp_stream&> stream{s, *ssl_context};
+        // wrap the tcp stream in an ssl stream
+        boost::asio::ssl::stream<tcp_socket&> stream{s, *ssl_context};
+        auto timeout = timeout_timer{context.get_executor()};
         auto buffer = std::make_unique<parse_buffer>();
         // do ssl handshake
         boost::system::error_code ec;
         if (request_timeout.count()) {
-          get_lowest_layer(stream).expires_after(request_timeout);
+          timeout.expires_after(s, request_timeout);
         }
         auto bytes = stream.async_handshake(ssl::stream_base::server,
                                             buffer->data(), yield[ec]);
+        timeout.cancel();
         if (ec) {
           ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
           return;
         }
         buffer->consume(bytes);
-        handle_connection(context, env, stream, *buffer, true, pause_mutex,
+        handle_connection(context, env, stream, timeout, *buffer, true, pause_mutex,
                           scheduler.get(), ec, yield, request_timeout);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
         }
-        s.socket().shutdown(tcp_socket::shutdown_both, ec);
+        s.shutdown(tcp::socket::shutdown_both, ec);
       }, make_stack_allocator());
   } else {
 #else
@@ -960,13 +969,14 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 #endif // WITH_RADOSGW_BEAST_OPENSSL
     spawn::spawn(context,
       [this, s=std::move(stream)] (yield_context yield) mutable {
-        Connection conn{s.socket()};
+        Connection conn{s};
         auto c = connections.add(conn);
+        auto timeout = timeout_timer{context.get_executor()};
         auto buffer = std::make_unique<parse_buffer>();
         boost::system::error_code ec;
-        handle_connection(context, env, s, *buffer, false, pause_mutex,
+        handle_connection(context, env, s, timeout, *buffer, false, pause_mutex,
                           scheduler.get(), ec, yield, request_timeout);
-        s.socket().shutdown(tcp_socket::shutdown_both, ec);
+        s.shutdown(tcp_socket::shutdown_both, ec);
       }, make_stack_allocator());
   }
 }
diff --git a/src/rgw/rgw_asio_frontend_timer.h b/src/rgw/rgw_asio_frontend_timer.h
new file mode 100644 (file)
index 0000000..4fc81ee
--- /dev/null
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <boost/asio/basic_waitable_timer.hpp>
+
+#include "common/ceph_time.h"
+
+namespace rgw {
+
+// a WaitHandler that closes a stream if the timeout expires
+template <typename Stream>
+struct timeout_handler {
+  Stream* stream;
+
+  explicit timeout_handler(Stream* stream) noexcept : stream(stream) {}
+
+  void operator()(boost::system::error_code ec) {
+    if (!ec) { // wait was not canceled
+      boost::system::error_code ec_ignored;
+      stream->close(ec_ignored);
+    }
+  }
+};
+
+// a timeout timer for stream operations
+template <typename Clock, typename Executor>
+class basic_timeout_timer {
+ public:
+  using clock_type = Clock;
+  using duration = typename clock_type::duration;
+  using executor_type = Executor;
+
+  explicit basic_timeout_timer(const executor_type& ex) : timer(ex) {}
+
+  basic_timeout_timer(const basic_timeout_timer&) = delete;
+  basic_timeout_timer& operator=(const basic_timeout_timer&) = delete;
+
+  template <typename Stream>
+  void expires_after(Stream& stream, duration dur) {
+    timer.expires_after(dur);
+    timer.async_wait(timeout_handler{&stream});
+  }
+
+  void cancel() {
+    timer.cancel();
+  }
+
+ private:
+  using Timer = boost::asio::basic_waitable_timer<clock_type,
+        boost::asio::wait_traits<clock_type>, executor_type>;
+  Timer timer;
+};
+
+} // namespace rgw