]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: beast frontend uses async SharedMutex for pause
authorCasey Bodley <cbodley@redhat.com>
Mon, 19 Mar 2018 20:49:38 +0000 (16:49 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 7 Sep 2018 17:11:36 +0000 (13:11 -0400)
the strategy for pause relied on stopping the io_context and waiting for
io_context.run() to return control to all of the worker threads. this
relies on the fact that process_request() is completely synchronous (so
considered a single unit of work in the io_context) - otherwise, pause
could complete in the middle of a call to process_request(), and destroy
the RGWRados instance while it's still in use

calling io_context.stop() to pause the worker threads also assumes that
no other work will be scheduled on these threads

to decouple pause from worker threads, handle_connection() now uses an
async shared mutex to synchronize with pause/unpause

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_frontend.cc

index f1eb5a1f1b969872c2330173522d9f33afc3d30e..9525c2f8c7385fe7e537e273b0f80e21c4f2cac5 100644 (file)
@@ -2,14 +2,14 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <atomic>
-#include <condition_variable>
-#include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
 #include <boost/asio/spawn.hpp>
 
+#include "common/async/shared_mutex.h"
+
 #include "rgw_asio_client.h"
 #include "rgw_asio_frontend.h"
 
 
 namespace {
 
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
- public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
-
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
-  std::lock_guard<std::mutex> lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
@@ -115,9 +72,12 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 };
 
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
+
 template <typename Stream>
 void handle_connection(RGWProcessEnv& env, Stream& stream,
                        beast::flat_buffer& buffer, bool is_ssl,
+                       SharedMutex& pause_mutex,
                        boost::system::error_code& ec,
                        boost::asio::yield_context yield)
 {
@@ -159,22 +119,32 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
       return;
     }
 
-    // process the request
-    RGWRequest req{env.store->get_new_req_id()};
-
-    auto& socket = stream.lowest_layer();
-    StreamIO real_client{stream, parser, buffer, is_ssl,
-                         socket.local_endpoint(),
-                         socket.remote_endpoint()};
+    {
+      auto lock = pause_mutex.async_lock_shared(yield[ec]);
+      if (ec == boost::asio::error::operation_aborted) {
+        return;
+      } else if (ec) {
+        ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+        return;
+      }
 
-    auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
-                              rgw::io::add_chunking(
-                                rgw::io::add_conlen_controlling(
-                                  &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
-    process_request(env.store, env.rest, &req, env.uri_prefix,
-                    *env.auth_registry, &client, env.olog);
+      // process the request
+      RGWRequest req{env.store->get_new_req_id()};
+
+      auto& socket = stream.lowest_layer();
+      StreamIO real_client{stream, parser, buffer, is_ssl,
+                           socket.local_endpoint(),
+                           socket.remote_endpoint()};
+
+      auto real_client_io = rgw::io::add_reordering(
+                              rgw::io::add_buffering(cct,
+                                rgw::io::add_chunking(
+                                  rgw::io::add_conlen_controlling(
+                                    &real_client))));
+      RGWRestfulIO client(cct, &real_client_io);
+      process_request(env.store, env.rest, &req, env.uri_prefix,
+                      *env.auth_registry, &client, env.olog);
+    }
 
     if (!parser.keep_alive()) {
       return;
@@ -205,11 +175,12 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
 class AsioFrontend {
   RGWProcessEnv env;
   RGWFrontendConfig* conf;
-  boost::asio::io_service service;
+  boost::asio::io_context context;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   boost::optional<ssl::context> ssl_context;
   int init_ssl();
 #endif
+  SharedMutex pause_mutex;
 
   struct Listener {
     tcp::endpoint endpoint;
@@ -217,13 +188,16 @@ class AsioFrontend {
     tcp::socket socket;
     bool use_ssl = false;
 
-    explicit Listener(boost::asio::io_service& service)
-      : acceptor(service), socket(service) {}
+    explicit Listener(boost::asio::io_context& context)
+      : acceptor(context), socket(context) {}
   };
   std::vector<Listener> listeners;
 
+  // work guard to keep run() threads busy while listeners are paused
+  using Executor = boost::asio::io_context::executor_type;
+  std::optional<boost::asio::executor_work_guard<Executor>> work;
+
   std::vector<std::thread> threads;
-  Pauser pauser;
   std::atomic<bool> going_down{false};
 
   CephContext* ctx() const { return env.store->ctx(); }
@@ -232,7 +206,9 @@ class AsioFrontend {
 
  public:
   AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
-    : env(env), conf(conf) {}
+    : env(env), conf(conf),
+      pause_mutex(context.get_executor())
+  {}
 
   int init();
   int run();
@@ -293,7 +269,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
   }
 
@@ -304,7 +280,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
   }
 
@@ -391,7 +367,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
     listeners.back().use_ssl = true;
   }
@@ -407,7 +383,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
     listeners.back().use_ssl = true;
   }
@@ -433,7 +409,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
         // wrap the socket in an ssl stream
         ssl::stream<tcp::socket&> stream{s, *ssl_context};
@@ -447,7 +423,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
           return;
         }
         buffer.consume(bytes);
-        handle_connection(env, stream, buffer, true, ec, yield);
+        handle_connection(env, stream, buffer, true, pause_mutex, ec, yield);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
@@ -458,11 +434,11 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
         beast::flat_buffer buffer;
         boost::system::error_code ec;
-        handle_connection(env, s, buffer, false, ec, yield);
+        handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
         s.shutdown(tcp::socket::shutdown_both, ec);
       });
   }
@@ -476,15 +452,14 @@ int AsioFrontend::run()
 
   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
 
+  // the worker threads call io_context::run(), which will return when there's
+  // no work left. hold a work guard to keep these threads going until join()
+  work.emplace(boost::asio::make_work_guard(context));
+
   for (int i = 0; i < thread_count; i++) {
     threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
+      boost::system::error_code ec;
+      context.run(ec);
     });
   }
   return 0;
@@ -503,7 +478,7 @@ void AsioFrontend::stop()
   }
 
   // unblock the run() threads
-  service.stop();
+  context.stop(); // XXX: kill connections instead
 }
 
 void AsioFrontend::join()
@@ -511,6 +486,8 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
+  work.reset();
+
   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
   for (auto& thread : threads) {
     thread.join();
@@ -520,12 +497,22 @@ void AsioFrontend::join()
 
 void AsioFrontend::pause()
 {
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // unblock the run() threads
-    service.stop();
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
+  ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+  // cancel pending calls to accept(), but don't close the sockets
+  boost::system::error_code ec;
+  for (auto& l : listeners) {
+    l.acceptor.cancel(ec);
+  }
+
+  // pause and wait for outstanding requests to complete
+  pause_mutex.lock(ec);
+
+  if (ec) {
+    ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+  } else {
+    ldout(ctx(), 4) << "frontend paused" << dendl;
+  }
 }
 
 void AsioFrontend::unpause(RGWRados* const store,
@@ -533,9 +520,19 @@ void AsioFrontend::unpause(RGWRados* const store,
 {
   env.store = store;
   env.auth_registry = std::move(auth_registry);
+
+  // unpause to unblock connections
+  pause_mutex.unlock();
+
+  // start accepting connections again
+  for (auto& l : listeners) {
+    l.acceptor.async_accept(l.socket,
+                            [this, &l] (boost::system::error_code ec) {
+                              accept(l, ec);
+                            });
+  }
+
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  pauser.unpause();
 }
 
 } // anonymous namespace