From: Casey Bodley Date: Tue, 22 Jul 2025 18:37:08 +0000 (-0400) Subject: common/async: remove null_yield support for spawn_throttle X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=4956c638e61ab77470a935f254a46dc1b0ec4705;p=ceph.git common/async: remove null_yield support for spawn_throttle null_yield support was provided by sync_spawn_throttle_impl, which itself stores the boost::asio::io_context that it runs on. however, valgrind reports issues with object lifetimes each spawn_throttle_handler holds a reference count on the spawn_throttle_impl to prevent it from going away while coroutines are in flight. on completion, this spawn_throttle_handler gets invoked on its associated executor. this invocation may drop the last reference to spawn_throttle_impl while that associated executor is still trying to use the io_context that was destroyed with sync_spawn_throttle_impl finding no good way for a custom executor to preserve the lifetime of the spawn_throttle_impl, we instead remove support for this flawed design. the null_yield callers must first spawn a parent coroutine before using spawn_throttle Fixes: https://tracker.ceph.com/issues/70965 Signed-off-by: Casey Bodley --- diff --git a/src/common/async/detail/spawn_throttle_impl.h b/src/common/async/detail/spawn_throttle_impl.h index 75a28ac48e70b..c47aa2a518c66 100644 --- a/src/common/async/detail/spawn_throttle_impl.h +++ b/src/common/async/detail/spawn_throttle_impl.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -53,10 +52,6 @@ class spawn_throttle_impl : } virtual ~spawn_throttle_impl() {} - // factory function - static auto create(optional_yield y, size_t limit, cancel_on_error on_error) - -> boost::intrusive_ptr; - // return the completion handler for a new child. may block due to throttling // or rethrow an exception from a previously-spawned child spawn_throttle_handler get(); @@ -189,52 +184,6 @@ inline spawn_throttle_handler spawn_throttle_impl::get() return {this, c}; } - -// Spawn throttle implementation for use in synchronous contexts where wait() -// blocks the calling thread until completion. -class sync_spawn_throttle_impl final : public spawn_throttle_impl { - static constexpr int concurrency = 1; // only run from a single thread - public: - sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error) - : spawn_throttle_impl(limit, on_error), - ctx(std::in_place, concurrency) - {} - - executor_type get_executor() override - { - return ctx->get_executor(); - } - - void wait_for(size_t target_count) override - { - while (count > target_count) { - if (ctx->stopped()) { - ctx->restart(); - } - ctx->run_one(); - } - - report_exception(); // throw unreported exception - } - - void cancel(bool shutdown) override - { - spawn_throttle_impl::cancel(shutdown); - - if (shutdown) { - // destroy the io_context to trigger two-phase shutdown which - // destroys any completion handlers with a reference to 'this' - ctx.reset(); - count = 0; - } - } - - private: - std::optional ctx; -}; - -// Spawn throttle implementation for use in asynchronous contexts where wait() -// suspends the calling stackful coroutine. class async_spawn_throttle_impl final : public spawn_throttle_impl, public service_list_base_hook @@ -345,16 +294,4 @@ class async_spawn_throttle_impl final : } }; -inline auto spawn_throttle_impl::create(optional_yield y, size_t limit, - cancel_on_error on_error) - -> boost::intrusive_ptr -{ - if (y) { - auto yield = y.get_yield_context(); - return new async_spawn_throttle_impl(yield, limit, on_error); - } else { - return new sync_spawn_throttle_impl(limit, on_error); - } -} - } // namespace ceph::async::detail diff --git a/src/common/async/spawn_throttle.h b/src/common/async/spawn_throttle.h index 1fdff1928c7fe..41aae346b1bb6 100644 --- a/src/common/async/spawn_throttle.h +++ b/src/common/async/spawn_throttle.h @@ -23,10 +23,8 @@ namespace ceph::async { -/// A coroutine throttle that allows a thread of execution to spawn and manage +/// A coroutine throttle that allows a parent coroutine to spawn and manage /// multiple child coroutines, while enforcing an upper bound on concurrency. -/// The parent may either be a synchronous function or a stackful coroutine, -/// depending on the optional_yield constructor argument. /// /// Child coroutines take boost::asio::yield_context as the only argument. /// Exceptions thrown by children are reported to the caller on its next call @@ -44,10 +42,10 @@ namespace ceph::async { /// @code /// void child(boost::asio::yield_context yield); /// -/// void parent(size_t count, optional_yield y) +/// void parent(size_t count, boost::asio::yield_context yield) /// { /// // spawn all children, up to 10 at a time -/// auto throttle = ceph::async::spawn_throttle{y, 10}; +/// auto throttle = ceph::async::spawn_throttle{yield, 10}; /// /// for (size_t i = 0; i < count; i++) { /// throttle.spawn(child); @@ -60,9 +58,9 @@ class spawn_throttle { boost::intrusive_ptr impl; public: - spawn_throttle(optional_yield y, size_t limit, + spawn_throttle(boost::asio::yield_context yield, size_t limit, cancel_on_error on_error = cancel_on_error::none) - : impl(detail::spawn_throttle_impl::create(y, limit, on_error)) + : impl(new detail::async_spawn_throttle_impl(yield, limit, on_error)) {} spawn_throttle(spawn_throttle&&) = default; diff --git a/src/test/common/test_async_spawn_throttle.cc b/src/test/common/test_async_spawn_throttle.cc index 7e19738a8e2ce..4c12ac5dd8176 100644 --- a/src/test/common/test_async_spawn_throttle.cc +++ b/src/test/common/test_async_spawn_throttle.cc @@ -66,169 +66,6 @@ auto wait_on(yield_waiter& handler) } -TEST(YieldGroupSync, wait_empty) -{ - auto throttle = spawn_throttle{null_yield, 2}; - throttle.wait(); -} - -TEST(YieldGroupSync, spawn_wait) -{ - int completed = 0; - auto cr = [&] (asio::yield_context yield) { - wait_for(1ms, yield); - ++completed; - }; - - auto throttle = spawn_throttle{null_yield, 2}; - throttle.spawn(cr); - throttle.wait(); - - EXPECT_EQ(1, completed); -} - -TEST(YieldGroupSync, spawn_shutdown) -{ - auto throttle = spawn_throttle{null_yield, 2}; - throttle.spawn(wait_for(1s)); -} - -TEST(YieldGroupSync, spawn_cancel_wait) -{ - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - wait_for(1s, yield); - ++completed; - }; - - auto throttle = spawn_throttle{null_yield, 2}; - throttle.spawn(cr); - throttle.cancel(); - EXPECT_THROW(throttle.wait(), boost::system::system_error); - - EXPECT_EQ(0, completed); -} - -TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait) -{ - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - wait_for(1ms, yield); - ++completed; - }; - - auto throttle = spawn_throttle{null_yield, 2}; - throttle.spawn(cr); - throttle.cancel(); - EXPECT_THROW(throttle.wait(), boost::system::system_error); - throttle.spawn(cr); - throttle.wait(); - - EXPECT_EQ(1, completed); -} - -TEST(YieldGroupSync, spawn_over_limit) -{ - int concurrent = 0; - int max_concurrent = 0; - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - ++concurrent; - if (max_concurrent < concurrent) { - max_concurrent = concurrent; - } - - wait_for(1ms, yield); - - --concurrent; - ++completed; - }; - - auto throttle = spawn_throttle{null_yield, 2}; - throttle.spawn(cr); - throttle.spawn(cr); - throttle.spawn(cr); // blocks - throttle.spawn(cr); // blocks - throttle.wait(); // blocks - - EXPECT_EQ(0, concurrent); - EXPECT_EQ(2, max_concurrent); - EXPECT_EQ(4, completed); -} - -TEST(YieldGroupSync, spawn_cancel_on_error_none) -{ - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - wait_for(10ms, yield); - ++completed; - }; - auto err = [] (asio::yield_context yield) { - wait_for(0ms, yield); - throw std::logic_error{"err"}; - }; - - auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none}; - throttle.spawn(cr); - throttle.spawn(cr); - throttle.spawn(err); - throttle.spawn(cr); - EXPECT_THROW(throttle.wait(), std::logic_error); - - EXPECT_EQ(3, completed); -} - -TEST(YieldGroupSync, spawn_cancel_on_error_after) -{ - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - wait_for(10ms, yield); - ++completed; - }; - auto err = [] (asio::yield_context yield) { - wait_for(0ms, yield); - throw std::logic_error{"err"}; - }; - - auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after}; - throttle.spawn(cr); - throttle.spawn(cr); - throttle.spawn(err); - throttle.spawn(cr); - EXPECT_THROW(throttle.wait(), std::logic_error); - - EXPECT_EQ(2, completed); -} - -TEST(YieldGroupSync, spawn_cancel_on_error_all) -{ - int completed = 0; - - auto cr = [&] (asio::yield_context yield) { - wait_for(1s, yield); - ++completed; - }; - auto err = [] (asio::yield_context yield) { - wait_for(0ms, yield); - throw std::logic_error{"err"}; - }; - - auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all}; - throttle.spawn(cr); - throttle.spawn(cr); - throttle.spawn(err); - throttle.spawn(cr); - EXPECT_THROW(throttle.wait(), std::logic_error); - - EXPECT_EQ(0, completed); -} - - TEST(YieldGroupAsync, wait_empty) { asio::io_context ctx;