]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: remove null_yield support for spawn_throttle
authorCasey Bodley <cbodley@redhat.com>
Tue, 22 Jul 2025 18:37:08 +0000 (14:37 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 28 Jul 2025 15:56:24 +0000 (11:56 -0400)
null_yield support was provided by sync_spawn_throttle_impl, which
itself stores the boost::asio::io_context that it runs on. however,
valgrind reports issues with object lifetimes

each spawn_throttle_handler holds a reference count on the
spawn_throttle_impl to prevent it from going away while coroutines are
in flight. on completion, this spawn_throttle_handler gets invoked on
its associated executor. this invocation may drop the last reference
to spawn_throttle_impl while that associated executor is still trying
to use the io_context that was destroyed with sync_spawn_throttle_impl

finding no good way for a custom executor to preserve the lifetime of
the spawn_throttle_impl, we instead remove support for this flawed
design. the null_yield callers must first spawn a parent coroutine
before using spawn_throttle

Fixes: https://tracker.ceph.com/issues/70965
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/detail/spawn_throttle_impl.h
src/common/async/spawn_throttle.h
src/test/common/test_async_spawn_throttle.cc

index 75a28ac48e70b447b7bfd4008f208140fd8b2e2b..c47aa2a518c6671d5fe6172db64b4dcf79f58e6e 100644 (file)
@@ -23,7 +23,6 @@
 #include <boost/asio/associated_cancellation_slot.hpp>
 #include <boost/asio/async_result.hpp>
 #include <boost/asio/execution/context.hpp>
-#include <boost/asio/io_context.hpp>
 #include <boost/asio/query.hpp>
 #include <boost/asio/spawn.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -53,10 +52,6 @@ class spawn_throttle_impl :
   }
   virtual ~spawn_throttle_impl() {}
 
-  // factory function
-  static auto create(optional_yield y, size_t limit, cancel_on_error on_error)
-      -> boost::intrusive_ptr<spawn_throttle_impl>;
-
   // return the completion handler for a new child. may block due to throttling
   // or rethrow an exception from a previously-spawned child
   spawn_throttle_handler get();
@@ -189,52 +184,6 @@ inline spawn_throttle_handler spawn_throttle_impl::get()
   return {this, c};
 }
 
-
-// Spawn throttle implementation for use in synchronous contexts where wait()
-// blocks the calling thread until completion.
-class sync_spawn_throttle_impl final : public spawn_throttle_impl {
-  static constexpr int concurrency = 1; // only run from a single thread
- public:
-  sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error)
-    : spawn_throttle_impl(limit, on_error),
-      ctx(std::in_place, concurrency)
-  {}
-
-  executor_type get_executor() override
-  {
-    return ctx->get_executor();
-  }
-
-  void wait_for(size_t target_count) override
-  {
-    while (count > target_count) {
-      if (ctx->stopped()) {
-        ctx->restart();
-      }
-      ctx->run_one();
-    }
-
-    report_exception(); // throw unreported exception
-  }
-
-  void cancel(bool shutdown) override
-  {
-    spawn_throttle_impl::cancel(shutdown);
-
-    if (shutdown) {
-      // destroy the io_context to trigger two-phase shutdown which
-      // destroys any completion handlers with a reference to 'this'
-      ctx.reset();
-      count = 0;
-    }
-  }
-
- private:
-  std::optional<boost::asio::io_context> ctx;
-};
-
-// Spawn throttle implementation for use in asynchronous contexts where wait()
-// suspends the calling stackful coroutine.
 class async_spawn_throttle_impl final :
     public spawn_throttle_impl,
     public service_list_base_hook
@@ -345,16 +294,4 @@ class async_spawn_throttle_impl final :
   }
 };
 
-inline auto spawn_throttle_impl::create(optional_yield y, size_t limit,
-                                        cancel_on_error on_error)
-    -> boost::intrusive_ptr<spawn_throttle_impl>
-{
-  if (y) {
-    auto yield = y.get_yield_context();
-    return new async_spawn_throttle_impl(yield, limit, on_error);
-  } else {
-    return new sync_spawn_throttle_impl(limit, on_error);
-  }
-}
-
 } // namespace ceph::async::detail
index 1fdff1928c7fe08d971f7cdea48d1115fc111875..41aae346b1bb688e4510e96d6cccae4b0a6c5d78 100644 (file)
 
 namespace ceph::async {
 
-/// A coroutine throttle that allows a thread of execution to spawn and manage
+/// A coroutine throttle that allows a parent coroutine to spawn and manage
 /// multiple child coroutines, while enforcing an upper bound on concurrency.
-/// The parent may either be a synchronous function or a stackful coroutine,
-/// depending on the optional_yield constructor argument.
 ///
 /// Child coroutines take boost::asio::yield_context as the only argument.
 /// Exceptions thrown by children are reported to the caller on its next call
@@ -44,10 +42,10 @@ namespace ceph::async {
 /// @code
 /// void child(boost::asio::yield_context yield);
 ///
-/// void parent(size_t count, optional_yield y)
+/// void parent(size_t count, boost::asio::yield_context yield)
 /// {
 ///   // spawn all children, up to 10 at a time
-///   auto throttle = ceph::async::spawn_throttle{y, 10};
+///   auto throttle = ceph::async::spawn_throttle{yield, 10};
 ///
 ///   for (size_t i = 0; i < count; i++) {
 ///     throttle.spawn(child);
@@ -60,9 +58,9 @@ class spawn_throttle {
   boost::intrusive_ptr<impl_type> impl;
 
  public:
-  spawn_throttle(optional_yield y, size_t limit,
+  spawn_throttle(boost::asio::yield_context yield, size_t limit,
                  cancel_on_error on_error = cancel_on_error::none)
-    : impl(detail::spawn_throttle_impl::create(y, limit, on_error))
+    : impl(new detail::async_spawn_throttle_impl(yield, limit, on_error))
   {}
 
   spawn_throttle(spawn_throttle&&) = default;
index 7e19738a8e2ce7859fe4859c6114ffe2d3924566..4c12ac5dd8176220c0c07622b03f0d20686ce7c4 100644 (file)
@@ -66,169 +66,6 @@ auto wait_on(yield_waiter<void>& handler)
 }
 
 
-TEST(YieldGroupSync, wait_empty)
-{
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.wait();
-}
-
-TEST(YieldGroupSync, spawn_wait)
-{
-  int completed = 0;
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(1ms, yield);
-    ++completed;
-  };
-
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.spawn(cr);
-  throttle.wait();
-
-  EXPECT_EQ(1, completed);
-}
-
-TEST(YieldGroupSync, spawn_shutdown)
-{
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.spawn(wait_for(1s));
-}
-
-TEST(YieldGroupSync, spawn_cancel_wait)
-{
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(1s, yield);
-    ++completed;
-  };
-
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.spawn(cr);
-  throttle.cancel();
-  EXPECT_THROW(throttle.wait(), boost::system::system_error);
-
-  EXPECT_EQ(0, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
-{
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(1ms, yield);
-    ++completed;
-  };
-
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.spawn(cr);
-  throttle.cancel();
-  EXPECT_THROW(throttle.wait(), boost::system::system_error);
-  throttle.spawn(cr);
-  throttle.wait();
-
-  EXPECT_EQ(1, completed);
-}
-
-TEST(YieldGroupSync, spawn_over_limit)
-{
-  int concurrent = 0;
-  int max_concurrent = 0;
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    ++concurrent;
-    if (max_concurrent < concurrent) {
-      max_concurrent = concurrent;
-    }
-
-    wait_for(1ms, yield);
-
-    --concurrent;
-    ++completed;
-  };
-
-  auto throttle = spawn_throttle{null_yield, 2};
-  throttle.spawn(cr);
-  throttle.spawn(cr);
-  throttle.spawn(cr); // blocks
-  throttle.spawn(cr); // blocks
-  throttle.wait(); // blocks
-
-  EXPECT_EQ(0, concurrent);
-  EXPECT_EQ(2, max_concurrent);
-  EXPECT_EQ(4, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_none)
-{
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(10ms, yield);
-    ++completed;
-  };
-  auto err = [] (asio::yield_context yield) {
-    wait_for(0ms, yield);
-    throw std::logic_error{"err"};
-  };
-
-  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
-  throttle.spawn(cr);
-  throttle.spawn(cr);
-  throttle.spawn(err);
-  throttle.spawn(cr);
-  EXPECT_THROW(throttle.wait(), std::logic_error);
-
-  EXPECT_EQ(3, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_after)
-{
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(10ms, yield);
-    ++completed;
-  };
-  auto err = [] (asio::yield_context yield) {
-    wait_for(0ms, yield);
-    throw std::logic_error{"err"};
-  };
-
-  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
-  throttle.spawn(cr);
-  throttle.spawn(cr);
-  throttle.spawn(err);
-  throttle.spawn(cr);
-  EXPECT_THROW(throttle.wait(), std::logic_error);
-
-  EXPECT_EQ(2, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_all)
-{
-  int completed = 0;
-
-  auto cr = [&] (asio::yield_context yield) {
-    wait_for(1s, yield);
-    ++completed;
-  };
-  auto err = [] (asio::yield_context yield) {
-    wait_for(0ms, yield);
-    throw std::logic_error{"err"};
-  };
-
-  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
-  throttle.spawn(cr);
-  throttle.spawn(cr);
-  throttle.spawn(err);
-  throttle.spawn(cr);
-  EXPECT_THROW(throttle.wait(), std::logic_error);
-
-  EXPECT_EQ(0, completed);
-}
-
-
 TEST(YieldGroupAsync, wait_empty)
 {
   asio::io_context ctx;