]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: beast frontend closes connections on stop 21271/head
authorCasey Bodley <cbodley@redhat.com>
Sat, 31 Mar 2018 03:31:49 +0000 (23:31 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 7 Sep 2018 17:11:36 +0000 (13:11 -0400)
the strategy for stop relies on the fact that process_request() is
completely synchronous, so that io_context.stop() would still complete
each request and clean up properly

to tolerate an asynchronous process_request(), we instead need to drain
all outstanding work on the io_context so that io_context.run() can
return control natually to all of the worker threads. that would allow
us to suspend our coroutine in the middle of process_request(), and
still guarantee that process_request() will resume and run to completion
before the worker threads exit

each connected socket also counts as outstanding work, and needs to be
closed in order to drain the io_context. each connection now adds itself
to a connection list so that stop() can close its socket

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_frontend.cc

index 9525c2f8c7385fe7e537e273b0f80e21c4f2cac5..23a578c472c2287e84c5e79ab1d9c05becbf3d02 100644 (file)
@@ -7,6 +7,7 @@
 
 #include <boost/asio.hpp>
 #include <boost/asio/spawn.hpp>
+#include <boost/intrusive/list.hpp>
 
 #include "common/async/shared_mutex.h"
 
@@ -98,10 +99,13 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
     // parse the header
     beast::http::async_read_header(stream, buffer, parser, yield[ec]);
     if (ec == boost::asio::error::connection_reset ||
+        ec == boost::asio::error::bad_descriptor ||
+        ec == boost::asio::error::operation_aborted ||
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
         ec == ssl::error::stream_truncated ||
 #endif
         ec == beast::http::error::end_of_stream) {
+      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
       return;
     }
     if (ec) {
@@ -172,6 +176,44 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
   }
 }
 
+struct Connection : boost::intrusive::list_base_hook<> {
+  tcp::socket& socket;
+  Connection(tcp::socket& socket) : socket(socket) {}
+};
+
+class ConnectionList {
+  using List = boost::intrusive::list<Connection>;
+  List connections;
+  std::mutex mutex;
+
+  void remove(Connection& c) {
+    std::lock_guard lock{mutex};
+    if (c.is_linked()) {
+      connections.erase(List::s_iterator_to(c));
+    }
+  }
+ public:
+  class Guard {
+    ConnectionList *list;
+    Connection *conn;
+   public:
+    Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
+    ~Guard() { list->remove(*conn); }
+  };
+  [[nodiscard]] Guard add(Connection& conn) {
+    std::lock_guard lock{mutex};
+    connections.push_back(conn);
+    return Guard{this, &conn};
+  }
+  void close(boost::system::error_code& ec) {
+    std::lock_guard lock{mutex};
+    for (auto& conn : connections) {
+      conn.socket.close(ec);
+    }
+    connections.clear();
+  }
+};
+
 class AsioFrontend {
   RGWProcessEnv env;
   RGWFrontendConfig* conf;
@@ -193,6 +235,8 @@ class AsioFrontend {
   };
   std::vector<Listener> listeners;
 
+  ConnectionList connections;
+
   // work guard to keep run() threads busy while listeners are paused
   using Executor = boost::asio::io_context::executor_type;
   std::optional<boost::asio::executor_work_guard<Executor>> work;
@@ -411,6 +455,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   if (l.use_ssl) {
     boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
         // wrap the socket in an ssl stream
         ssl::stream<tcp::socket&> stream{s, *ssl_context};
         beast::flat_buffer buffer;
@@ -436,6 +482,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 #endif // WITH_RADOSGW_BEAST_OPENSSL
     boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
         beast::flat_buffer buffer;
         boost::system::error_code ec;
         handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
@@ -476,9 +524,9 @@ void AsioFrontend::stop()
   for (auto& listener : listeners) {
     listener.acceptor.close(ec);
   }
-
-  // unblock the run() threads
-  context.stop(); // XXX: kill connections instead
+  // close all connections
+  connections.close(ec);
+  pause_mutex.cancel();
 }
 
 void AsioFrontend::join()