]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add spawn_throttle for bounded concurrency with optional_yield
authorCasey Bodley <cbodley@redhat.com>
Wed, 8 May 2024 16:42:42 +0000 (12:42 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 22 May 2024 21:02:15 +0000 (17:02 -0400)
a primitive for structured concurrency with stackful coroutines from
boost::asio::spawn(). this relies on spawn()'s support for per-op
cancellation to guarantee that the lifetime of child coroutines won't
exceed the lifetime of their spawn_throttle, making it safe for children
to access memory from their parent's stack

by taking optional_yield in the constructor, spawn_throttle transparently
supports synchronous execution (where optional_yield is empty) and
asynchronous execution within a stackful coroutine (where optional_yield
contains the parent's yield_context)

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/detail/spawn_throttle_impl.h [new file with mode: 0644]
src/common/async/spawn_throttle.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_spawn_throttle.cc [new file with mode: 0644]

diff --git a/src/common/async/detail/spawn_throttle_impl.h b/src/common/async/detail/spawn_throttle_impl.h
new file mode 100644 (file)
index 0000000..9030f26
--- /dev/null
@@ -0,0 +1,360 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <exception>
+#include <optional>
+#include <memory>
+#include <utility>
+#include <boost/asio/append.hpp>
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/execution/context.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/query.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/service.h"
+#include "common/async/yield_context.h"
+
+namespace ceph::async::detail {
+
+struct spawn_throttle_handler;
+
+// Reference-counted spawn throttle interface, shared by the sync and async implementations in this file.
+class spawn_throttle_impl :
+    public boost::intrusive_ref_counter<spawn_throttle_impl,
+        boost::thread_unsafe_counter>
+{
+ public:
+  spawn_throttle_impl(size_t limit, cancel_on_error on_error)
+    : limit(limit), on_error(on_error),
+      children(std::make_unique<child[]>(limit)) // NOTE(review): with limit == 0 the free list stays empty and get() would call front() on it - confirm callers never pass 0
+  {
+    // initialize the free list with one entry per child slot
+    for (size_t i = 0; i < limit; i++) {
+      free.push_back(children[i]);
+    }
+  }
+  virtual ~spawn_throttle_impl() {}
+
+  // factory function: returns the async implementation when 'y' holds a yield_context, otherwise the sync one
+  static auto create(optional_yield y, size_t limit, cancel_on_error on_error)
+      -> boost::intrusive_ptr<spawn_throttle_impl>;
+
+  // return the completion handler for a new child. may block due to throttling
+  // (until count < limit) or rethrow an exception from a previously-spawned child
+  spawn_throttle_handler get();
+
+  // track each spawned coroutine for cancellation. these are stored in an
+  // array, and recycled after each use via the free list
+  struct child : boost::intrusive::list_base_hook<> {
+    std::optional<boost::asio::cancellation_signal> signal; // engaged by get(), reset again by on_complete()
+  };
+
+  using executor_type = boost::asio::any_io_executor;
+  virtual executor_type get_executor() = 0;
+
+  // wait until count <= target_count. both implementations in this file then rethrow any unreported exception
+  virtual void wait_for(size_t target_count) = 0;
+
+  // cancel outstanding coroutines. 'shutdown' is unused by this base implementation
+  virtual void cancel(bool shutdown)
+  {
+    cancel_outstanding_from(outstanding.begin());
+  }
+
+  // complete the given child coroutine: release its slot, record the first exception and apply cancel_on_error
+  virtual void on_complete(child& c, std::exception_ptr eptr)
+  {
+    --count;
+
+    // move back to the free list
+    auto next = outstanding.erase(outstanding.iterator_to(c)); // 'next' is the child spawned after c, if any
+    c.signal.reset();
+    free.push_back(c);
+
+    if (eptr && !unreported_exception) {
+      // hold on to the first child exception until we can report it in get()
+      // or wait_for()
+      unreported_exception = eptr;
+
+      // handle cancel_on_error: 'after' cancels only children spawned after c, 'all' cancels every outstanding child, anything else cancels none
+      auto cancel_from = outstanding.end();
+      if (on_error == cancel_on_error::after) {
+        cancel_from = next;
+      } else if (on_error == cancel_on_error::all) {
+        cancel_from = outstanding.begin();
+      }
+      cancel_outstanding_from(cancel_from);
+    }
+  }
+
+ protected:
+  const size_t limit;
+  const cancel_on_error on_error;
+  size_t count = 0;
+
+  void report_exception() // rethrow the stored child exception, if any, clearing it first
+  {
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
+  }
+
+ private:
+  std::exception_ptr unreported_exception;
+  std::unique_ptr<child[]> children;
+
+  using child_list = boost::intrusive::list<child,
+        boost::intrusive::constant_time_size<false>>;
+  child_list outstanding;
+  child_list free;
+
+  void cancel_outstanding_from(child_list::iterator i)
+  {
+    while (i != outstanding.end()) {
+      // increment before cancellation, which may invoke on_complete()
+      // directly and remove the child from this list
+      child& c = *i++;
+      c.signal->emit(boost::asio::cancellation_type::terminal); // NOTE(review): if this re-enters on_complete() and that starts a nested cancel_outstanding_from(), 'i' may already have moved to the free list - confirm
+    }
+  }
+};
+
+// A cancellable spawn() completion handler that notifies the spawn_throttle
+// upon completion. This holds a reference to the implementation in order to
+// extend its lifetime. This is required for per-op cancellation because the
+// cancellation_signals must outlive these coroutine stacks.
+struct spawn_throttle_handler {
+  boost::intrusive_ptr<spawn_throttle_impl> impl; // keeps the implementation (and its child array) alive
+  spawn_throttle_impl::child& c; // the slot reserved for this child by get()
+  boost::asio::cancellation_slot slot; // slot of c.signal, captured at construction
+
+  spawn_throttle_handler(boost::intrusive_ptr<spawn_throttle_impl> impl,
+                         spawn_throttle_impl::child& c)
+    : impl(std::move(impl)), c(c), slot(c.signal->slot()) // requires c.signal to be engaged already
+  {}
+
+  using executor_type = spawn_throttle_impl::executor_type;
+  executor_type get_executor() const noexcept
+  {
+    return impl->get_executor();
+  }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept
+  {
+    return slot;
+  }
+
+  void operator()(std::exception_ptr eptr) // invoked when the child coroutine exits; eptr is non-null if it threw
+  {
+    impl->on_complete(c, eptr);
+  }
+};
+
+inline spawn_throttle_handler spawn_throttle_impl::get() // 'inline': defined in a header, so it must not produce one definition per translation unit
+{
+  report_exception(); // throw unreported exception
+
+  if (count >= limit) {
+    wait_for(limit - 1); // at the limit: wait for one child to finish (may also rethrow)
+  }
+
+  ++count;
+
+  // move a free child to the outstanding list
+  child& c = free.front();
+  free.pop_front();
+  outstanding.push_back(c);
+
+  // spawn the coroutine with its associated cancellation signal
+  c.signal.emplace();
+  return {this, c};
+}
+
+
+// Spawn throttle implementation for use in synchronous contexts where wait()
+// blocks the calling thread until completion. Children run on a private io_context owned by this object.
+class sync_spawn_throttle_impl final : public spawn_throttle_impl {
+  static constexpr int concurrency = 1; // only run from a single thread
+ public:
+  sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error)
+    : spawn_throttle_impl(limit, on_error),
+      ctx(std::in_place, concurrency)
+  {}
+
+  executor_type get_executor() override
+  {
+    return ctx->get_executor();
+  }
+
+  void wait_for(size_t target_count) override
+  {
+    while (count > target_count) { // drive the io_context one handler at a time until enough children finish
+      if (ctx->stopped()) {
+        ctx->restart(); // a stopped io_context must be restarted before run_one() will do work again
+      }
+      ctx->run_one();
+    }
+
+    report_exception(); // throw unreported exception
+  }
+
+  void cancel(bool shutdown) override
+  {
+    spawn_throttle_impl::cancel(shutdown);
+
+    if (shutdown) {
+      // destroy the io_context to trigger two-phase shutdown which
+      // destroys any completion handlers with a reference to 'this'
+      ctx.reset();
+      count = 0; // presumably because the destroyed handlers never call on_complete() - confirm
+    }
+  }
+
+ private:
+  std::optional<boost::asio::io_context> ctx; // optional so that cancel(true) can destroy it early
+};
+
+// Spawn throttle implementation for use in asynchronous contexts where wait()
+// suspends the calling stackful coroutine. Children run on the parent's executor.
+class async_spawn_throttle_impl final :
+    public spawn_throttle_impl,
+    public service_list_base_hook
+{
+ public:
+  async_spawn_throttle_impl(boost::asio::yield_context yield,
+                            size_t limit, cancel_on_error on_error)
+    : spawn_throttle_impl(limit, on_error),
+      svc(boost::asio::use_service<service<async_spawn_throttle_impl>>(
+              boost::asio::query(yield.get_executor(),
+                                 boost::asio::execution::context))),
+      yield(yield)
+  {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+
+  ~async_spawn_throttle_impl()
+  {
+    svc.remove(*this); // undo the registration made in the constructor
+  }
+
+  executor_type get_executor() override
+  {
+    return yield.get_executor();
+  }
+
+  void service_shutdown()
+  {
+    waiter.reset(); // drop the pending wait handler and its work guard without invoking it
+  }
+
+ private:
+  service<async_spawn_throttle_impl>& svc;
+  boost::asio::yield_context yield; // the parent coroutine's yield context
+
+  using WaitSignature = void(boost::system::error_code);
+  struct wait_state { // the parent's suspended wait: its handler plus a work guard on the handler's executor
+    using Work = boost::asio::executor_work_guard<
+        boost::asio::any_io_executor>;
+    using Handler = typename boost::asio::async_result<
+        boost::asio::yield_context, WaitSignature>::handler_type;
+
+    Work work;
+    Handler handler;
+
+    explicit wait_state(Handler&& h)
+      : work(make_work_guard(h)),
+        handler(std::move(h))
+    {}
+  };
+  std::optional<wait_state> waiter; // engaged only while the parent is suspended in wait_for()
+  size_t wait_for_count = 0; // target count of the pending wait
+
+  struct op_cancellation { // cancellation handler installed on the parent's slot while it waits
+    async_spawn_throttle_impl* self;
+    explicit op_cancellation(async_spawn_throttle_impl* self) noexcept
+      : self(self) {}
+    void operator()(boost::asio::cancellation_type type) {
+      if (type != boost::asio::cancellation_type::none) {
+        self->cancel(false); // cancels the children and aborts the pending wait
+      }
+    }
+  };
+
+  void wait_for(size_t target_count) override
+  {
+    if (count > target_count) {
+      wait_for_count = target_count;
+
+      boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
+          [this] (auto handler) {
+            auto slot = get_associated_cancellation_slot(handler);
+            if (slot.is_connected()) {
+              slot.template emplace<op_cancellation>(this);
+            }
+            waiter.emplace(std::move(handler)); // completed later by wait_complete()
+          }, yield);
+      // this is a coroutine, so the wait has completed by this point
+    }
+
+    report_exception(); // throw unreported exception
+  }
+
+  void wait_complete(boost::system::error_code ec)
+  {
+    auto w = std::move(*waiter); // take the state out first so 'waiter' is empty before the handler runs
+    waiter.reset();
+    boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
+  }
+
+  void on_complete(child& c, std::exception_ptr eptr) override
+  {
+    spawn_throttle_impl::on_complete(c, eptr);
+
+    if (waiter && count <= wait_for_count) { // enough children have finished: wake the parent
+      wait_complete({});
+    }
+  }
+
+  void cancel(bool shutdown) override
+  {
+    spawn_throttle_impl::cancel(shutdown);
+
+    if (waiter) { // still waiting after the children were signaled: fail the wait with operation_aborted
+      wait_complete(make_error_code(boost::asio::error::operation_aborted));
+    }
+  }
+};
+
+inline auto spawn_throttle_impl::create(optional_yield y, size_t limit, // 'inline': header-defined, avoids multiple definitions across translation units
+                                 cancel_on_error on_error)
+    -> boost::intrusive_ptr<spawn_throttle_impl>
+{
+  if (y) { // called from a stackful coroutine: waits suspend the coroutine
+    auto yield = y.get_yield_context();
+    return new async_spawn_throttle_impl(yield, limit, on_error);
+  } else { // no yield context: waits block the calling thread
+    return new sync_spawn_throttle_impl(limit, on_error);
+  }
+}
+
+} // namespace ceph::async::detail
diff --git a/src/common/async/spawn_throttle.h b/src/common/async/spawn_throttle.h
new file mode 100644 (file)
index 0000000..02e07ca
--- /dev/null
@@ -0,0 +1,146 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "detail/spawn_throttle_impl.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include "cancel_on_error.h"
+#include "yield_context.h"
+
+namespace ceph::async {
+
+/// A coroutine throttle that allows a thread of execution to spawn and manage
+/// multiple child coroutines, while enforcing an upper bound on concurrency.
+/// The parent may either be a synchronous function or a stackful coroutine,
+/// depending on the optional_yield constructor argument.
+///
+/// Child coroutines are spawned by calling boost::asio::spawn() and using the
+/// spawn_throttle object as the CompletionToken argument. Exceptions thrown
+/// by children are reported to the caller on its next call to get() or wait().
+/// The cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or spawn_throttle destruction.
+/// This allows a parent function to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe. Member functions should be called from the
+/// parent thread of execution only.
+///
+/// Example:
+/// @code
+/// void child(boost::asio::yield_context yield);
+///
+/// void parent(size_t count, optional_yield y)
+/// {
+///   // spawn all children, up to 10 at a time
+///   auto throttle = ceph::async::spawn_throttle{y, 10};
+///
+///   for (size_t i = 0; i < count; i++) {
+///     boost::asio::spawn(throttle.get_executor(), child, throttle);
+///   }
+///   throttle.wait();
+/// }
+/// @endcode
+class spawn_throttle {
+  using impl_type = detail::spawn_throttle_impl;
+  boost::intrusive_ptr<impl_type> impl;
+
+ public:
+  spawn_throttle(optional_yield y, size_t limit,
+                 cancel_on_error on_error = cancel_on_error::none)
+    : impl(detail::spawn_throttle_impl::create(y, limit, on_error))
+  {}
+
+  spawn_throttle(spawn_throttle&&) = default;
+  spawn_throttle& operator=(spawn_throttle&&) = default; // NOTE(review): replaces 'impl' without calling cancel(true) on the old one, unlike the destructor - confirm this is intended
+  // disable copy for unique ownership
+  spawn_throttle(const spawn_throttle&) = delete;
+  spawn_throttle& operator=(const spawn_throttle&) = delete;
+
+  /// Cancel outstanding coroutines on destruction. A moved-from throttle has no impl and does nothing.
+  ~spawn_throttle()
+  {
+    if (impl) {
+      impl->cancel(true);
+    }
+  }
+
+  using executor_type = impl_type::executor_type;
+  executor_type get_executor()
+  {
+    return impl->get_executor();
+  }
+
+  /// Return a cancellable spawn() completion handler with signature
+  /// void(std::exception_ptr).
+  ///
+  /// This function may block until a throttle unit becomes available. If one or
+  /// more previously-spawned coroutines exit with an exception, the first such
+  /// exception is rethrown here.
+  ///
+  /// As a convenience, you can avoid calling this function by using the
+  /// spawn_throttle itself as a CompletionToken for spawn().
+  auto get()
+    -> detail::spawn_throttle_handler
+  {
+    return impl->get();
+  }
+
+  /// Wait for all outstanding completions before returning. If any
+  /// of the spawned coroutines exits with an exception, the first exception
+  /// is rethrown.
+  ///
+  /// After wait() completes, whether successfully or by exception, the spawn
+  /// throttle can be reused to spawn and await additional coroutines.
+  void wait()
+  {
+    impl->wait_for(0);
+  }
+
+  /// Cancel all outstanding coroutines. The throttle itself stays usable afterwards.
+  void cancel()
+  {
+    impl->cancel(false);
+  }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// Allow spawn_throttle to be used as a CompletionToken.
+template <typename Signature>
+struct async_result<ceph::async::spawn_throttle, Signature>
+{
+  using completion_handler_type =
+      ceph::async::detail::spawn_throttle_handler;
+  async_result(completion_handler_type&) {}
+
+  using return_type = void;
+  return_type get() {}
+
+  template <typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                              ceph::async::spawn_throttle& throttle,
+                              Args&& ...args)
+  {
+    return std::move(init)(throttle.get(), std::forward<Args>(args)...);
+  }
+};
+
+} // namespace boost::asio
index 08f77a03894858b27d2f5e6c20782092231022b7..d54ea127cc9687e9746d62b33a361beb61fd35ee 100644 (file)
@@ -361,6 +361,10 @@ add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
 add_ceph_unittest(unittest_async_shared_mutex)
 target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
 
+add_executable(unittest_async_spawn_throttle test_async_spawn_throttle.cc)
+add_ceph_unittest(unittest_async_spawn_throttle)
+target_link_libraries(unittest_async_spawn_throttle ceph-common Boost::system Boost::context)
+
 add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc)
 add_ceph_unittest(unittest_async_yield_waiter)
 target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context)
diff --git a/src/test/common/test_async_spawn_throttle.cc b/src/test/common/test_async_spawn_throttle.cc
new file mode 100644 (file)
index 0000000..6f306a1
--- /dev/null
@@ -0,0 +1,751 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/spawn_throttle.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <gtest/gtest.h>
+#include "common/async/yield_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+using error_code = boost::system::error_code;
+
+void rethrow(std::exception_ptr eptr) // spawn() completion handler: rethrows the coroutine's exception, if any
+{
+  if (eptr) std::rethrow_exception(eptr);
+}
+
+auto capture(std::optional<std::exception_ptr>& eptr) // completion handler that stores the coroutine's result; 'eptr' is engaged once it has run
+{
+  return [&eptr] (std::exception_ptr e) { eptr = e; };
+}
+
+auto capture(asio::cancellation_signal& signal, // as above, but bound to signal's slot so the test can cancel the coroutine
+             std::optional<std::exception_ptr>& eptr)
+{
+  return asio::bind_cancellation_slot(signal.slot(), capture(eptr));
+}
+
+using namespace std::chrono_literals;
+
+void wait_for(std::chrono::milliseconds dur, asio::yield_context yield) // suspend the calling coroutine on a timer for 'dur'
+{
+  auto timer = asio::steady_timer{yield.get_executor(), dur};
+  timer.async_wait(yield);
+}
+
+auto wait_for(std::chrono::milliseconds dur) // returns a coroutine body for spawn() that waits for 'dur'
+{
+  return [dur] (asio::yield_context yield) { wait_for(dur, yield); };
+}
+
+auto wait_on(yield_waiter<void>& handler) // returns a coroutine body for spawn() that waits on 'handler'
+{
+  return [&handler] (asio::yield_context yield) {
+    handler.async_wait(yield);
+  };
+}
+
+
+TEST(YieldGroupSync, wait_empty)
+{
+  auto throttle = spawn_throttle{null_yield, 2};
+  throttle.wait(); // no children were spawned, so there is nothing to wait for
+}
+
+TEST(YieldGroupSync, spawn_wait)
+{
+  int completed = 0;
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(1ms, yield);
+    ++completed;
+  };
+
+  auto throttle = spawn_throttle{null_yield, 2};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.wait(); // blocks this thread until the child has finished
+
+  EXPECT_EQ(1, completed);
+}
+
+TEST(YieldGroupSync, spawn_shutdown)
+{
+  auto throttle = spawn_throttle{null_yield, 2};
+  asio::spawn(throttle.get_executor(), wait_for(1s), throttle); // never waited on: the throttle's destructor has to clean it up
+}
+
+TEST(YieldGroupSync, spawn_cancel_wait)
+{
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(1s, yield);
+    ++completed;
+  };
+
+  auto throttle = spawn_throttle{null_yield, 2};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.cancel();
+  EXPECT_THROW(throttle.wait(), boost::system::system_error); // the canceled child's error is reported by wait()
+
+  EXPECT_EQ(0, completed); // the child never got past its timer wait
+}
+
+TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
+{
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(1ms, yield);
+    ++completed;
+  };
+
+  auto throttle = spawn_throttle{null_yield, 2};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.cancel();
+  EXPECT_THROW(throttle.wait(), boost::system::system_error);
+  asio::spawn(throttle.get_executor(), cr, throttle); // the throttle is reused after wait() reported the error
+  throttle.wait();
+
+  EXPECT_EQ(1, completed); // only the second child ran to completion
+}
+
+TEST(YieldGroupSync, spawn_over_limit)
+{
+  int concurrent = 0;
+  int max_concurrent = 0;
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) { // records the peak number of children running at once
+    ++concurrent;
+    if (max_concurrent < concurrent) {
+      max_concurrent = concurrent;
+    }
+
+    wait_for(1ms, yield);
+
+    --concurrent;
+    ++completed;
+  };
+
+  auto throttle = spawn_throttle{null_yield, 2};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle); // blocks
+  asio::spawn(throttle.get_executor(), cr, throttle); // blocks
+  throttle.wait(); // blocks
+
+  EXPECT_EQ(0, concurrent);
+  EXPECT_EQ(2, max_concurrent); // never more than the limit of 2
+  EXPECT_EQ(4, completed);
+}
+
+TEST(YieldGroupSync, spawn_cancel_on_error_none)
+{
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(10ms, yield);
+    ++completed;
+  };
+  auto err = [] (asio::yield_context yield) { // fails before the 10ms children finish
+    wait_for(0ms, yield);
+    throw std::logic_error{"err"};
+  };
+
+  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), err, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  EXPECT_THROW(throttle.wait(), std::logic_error);
+
+  EXPECT_EQ(3, completed); // none: no sibling is canceled
+}
+
+TEST(YieldGroupSync, spawn_cancel_on_error_after)
+{
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(10ms, yield);
+    ++completed;
+  };
+  auto err = [] (asio::yield_context yield) { // fails before the 10ms children finish
+    wait_for(0ms, yield);
+    throw std::logic_error{"err"};
+  };
+
+  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), err, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  EXPECT_THROW(throttle.wait(), std::logic_error);
+
+  EXPECT_EQ(2, completed); // after: only the child spawned after 'err' is canceled
+}
+
+TEST(YieldGroupSync, spawn_cancel_on_error_all)
+{
+  int completed = 0;
+
+  auto cr = [&] (asio::yield_context yield) {
+    wait_for(1s, yield);
+    ++completed;
+  };
+  auto err = [] (asio::yield_context yield) { // fails long before the 1s children finish
+    wait_for(0ms, yield);
+    throw std::logic_error{"err"};
+  };
+
+  auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  asio::spawn(throttle.get_executor(), err, throttle);
+  asio::spawn(throttle.get_executor(), cr, throttle);
+  EXPECT_THROW(throttle.wait(), std::logic_error);
+
+  EXPECT_EQ(0, completed); // all: every sibling is canceled
+}
+
+
+TEST(YieldGroupAsync, wait_empty)
+{
+  asio::io_context ctx;
+  asio::spawn(ctx, [] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 2};
+      throttle.wait(); // no children were spawned, so there is nothing to wait for
+    }, rethrow);
+
+  ctx.run();
+}
+
+TEST(YieldGroupAsync, spawn_wait)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 2};
+      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.wait(); // blocks
+    }, rethrow);
+
+  ASSERT_FALSE(waiter); // nothing has run yet
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter); // the child is now suspended on the waiter
+
+  waiter.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // child and parent have both finished
+}
+
+TEST(YieldGroupAsync, spawn_over_limit)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+  yield_waiter<void> waiter3;
+  yield_waiter<void> waiter4;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 2};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      asio::spawn(yield, wait_on(waiter3), throttle); // blocks
+      asio::spawn(yield, wait_on(waiter4), throttle); // blocks
+      throttle.wait(); // blocks
+    }, rethrow);
+
+  ASSERT_FALSE(waiter1);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_TRUE(waiter2);
+  ASSERT_FALSE(waiter3); // third spawn is throttled at the limit of 2
+
+  waiter1.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter3); // one slot freed, so the third child started
+  ASSERT_FALSE(waiter4);
+
+  waiter3.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter4);
+
+  waiter2.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // wait() is still pending on the fourth child
+
+  waiter4.complete(error_code{});
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+}
+
+TEST(YieldGroupAsync, spawn_shutdown)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 2};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      waiter2.async_wait(yield); // blocks
+      // shut down while there's an outstanding child but throttle is not
+      // waiting on spawn() or wait()
+    }, rethrow);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(waiter1); // child is suspended
+  EXPECT_TRUE(waiter2); // parent is suspended; the test ends with both still pending
+}
+
+TEST(YieldGroupAsync, spawn_throttled_shutdown)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+      // shut down while we're throttled on the second spawn
+    }, rethrow);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(waiter1);
+  EXPECT_FALSE(waiter2); // the second child never started
+}
+
+TEST(YieldGroupAsync, spawn_wait_shutdown)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.wait(); // blocks
+      // shut down while we're wait()ing
+    }, rethrow);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(waiter); // the test ends with the child and the wait() still pending
+}
+
+TEST(YieldGroupAsync, spawn_throttled_error)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_FALSE(waiter2);
+
+  waiter1.complete(make_error_code(std::errc::no_such_file_or_directory)); // fail the first child while the second spawn is throttled
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // the parent exited with the child's error
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_throttled_signal)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+    }, capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_FALSE(waiter2);
+
+  signal.emit(boost::asio::cancellation_type::terminal); // cancel the parent while the second spawn is throttled
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_wait_error)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.wait(); // blocks
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter);
+
+  waiter.complete(make_error_code(std::errc::no_such_file_or_directory)); // fail the child while the parent is in wait()
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // wait() rethrew the child's error out of the parent
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_wait_signal)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.wait(); // blocks
+    }, capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter);
+  ASSERT_FALSE(result); // the parent has not completed yet
+
+  signal.emit(boost::asio::cancellation_type::terminal); // cancel the parent while it is in wait()
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_cancel_wait)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 2};
+      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.cancel(); // explicit cancel before waiting
+      throttle.wait();
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // everything finished without the test completing 'waiter'
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_cancel_on_error_none)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+  yield_waiter<void> waiter3;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 4, cancel_on_error::none};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.wait(); // blocks
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_TRUE(waiter2);
+  ASSERT_TRUE(waiter3);
+
+  waiter2.complete(make_error_code(std::errc::no_such_file_or_directory)); // fail the middle child
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // none: the other two children keep running
+
+  waiter1.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  waiter3.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // reports a failure for any other exception type
+  }
+}
+
+TEST(YieldGroupAsync, spawn_cancel_on_error_after)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+  yield_waiter<void> waiter3;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 4, cancel_on_error::after};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.wait(); // blocks
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_TRUE(waiter2);
+  ASSERT_TRUE(waiter3);
+
+  waiter2.complete(make_error_code(std::errc::no_such_file_or_directory));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  // if the waiter3 cr was canceled, completing waiter1 should unblock wait()
+  waiter1.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(YieldGroupAsync, spawn_cancel_on_error_all)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+  yield_waiter<void> waiter3;
+  std::optional<std::exception_ptr> result;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 4, cancel_on_error::all};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.wait(); // blocks
+    }, capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_TRUE(waiter2);
+  ASSERT_TRUE(waiter3);
+
+  // should cancel the other crs and unblock throttle.wait()
+  waiter2.complete(make_error_code(std::errc::no_such_file_or_directory));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(YieldGroupAsync, spawn_wait_spawn_wait)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.wait(); // blocks
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.wait(); // blocks
+    }, rethrow);
+
+  ASSERT_FALSE(waiter1);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_FALSE(waiter2);
+
+  waiter1.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter2);
+
+  waiter2.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+}
+
+TEST(YieldGroupAsync, spawn_cancel_wait_spawn_wait)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.cancel();
+      EXPECT_THROW(throttle.wait(), boost::system::system_error);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.wait(); // blocks
+    }, rethrow);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter2);
+
+  waiter2.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+}
+
+TEST(YieldGroupAsync, spawn_error_wait_spawn_wait)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter1;
+  yield_waiter<void> waiter2;
+
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+      auto throttle = spawn_throttle{yield, 1};
+      asio::spawn(yield, wait_on(waiter1), throttle);
+      EXPECT_THROW(throttle.wait(), boost::system::system_error);
+      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.wait(); // blocks
+    }, rethrow);
+
+  ASSERT_FALSE(waiter1);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter1);
+  ASSERT_FALSE(waiter2);
+
+  waiter1.complete(make_error_code(std::errc::no_such_file_or_directory));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(waiter2);
+
+  waiter2.complete(error_code{});
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+}
+
+} // namespace ceph::async