}
} /* init_numa */
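+// construct the shared io_context thread pool on first use (no-op if it already exists)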
+void rgw::AppMain::need_context_pool() {
+ if (!context_pool) {
+ context_pool.emplace(
+ dpp->get_cct()->_conf->rgw_thread_pool_size,
+ [] {
+ // request warnings on synchronous librados calls in this thread
+ is_asio_thread = true;
+ });
+ }
+}
+
int rgw::AppMain::init_storage()
{
auto config_store_type = g_conf().get_val<std::string>("rgw_config_store");
(g_conf()->rgw_run_sync_thread &&
((!nfs) || (nfs && g_conf()->rgw_nfs_run_sync_thread)));
+ need_context_pool();
DriverManager::Config cfg = DriverManager::get_config(false, g_ceph_context);
env.driver = DriverManager::get_storage(dpp, dpp->get_cct(),
cfg,
- context_pool,
+ *context_pool,
run_gc,
run_lc,
run_quota,
fe = new RGWLoadGenFrontend(env, config);
}
else if (framework == "beast") {
- fe = new RGWAsioFrontend(env, config, *sched_ctx);
+ need_context_pool();
+ fe = new RGWAsioFrontend(env, config, *sched_ctx, *context_pool);
}
else if (framework == "rgw-nfs") {
fe = new RGWLibFrontend(env, config);
if (env.lua.background) {
rgw_pauser->add_pauser(env.lua.background);
}
+ need_context_pool();
reloader = std::make_unique<RGWRealmReloader>(
- env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), context_pool);
+ env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), *context_pool);
realm_watcher = std::make_unique<RGWRealmWatcher>(dpp, g_ceph_context,
static_cast<rgw::sal::RadosStore*>(env.driver)->svc()->zone->get_realm());
realm_watcher->add_watcher(RGWRealmNotify::Reload, *reloader);
fe->stop();
}
- for (auto& fe : fes) {
- fe->join();
- delete fe;
- }
-
- for (auto& fec : fe_configs) {
- delete fec;
- }
-
ldh.reset(nullptr); // deletes
finalize_async_signals(); // callback
rgw_log_usage_finalize();
-
+
delete olog;
if (lua_background) {
lua_background->shutdown();
}
+ // Do this before closing storage so requests don't try to call into
+ // closed storage.
+ context_pool->finish();
+
cfgstore.reset(); // deletes
DriverManager::close_storage(env.driver);
+ // Frontends can't be deleted until nobody's executing `io_context::run`
+ for (auto& fe : fes) {
+ fe->join();
+ delete fe;
+ }
+
+ for (auto& fec : fe_configs) {
+ delete fec;
+ }
+
rgw_tools_cleanup();
rgw_shutdown_resolver();
rgw_http_client_cleanup();
#include <atomic>
#include <ctime>
-#include <thread>
#include <vector>
#include <boost/asio.hpp>
namespace dmc = rgw::dmclock;
class AsioFrontend {
RGWProcessEnv& env;
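+ // keep a reference on the CephContext so ctx() stays valid for the frontend's lifetime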
+ boost::intrusive_ptr<CephContext> cct{env.driver->ctx()};
RGWFrontendConfig* conf;
- boost::asio::io_context context;
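+ // io_context is owned by rgw::AppMain's context pool and passed in by reference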
+ boost::asio::io_context& context;
std::string uri_prefix;
ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
size_t header_limit = 16384;
ConnectionList connections;
- // work guard to keep run() threads busy while listeners are paused
- using Executor = boost::asio::io_context::executor_type;
- std::optional<boost::asio::executor_work_guard<Executor>> work;
-
- std::vector<std::thread> threads;
std::atomic<bool> going_down{false};
- CephContext* ctx() const { return env.driver->ctx(); }
+ CephContext* ctx() const { return cct.get(); }
std::optional<dmc::ClientCounters> client_counters;
std::unique_ptr<dmc::ClientConfig> client_config;
void accept(Listener& listener, boost::system::error_code ec);
public:
AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
- dmc::SchedulerCtx& sched_ctx)
- : env(env), conf(conf), pause_mutex(context.get_executor())
+ dmc::SchedulerCtx& sched_ctx,
+ boost::asio::io_context& context)
+ : env(env), conf(conf), context(context),
+ pause_mutex(context.get_executor())
{
auto sched_t = dmc::get_scheduler_t(ctx());
switch(sched_t){
}
int init();
- int run();
+ int run() {
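+ // worker threads are owned by the shared io_context pool, so there is nothing to start here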
+ return 0;
+ }
void stop();
void join();
void pause();
}
return port;
}
-
+
tcp::endpoint parse_endpoint(boost::asio::string_view input,
unsigned short default_port,
boost::system::error_code& ec)
}
}
-int AsioFrontend::run()
-{
- auto cct = ctx();
- const int thread_count = cct->_conf->rgw_thread_pool_size;
- threads.reserve(thread_count);
-
- ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
-
- // the worker threads call io_context::run(), which will return when there's
- // no work left. hold a work guard to keep these threads going until join()
- work.emplace(boost::asio::make_work_guard(context));
-
- for (int i = 0; i < thread_count; i++) {
- threads.emplace_back([this]() noexcept {
- // request warnings on synchronous librados calls in this thread
- is_asio_thread = true;
- // Have uncaught exceptions kill the process and give a
- // stacktrace, not be swallowed.
- context.run();
- });
- }
- return 0;
-}
-
void AsioFrontend::stop()
{
ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
if (!going_down) {
stop();
}
- work.reset();
-
- ldout(ctx(), 4) << "frontend joining threads..." << dendl;
- for (auto& thread : threads) {
- thread.join();
- }
- ldout(ctx(), 4) << "frontend done" << dendl;
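+ // the worker threads belong to the shared io_context pool; rgw::AppMain joins them at shutdown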
}
void AsioFrontend::pause()
class RGWAsioFrontend::Impl : public AsioFrontend {
public:
Impl(RGWProcessEnv& env, RGWFrontendConfig* conf,
- rgw::dmclock::SchedulerCtx& sched_ctx)
- : AsioFrontend(env, conf, sched_ctx) {}
+ rgw::dmclock::SchedulerCtx& sched_ctx,
+ boost::asio::io_context& context)
+ : AsioFrontend(env, conf, sched_ctx, context) {}
};
RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env,
RGWFrontendConfig* conf,
- rgw::dmclock::SchedulerCtx& sched_ctx)
- : impl(new Impl(env, conf, sched_ctx))
+ rgw::dmclock::SchedulerCtx& sched_ctx,
+ boost::asio::io_context& context)
+ : impl(new Impl(env, conf, sched_ctx, context))
{
}