From: Adam C. Emerson
Date: Fri, 14 Apr 2023 19:45:04 +0000 (-0400)
Subject: rgw: Asio frontend shares `io_context` with the rest of RGW
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9f5fc1fd4d0d42a1074a810a7062905a51ae5bd2;p=ceph.git

rgw: Asio frontend shares `io_context` with the rest of RGW

Pull the `io_context` and threads out of `AsioFrontend` and pass in a
reference to the `io_context_pool`, so it can be shut down at
`AsioFrontend::join`.

Signed-off-by: Adam C. Emerson
---

diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc
index 1f20aed247d5d..dfafbb5483e7e 100644
--- a/src/rgw/rgw_appmain.cc
+++ b/src/rgw/rgw_appmain.cc
@@ -203,6 +203,17 @@ void rgw::AppMain::init_numa()
   }
 } /* init_numa */
 
+void rgw::AppMain::need_context_pool() {
+  if (!context_pool) {
+    context_pool.emplace(
+      dpp->get_cct()->_conf->rgw_thread_pool_size,
+      [] {
+        // request warnings on synchronous librados calls in this thread
+        is_asio_thread = true;
+      });
+  }
+}
+
 int rgw::AppMain::init_storage()
 {
   auto config_store_type = g_conf().get_val<std::string>("rgw_config_store");
@@ -234,10 +245,11 @@ int rgw::AppMain::init_storage()
     (g_conf()->rgw_run_sync_thread &&
      ((!nfs) || (nfs && g_conf()->rgw_nfs_run_sync_thread)));
 
+  need_context_pool();
   DriverManager::Config cfg = DriverManager::get_config(false, g_ceph_context);
   env.driver = DriverManager::get_storage(dpp, dpp->get_cct(),
           cfg,
-          context_pool,
+          *context_pool,
           run_gc,
           run_lc,
           run_quota,
@@ -457,7 +469,8 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
       fe = new RGWLoadGenFrontend(env, config);
     }
     else if (framework == "beast") {
-      fe = new RGWAsioFrontend(env, config, *sched_ctx);
+      need_context_pool();
+      fe = new RGWAsioFrontend(env, config, *sched_ctx, *context_pool);
     }
     else if (framework == "rgw-nfs") {
       fe = new RGWLibFrontend(env, config);
@@ -515,8 +528,9 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
   if (env.lua.background) {
     rgw_pauser->add_pauser(env.lua.background);
   }
+  need_context_pool();
   reloader = std::make_unique<RGWRealmReloader>(
-    env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), context_pool);
+    env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), *context_pool);
   realm_watcher = std::make_unique<RGWRealmWatcher>(dpp, g_ceph_context,
     static_cast<rgw::sal::RadosStore*>(env.driver)->svc()->zone->get_realm());
   realm_watcher->add_watcher(RGWRealmNotify::Reload, *reloader);
@@ -595,28 +609,34 @@ void rgw::AppMain::shutdown(std::function<void()> finalize_async_signals)
     fe->stop();
   }
 
-  for (auto& fe : fes) {
-    fe->join();
-    delete fe;
-  }
-
-  for (auto& fec : fe_configs) {
-    delete fec;
-  }
-
   ldh.reset(nullptr); // deletes
   finalize_async_signals(); // callback
   rgw_log_usage_finalize();
- 
+
   delete olog;
 
   if (lua_background) {
     lua_background->shutdown();
   }
 
+  // Do this before closing storage so requests don't try to call into
+  // closed storage.
+  context_pool->finish();
+
   cfgstore.reset(); // deletes
   DriverManager::close_storage(env.driver);
 
+  // Frontends can't be deleted until nobody's executing `io_context::run`
+  for (auto& fe : fes) {
+    fe->join();
+    delete fe;
+  }
+
+  for (auto& fec : fe_configs) {
+    delete fec;
+  }
+
+
   rgw_tools_cleanup();
   rgw_shutdown_resolver();
   rgw_http_client_cleanup();
diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc
index 633a2963300f6..055d1ba47965a 100644
--- a/src/rgw/rgw_asio_frontend.cc
+++ b/src/rgw/rgw_asio_frontend.cc
@@ -3,7 +3,6 @@
 
 #include
 #include
-#include
 #include
 #include
 
@@ -394,8 +393,9 @@ class ConnectionList {
 namespace dmc = rgw::dmclock;
 class AsioFrontend {
   RGWProcessEnv& env;
+  boost::intrusive_ptr<CephContext> cct{env.driver->ctx()};
   RGWFrontendConfig* conf;
-  boost::asio::io_context context;
+  boost::asio::io_context& context;
   std::string uri_prefix;
   ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
   size_t header_limit = 16384;
@@ -425,22 +425,19 @@ class AsioFrontend {
   ConnectionList connections;
 
-  // work guard to keep run() threads busy while listeners are paused
-  using Executor = boost::asio::io_context::executor_type;
-  std::optional<boost::asio::executor_work_guard<Executor>> work;
-
-  std::vector<std::thread> threads;
   std::atomic<bool> going_down{false};
 
-  CephContext* ctx() const { return env.driver->ctx(); }
+  CephContext* ctx() const { return cct.get(); }
   std::optional<dmc::ClientCounters> client_counters;
   std::unique_ptr<dmc::ClientConfig> client_config;
 
   void accept(Listener& listener, boost::system::error_code ec);
 
 public:
  AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
-              dmc::SchedulerCtx& sched_ctx)
-    : env(env), conf(conf), pause_mutex(context.get_executor())
+              dmc::SchedulerCtx& sched_ctx,
+              boost::asio::io_context& context)
+    : env(env), conf(conf), context(context),
+      pause_mutex(context.get_executor())
  {
    auto sched_t = dmc::get_scheduler_t(ctx());
    switch(sched_t){
@@ -462,7 +459,9 @@ class AsioFrontend {
   }
 
   int init();
-  int run();
+  int run() {
+    return 0;
+  }
   void stop();
   void join();
   void pause();
@@ -480,7 +479,7 @@ unsigned short parse_port(const char *input, boost::system::error_code& ec)
   }
   return port;
 }
- 
+
 tcp::endpoint parse_endpoint(boost::asio::string_view input,
                              unsigned short default_port,
                              boost::system::error_code& ec)
@@ -1060,30 +1059,6 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   }
 }
 
-int AsioFrontend::run()
-{
-  auto cct = ctx();
-  const int thread_count = cct->_conf->rgw_thread_pool_size;
-  threads.reserve(thread_count);
-
-  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
-
-  // the worker threads call io_context::run(), which will return when there's
-  // no work left. hold a work guard to keep these threads going until join()
-  work.emplace(boost::asio::make_work_guard(context));
-
-  for (int i = 0; i < thread_count; i++) {
-    threads.emplace_back([this]() noexcept {
-      // request warnings on synchronous librados calls in this thread
-      is_asio_thread = true;
-      // Have uncaught exceptions kill the process and give a
-      // stacktrace, not be swallowed.
-      context.run();
-    });
-  }
-  return 0;
-}
-
 void AsioFrontend::stop()
 {
   ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
@@ -1105,13 +1080,6 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
-  work.reset();
-
-  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
-  for (auto& thread : threads) {
-    thread.join();
-  }
-  ldout(ctx(), 4) << "frontend done" << dendl;
 }
 
 void AsioFrontend::pause()
@@ -1155,14 +1123,16 @@ void AsioFrontend::unpause()
 
 class RGWAsioFrontend::Impl : public AsioFrontend {
  public:
  Impl(RGWProcessEnv& env, RGWFrontendConfig* conf,
-      rgw::dmclock::SchedulerCtx& sched_ctx)
-    : AsioFrontend(env, conf, sched_ctx) {}
+      rgw::dmclock::SchedulerCtx& sched_ctx,
+      boost::asio::io_context& context)
+    : AsioFrontend(env, conf, sched_ctx, context) {}
 };
 
 RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
-                                 rgw::dmclock::SchedulerCtx& sched_ctx)
-  : impl(new Impl(env, conf, sched_ctx))
+                                 rgw::dmclock::SchedulerCtx& sched_ctx,
+                                 boost::asio::io_context& context)
+  : impl(new Impl(env, conf, sched_ctx, context))
 {
 }
 
diff --git a/src/rgw/rgw_asio_frontend.h b/src/rgw/rgw_asio_frontend.h
index 2de6f337a9fb3..8f642bb526f5a 100644
--- a/src/rgw/rgw_asio_frontend.h
+++ b/src/rgw/rgw_asio_frontend.h
@@ -4,6 +4,9 @@
 #pragma once
 
 #include
+
+#include
+
 #include "rgw_frontend.h"
 
 #define REQUEST_TIMEOUT 65000
@@ -12,7 +15,8 @@ class RGWAsioFrontend : public RGWFrontend {
   std::unique_ptr<Impl> impl;
 public:
   RGWAsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
-                  rgw::dmclock::SchedulerCtx& sched_ctx);
+                  rgw::dmclock::SchedulerCtx& sched_ctx,
+                  boost::asio::io_context& io_context);
   ~RGWAsioFrontend() override;
 
   int init() override;
diff --git a/src/rgw/rgw_main.h b/src/rgw/rgw_main.h
index 9bdea60e2860d..caa6a0822828c 100644
--- a/src/rgw/rgw_main.h
+++ b/src/rgw/rgw_main.h
@@ -84,8 +84,8 @@ class AppMain {
   SiteConfig site;
   const DoutPrefixProvider* dpp;
   RGWProcessEnv env;
-  ceph::async::io_context_pool context_pool{
-    dpp->get_cct()->_conf->rgw_thread_pool_size};
+  void need_context_pool();
+  std::optional<ceph::async::io_context_pool> context_pool;
 public:
   AppMain(const DoutPrefixProvider* dpp);
   ~AppMain();
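
---

For illustration only, not part of the patch: a minimal sketch of the
shared-pool pattern the patch adopts. The `io_context_pool` class below is a
hypothetical stand-in for `ceph::async::io_context_pool`, written against
plain Boost.Asio; its constructor shape (thread count plus a per-thread init
hook), `get_io_context()`, and `finish()` are assumptions modeled on how the
patch uses the real class (`context_pool.emplace(rgw_thread_pool_size, ...)`,
`*context_pool`, and `context_pool->finish()`).

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>

#include <functional>
#include <iostream>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical stand-in for ceph::async::io_context_pool: one shared
// io_context, a pool of worker threads calling run(), and a work guard
// that keeps run() from returning while the pool is idle.
class io_context_pool {
  boost::asio::io_context ioc;
  std::optional<boost::asio::executor_work_guard<
      boost::asio::io_context::executor_type>> guard;
  std::vector<std::thread> threads;

 public:
  io_context_pool(int thread_count, std::function<void()> thread_init)
    : guard(boost::asio::make_work_guard(ioc)) {
    threads.reserve(thread_count);
    for (int i = 0; i < thread_count; i++) {
      threads.emplace_back([this, thread_init]() noexcept {
        thread_init();  // e.g. set a thread-local flag like is_asio_thread
        ioc.run();      // returns once the guard is released and work drains
      });
    }
  }

  boost::asio::io_context& get_io_context() { return ioc; }

  // Mirrors the ordering in AppMain::shutdown(): release the work guard so
  // io_context::run() can return, then join the worker threads.
  void finish() {
    guard.reset();
    for (auto& t : threads) {
      t.join();
    }
    threads.clear();
  }

  ~io_context_pool() { finish(); }
};

int main() {
  io_context_pool pool(4, [] { /* per-thread init */ });
  boost::asio::post(pool.get_io_context(),
                    [] { std::cout << "handled on a pool thread\n"; });
  pool.finish();  // stop I/O before tearing down anything handlers touch
}

With the pool owned outside the frontend, `AsioFrontend::run()` and `join()`
no longer manage threads of their own; anything that needs the reactor takes
a `boost::asio::io_context&`, and shutdown can order `finish()` ahead of
`DriverManager::close_storage()`, as the patch does in `AppMain::shutdown()`.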