]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Asio frontend shares `io_context` with the rest of RGW wip-coro-after-reef 49737/head
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 14 Apr 2023 19:45:04 +0000 (15:45 -0400)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:01 +0000 (17:48 -0400)
Pull the `io_context` and threads out of `AsioFrontend`, pass in a
reference to `io_context_pool` so it can be shut down at `AsioFrontend::join`

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_appmain.cc
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_asio_frontend.h
src/rgw/rgw_main.h

index 1f20aed247d5db001cecf64fc73816c5fb5cab7d..dfafbb5483e7ec0667dad6ce1b7a72c209fba594 100644 (file)
@@ -203,6 +203,17 @@ void rgw::AppMain::init_numa()
   }
 } /* init_numa */
 
+void rgw::AppMain::need_context_pool() {
+  if (!context_pool) {
+    context_pool.emplace(
+      dpp->get_cct()->_conf->rgw_thread_pool_size,
+      [] {
+       // request warnings on synchronous librados calls in this thread
+       is_asio_thread = true;
+      });
+  }
+}
+
 int rgw::AppMain::init_storage()
 {
   auto config_store_type = g_conf().get_val<std::string>("rgw_config_store");
@@ -234,10 +245,11 @@ int rgw::AppMain::init_storage()
     (g_conf()->rgw_run_sync_thread &&
       ((!nfs) || (nfs && g_conf()->rgw_nfs_run_sync_thread)));
 
+  need_context_pool();
   DriverManager::Config cfg = DriverManager::get_config(false, g_ceph_context);
   env.driver = DriverManager::get_storage(dpp, dpp->get_cct(),
           cfg,
-         context_pool,
+         *context_pool,
           run_gc,
           run_lc,
           run_quota,
@@ -457,7 +469,8 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
       fe = new RGWLoadGenFrontend(env, config);
     }
     else if (framework == "beast") {
-      fe = new RGWAsioFrontend(env, config, *sched_ctx);
+      need_context_pool();
+      fe = new RGWAsioFrontend(env, config, *sched_ctx, *context_pool);
     }
     else if (framework == "rgw-nfs") {
       fe = new RGWLibFrontend(env, config);
@@ -515,8 +528,9 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
     if (env.lua.background) {
       rgw_pauser->add_pauser(env.lua.background);
     }
+    need_context_pool();
     reloader = std::make_unique<RGWRealmReloader>(
-      env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), context_pool);
+      env, *implicit_tenant_context, service_map_meta, rgw_pauser.get(), *context_pool);
     realm_watcher = std::make_unique<RGWRealmWatcher>(dpp, g_ceph_context,
                                  static_cast<rgw::sal::RadosStore*>(env.driver)->svc()->zone->get_realm());
     realm_watcher->add_watcher(RGWRealmNotify::Reload, *reloader);
@@ -595,28 +609,34 @@ void rgw::AppMain::shutdown(std::function<void(void)> finalize_async_signals)
     fe->stop();
   }
 
-  for (auto& fe : fes) {
-    fe->join();
-    delete fe;
-  }
-
-  for (auto& fec : fe_configs) {
-    delete fec;
-  }
-
   ldh.reset(nullptr); // deletes
   finalize_async_signals(); // callback
   rgw_log_usage_finalize();
-  
+
   delete olog;
 
   if (lua_background) {
     lua_background->shutdown();
   }
 
+  // Do this before closing storage so requests don't try to call into
+  // closed storage.
+  context_pool->finish();
+
   cfgstore.reset(); // deletes
   DriverManager::close_storage(env.driver);
 
+  // Fe can't be deleted until nobody's exeucting `io_context::run`
+  for (auto& fe : fes) {
+    fe->join();
+    delete fe;
+  }
+
+  for (auto& fec : fe_configs) {
+    delete fec;
+  }
+
+
   rgw_tools_cleanup();
   rgw_shutdown_resolver();
   rgw_http_client_cleanup();
index 633a2963300f60b2e10c809a185daa2bc6f00a85..055d1ba47965adf395959599e54802370a4c4166 100644 (file)
@@ -3,7 +3,6 @@
 
 #include <atomic>
 #include <ctime>
-#include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
@@ -394,8 +393,9 @@ class ConnectionList {
 namespace dmc = rgw::dmclock;
 class AsioFrontend {
   RGWProcessEnv& env;
+  boost::intrusive_ptr<CephContext> cct{env.driver->ctx()};
   RGWFrontendConfig* conf;
-  boost::asio::io_context context;
+  boost::asio::io_context& context;
   std::string uri_prefix;
   ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
   size_t header_limit = 16384;
@@ -425,22 +425,19 @@ class AsioFrontend {
 
   ConnectionList connections;
 
-  // work guard to keep run() threads busy while listeners are paused
-  using Executor = boost::asio::io_context::executor_type;
-  std::optional<boost::asio::executor_work_guard<Executor>> work;
-
-  std::vector<std::thread> threads;
   std::atomic<bool> going_down{false};
 
-  CephContext* ctx() const { return env.driver->ctx(); }
+  CephContext* ctx() const { return cct.get(); }
   std::optional<dmc::ClientCounters> client_counters;
   std::unique_ptr<dmc::ClientConfig> client_config;
   void accept(Listener& listener, boost::system::error_code ec);
 
  public:
   AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
-              dmc::SchedulerCtx& sched_ctx)
-    : env(env), conf(conf), pause_mutex(context.get_executor())
+              dmc::SchedulerCtx& sched_ctx,
+              boost::asio::io_context& context)
+    : env(env), conf(conf), context(context),
+      pause_mutex(context.get_executor())
   {
     auto sched_t = dmc::get_scheduler_t(ctx());
     switch(sched_t){
@@ -462,7 +459,9 @@ class AsioFrontend {
   }
 
   int init();
-  int run();
+  int run() {
+    return 0;
+  }
   void stop();
   void join();
   void pause();
@@ -480,7 +479,7 @@ unsigned short parse_port(const char *input, boost::system::error_code& ec)
   }
   return port;
 }
-       
+
 tcp::endpoint parse_endpoint(boost::asio::string_view input,
                              unsigned short default_port,
                              boost::system::error_code& ec)
@@ -1060,30 +1059,6 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   }
 }
 
-int AsioFrontend::run()
-{
-  auto cct = ctx();
-  const int thread_count = cct->_conf->rgw_thread_pool_size;
-  threads.reserve(thread_count);
-
-  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
-
-  // the worker threads call io_context::run(), which will return when there's
-  // no work left. hold a work guard to keep these threads going until join()
-  work.emplace(boost::asio::make_work_guard(context));
-
-  for (int i = 0; i < thread_count; i++) {
-    threads.emplace_back([this]() noexcept {
-      // request warnings on synchronous librados calls in this thread
-      is_asio_thread = true;
-      // Have uncaught exceptions kill the process and give a
-      // stacktrace, not be swallowed.
-      context.run();
-    });
-  }
-  return 0;
-}
-
 void AsioFrontend::stop()
 {
   ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
@@ -1105,13 +1080,6 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
-  work.reset();
-
-  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
-  for (auto& thread : threads) {
-    thread.join();
-  }
-  ldout(ctx(), 4) << "frontend done" << dendl;
 }
 
 void AsioFrontend::pause()
@@ -1155,14 +1123,16 @@ void AsioFrontend::unpause()
 class RGWAsioFrontend::Impl : public AsioFrontend {
  public:
   Impl(RGWProcessEnv& env, RGWFrontendConfig* conf,
-       rgw::dmclock::SchedulerCtx& sched_ctx)
-    : AsioFrontend(env, conf, sched_ctx) {}
+       rgw::dmclock::SchedulerCtx& sched_ctx,
+       boost::asio::io_context& context)
+    : AsioFrontend(env, conf, sched_ctx, context) {}
 };
 
 RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env,
                                  RGWFrontendConfig* conf,
-                                rgw::dmclock::SchedulerCtx& sched_ctx)
-  : impl(new Impl(env, conf, sched_ctx))
+                                rgw::dmclock::SchedulerCtx& sched_ctx,
+                                boost::asio::io_context& context)
+  : impl(new Impl(env, conf, sched_ctx, context))
 {
 }
 
index 2de6f337a9fb34f2162abd6d42d39862bcbc1e69..8f642bb526f5a789e258b0159b02476f29af1c75 100644 (file)
@@ -4,6 +4,9 @@
 #pragma once
 
 #include <memory>
+
+#include <boost/asio/io_context.hpp>
+
 #include "rgw_frontend.h"
 #define REQUEST_TIMEOUT 65000
 
@@ -12,7 +15,8 @@ class RGWAsioFrontend : public RGWFrontend {
   std::unique_ptr<Impl> impl;
 public:
   RGWAsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
-                 rgw::dmclock::SchedulerCtx& sched_ctx);
+                 rgw::dmclock::SchedulerCtx& sched_ctx,
+                 boost::asio::io_context& io_context);
   ~RGWAsioFrontend() override;
 
   int init() override;
index 9bdea60e2860df75f5047cb98e6247c7c312bcdc..caa6a0822828c9353e90e46f0e881c49e6ff4f8b 100644 (file)
@@ -84,8 +84,8 @@ class AppMain {
   SiteConfig site;
   const DoutPrefixProvider* dpp;
   RGWProcessEnv env;
-  ceph::async::io_context_pool context_pool{
-    dpp->get_cct()->_conf->rgw_thread_pool_size};
+  void need_context_pool();
+  std::optional<ceph::async::io_context_pool> context_pool;
 public:
   AppMain(const DoutPrefixProvider* dpp);
   ~AppMain();