From: Casey Bodley
Date: Mon, 19 Mar 2018 20:49:38 +0000 (-0400)
Subject: rgw: beast frontend uses async SharedMutex for pause
X-Git-Tag: v14.0.1~356^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=378b01064cdf3b6645cbabcb16ad633bb7c45bd2;p=ceph.git

rgw: beast frontend uses async SharedMutex for pause

the strategy for pause relied on stopping the io_context and waiting for
io_context.run() to return control to all of the worker threads. this relies
on the fact that process_request() is completely synchronous (so considered a
single unit of work in the io_context) - otherwise, pause could complete in
the middle of a call to process_request(), and destroy the RGWRados instance
while it's still in use

calling io_context.stop() to pause the worker threads also assumes that no
other work will be scheduled on these threads

to decouple pause from worker threads, handle_connection() now uses an async
shared mutex to synchronize with pause/unpause

Signed-off-by: Casey Bodley
---
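A minimal stand-alone sketch of the shared/exclusive idea described above, using std::shared_mutex instead of the ceph::async::SharedMutex that the patch uses (with the async mutex, a coroutine suspends in async_lock_shared() rather than blocking a worker thread). The handle_request()/pause()/unpause() helpers below are illustrative stand-ins, not code from this patch:

// sketch: each request holds the mutex shared; pause() takes it exclusive,
// so it returns only once all in-flight requests have finished and it
// blocks new requests from starting until unpause()
#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>

std::shared_mutex pause_mutex;

void handle_request(int id)
{
  std::shared_lock lock{pause_mutex}; // shared: many requests in parallel
  std::cout << "processing request " << id << '\n';
}

void pause()
{
  pause_mutex.lock(); // exclusive: waits for shared holders to drain
}

void unpause()
{
  pause_mutex.unlock();
}

int main()
{
  std::vector<std::thread> requests;
  for (int i = 0; i < 4; i++) {
    requests.emplace_back(handle_request, i);
  }
  for (auto& t : requests) {
    t.join();
  }
  pause();   // safe point: no request holds the lock, e.g. swap out RGWRados
  unpause();
  return 0;
}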
diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc
index f1eb5a1f1b96..9525c2f8c738 100644
--- a/src/rgw/rgw_asio_frontend.cc
+++ b/src/rgw/rgw_asio_frontend.cc
@@ -2,14 +2,14 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <atomic>
-#include <condition_variable>
-#include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
 #include <boost/asio/spawn.hpp>
 
+#include "common/async/shared_mutex.h"
+
 #include "rgw_asio_client.h"
 #include "rgw_asio_frontend.h"
 
@@ -21,49 +21,6 @@
 
 namespace {
 
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
- public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
-
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
-  std::lock_guard lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
-  std::unique_lock lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
@@ -115,9 +72,12 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 };
 
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
+
 template <typename Stream>
 void handle_connection(RGWProcessEnv& env, Stream& stream,
                        beast::flat_buffer& buffer, bool is_ssl,
+                       SharedMutex& pause_mutex,
                        boost::system::error_code& ec,
                        boost::asio::yield_context yield)
 {
@@ -159,22 +119,32 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
       return;
     }
 
-    // process the request
-    RGWRequest req{env.store->get_new_req_id()};
-
-    auto& socket = stream.lowest_layer();
-    StreamIO real_client{stream, parser, buffer, is_ssl,
-                         socket.local_endpoint(),
-                         socket.remote_endpoint()};
+    {
+      auto lock = pause_mutex.async_lock_shared(yield[ec]);
+      if (ec == boost::asio::error::operation_aborted) {
+        return;
+      } else if (ec) {
+        ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+        return;
+      }
 
-    auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
-                              rgw::io::add_chunking(
-                                rgw::io::add_conlen_controlling(
-                                  &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
-    process_request(env.store, env.rest, &req, env.uri_prefix,
-                    *env.auth_registry, &client, env.olog);
+      // process the request
+      RGWRequest req{env.store->get_new_req_id()};
+
+      auto& socket = stream.lowest_layer();
+      StreamIO real_client{stream, parser, buffer, is_ssl,
+                           socket.local_endpoint(),
+                           socket.remote_endpoint()};
+
+      auto real_client_io = rgw::io::add_reordering(
+                              rgw::io::add_buffering(cct,
+                                rgw::io::add_chunking(
+                                  rgw::io::add_conlen_controlling(
+                                    &real_client))));
+      RGWRestfulIO client(cct, &real_client_io);
+      process_request(env.store, env.rest, &req, env.uri_prefix,
+                      *env.auth_registry, &client, env.olog);
+    }
 
     if (!parser.keep_alive()) {
       return;
@@ -205,11 +175,12 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
 class AsioFrontend {
   RGWProcessEnv env;
   RGWFrontendConfig* conf;
-  boost::asio::io_service service;
+  boost::asio::io_context context;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   boost::optional<ssl::context> ssl_context;
   int init_ssl();
 #endif
+  SharedMutex pause_mutex;
 
   struct Listener {
     tcp::endpoint endpoint;
@@ -217,13 +188,16 @@ class AsioFrontend {
     tcp::socket socket;
     bool use_ssl = false;
 
-    explicit Listener(boost::asio::io_service& service)
-      : acceptor(service), socket(service) {}
+    explicit Listener(boost::asio::io_context& context)
+      : acceptor(context), socket(context) {}
   };
   std::vector<Listener> listeners;
 
+  // work guard to keep run() threads busy while listeners are paused
+  using Executor = boost::asio::io_context::executor_type;
+  std::optional<boost::asio::executor_work_guard<Executor>> work;
+
   std::vector<std::thread> threads;
-  Pauser pauser;
   std::atomic<bool> going_down{false};
 
   CephContext* ctx() const { return env.store->ctx(); }
@@ -232,7 +206,9 @@
 
  public:
   AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
-    : env(env), conf(conf) {}
+    : env(env), conf(conf),
+      pause_mutex(context.get_executor())
+  {}
 
   int init();
   int run();
@@ -293,7 +269,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
   }
 
@@ -304,7 +280,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
   }
 
@@ -391,7 +367,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
     listeners.back().use_ssl = true;
   }
@@ -407,7 +383,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
     listeners.back().use_ssl = true;
   }
@@ -433,7 +409,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
         // wrap the socket in an ssl stream
         ssl::stream<tcp::socket&> stream{s, *ssl_context};
@@ -447,7 +423,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
           return;
         }
         buffer.consume(bytes);
-        handle_connection(env, stream, buffer, true, ec, yield);
+        handle_connection(env, stream, buffer, true, pause_mutex, ec, yield);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
@@ -458,11 +434,11 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
        beast::flat_buffer buffer;
        boost::system::error_code ec;
-       handle_connection(env, s, buffer, false, ec, yield);
+       handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
        s.shutdown(tcp::socket::shutdown_both, ec);
      });
   }
@@ -476,15 +452,14 @@ int AsioFrontend::run()
 
   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
 
+  // the worker threads call io_context::run(), which will return when there's
+  // no work left. hold a work guard to keep these threads going until join()
+  work.emplace(boost::asio::make_work_guard(context));
+
   for (int i = 0; i < thread_count; i++) {
     threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
+      boost::system::error_code ec;
+      context.run(ec);
     });
   }
   return 0;
@@ -503,7 +478,7 @@ void AsioFrontend::stop()
   }
 
   // unblock the run() threads
-  service.stop();
+  context.stop(); // XXX: kill connections instead
 }
 
 void AsioFrontend::join()
@@ -511,6 +486,8 @@
   if (!going_down) {
     stop();
   }
+  work.reset();
+
   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
   for (auto& thread : threads) {
     thread.join();
@@ -520,12 +497,22 @@
 
 void AsioFrontend::pause()
 {
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // unblock the run() threads
-    service.stop();
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
+  ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+  // cancel pending calls to accept(), but don't close the sockets
+  boost::system::error_code ec;
+  for (auto& l : listeners) {
+    l.acceptor.cancel(ec);
+  }
+
+  // pause and wait for outstanding requests to complete
+  pause_mutex.lock(ec);
+
+  if (ec) {
+    ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+  } else {
+    ldout(ctx(), 4) << "frontend paused" << dendl;
+  }
 }
 
 void AsioFrontend::unpause(RGWRados* const store,
@@ -533,9 +520,19 @@
 {
   env.store = store;
   env.auth_registry = std::move(auth_registry);
+
+  // unpause to unblock connections
+  pause_mutex.unlock();
+
+  // start accepting connections again
+  for (auto& l : listeners) {
+    l.acceptor.async_accept(l.socket,
+      [this, &l] (boost::system::error_code ec) {
+        accept(l, ec);
+      });
+  }
+
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  pauser.unpause();
 }
 
 } // anonymous namespace
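For reference, a self-contained sketch of the executor_work_guard pattern that run()/join() adopt above: without the guard, io_context::run() returns as soon as the context has no pending work (for example while all listeners are paused and no requests are in flight), ending the worker threads prematurely. The thread count and the posted handler below are arbitrary illustration, not values from the patch:

#include <boost/asio.hpp>
#include <iostream>
#include <optional>
#include <thread>
#include <vector>

int main()
{
  boost::asio::io_context context;

  // keep io_context::run() from returning while there is no pending work
  using Executor = boost::asio::io_context::executor_type;
  std::optional<boost::asio::executor_work_guard<Executor>> work;
  work.emplace(boost::asio::make_work_guard(context));

  std::vector<std::thread> threads;
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&context] { context.run(); });
  }

  // post some work; the threads stay in run() even after it completes
  boost::asio::post(context, [] { std::cout << "handled one request\n"; });

  // shutdown: drop the guard so run() can return, then join the threads
  work.reset();
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}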