]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: remove error_codes from co_throttle interface
authorCasey Bodley <cbodley@redhat.com>
Mon, 6 Feb 2023 22:05:12 +0000 (17:05 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
in the initial design of co_throttle described in
https://github.com/ceph/ceph/pull/49720, the cancel_on_error option only
applied to errors from awaitable<error_code> but not to exceptions from
awaitable<void> coroutines

with the decision to use exceptions as the default method of error
handling in rgw multisite, this design choice no longer makes sense.
i've removed the error_code overloads entirely, and changed the
exception handling logic to match the previous behavior for error codes

the unit tests were rewritten with co_waiter instead of timers to make
them deterministic and faster. co_waiter's cancellation behavior
exposed some issues where the cancellation signal could cause the
completions to recurse, so on_complete() was restructured to tolerate
that

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/co_throttle.h
src/common/async/detail/co_throttle_impl.h
src/test/common/test_async_co_throttle.cc

index bf1e9685c672548a08c06940137154e16a873946..f8d399ed40ea53f9133074b1ace5a1c2f5e78d61 100644 (file)
@@ -25,10 +25,10 @@ namespace ceph::async {
 /// A coroutine throttle that allows a parent coroutine to spawn and manage
 /// multiple child coroutines, while enforcing an upper bound on concurrency.
 ///
-/// Child coroutines can be of type awaitable<void> or awaitable<error_code>.
-/// Error codes returned by children are reported to the parent on its next call
-/// to spawn() or wait(). The cancel_on_error option controls whether these
-/// errors trigger the cancellation of other children.
+/// Child coroutines must be of type awaitable<void>. Exceptions thrown by
+/// children are rethrown to the parent on its next call to spawn() or wait().
+/// The cancel_on_error option controls whether these exceptions errors trigger
+/// the cancellation of other children.
 ///
 /// All child coroutines are canceled by cancel() or co_throttle destruction.
 /// This allows the parent coroutine to share memory with its child coroutines
@@ -38,7 +38,7 @@ namespace ceph::async {
 /// multi-threaded contexts.
 ///
 /// Example:
-/// @code
+/// \code
 /// awaitable<void> child(task& t);
 ///
 /// awaitable<void> parent(std::span<task> tasks)
@@ -52,14 +52,17 @@ namespace ceph::async {
 ///   }
 ///   co_await throttle.wait();
 /// }
-/// @endcode
+/// \endcode
 template <boost::asio::execution::executor Executor>
 class co_throttle {
+  using impl_type = detail::co_throttle_impl<Executor>;
+  boost::intrusive_ptr<impl_type> impl;
+
  public:
   using executor_type = Executor;
-  executor_type get_executor() const { return impl->get_executor(); }
+  executor_type get_executor() const noexcept { return impl->get_executor(); }
 
-  using size_type = uint16_t;
+  using size_type = typename impl_type::size_type;
   static constexpr size_type max_limit = std::numeric_limits<size_type>::max();
 
   co_throttle(const executor_type& ex, size_type limit,
@@ -76,52 +79,36 @@ class co_throttle {
   co_throttle(const co_throttle&) = delete;
   co_throttle& operator=(const co_throttle&) = delete;
 
-  template <typename T>
-  using awaitable = boost::asio::awaitable<T, executor_type>;
-
-  /// Try to spawn the given coroutine. If this would exceed the concurrency
-  /// limit, wait for another coroutine to complete first. This default
-  /// limit can be overridden with the optional `smaller_limit` argument.
+  /// Try to spawn the given coroutine \ref cr. If this would exceed the
+  /// concurrency limit, wait for another coroutine to complete first. This
+  /// default limit can be overridden with the optional \ref smaller_limit
+  /// argument.
   ///
-  /// If any spawned coroutines of type awaitable<error_code> return a non-zero
-  /// error, the first such error is reported by the next call to spawn() or
-  /// wait(). When spawn() reports these errors, the given coroutine given will
-  /// only be spawned in the case of cancel_on_error::none. New coroutines can
-  /// be spawned by later calls to spawn() regardless of cancel_on_error.
-  ///
-  /// If a spawned coroutine exits by an uncaught exception, that exception is
-  /// rethrown by the next call to spawn() or wait().
-  auto spawn(awaitable<boost::system::error_code> cr,
+  /// If any spawned coroutines exit with an exception, the first exception is
+  /// rethrown by the next call to spawn() or wait(). If spawn() has an
+  /// exception to rethrow, it will spawn \cr first only in the case of
+  /// cancel_on_error::none. New coroutines can be spawned by later calls to
+  /// spawn() regardless of cancel_on_error.
+  auto spawn(boost::asio::awaitable<void, executor_type> cr,
              size_type smaller_limit = max_limit)
-      -> awaitable<boost::system::error_code>
-  {
-    return impl->spawn(std::move(cr), smaller_limit);
-  }
-
-  /// \overload
-  auto spawn(awaitable<void> cr, size_type smaller_limit = max_limit)
-      -> awaitable<boost::system::error_code>
+      -> boost::asio::awaitable<void, executor_type>
   {
     return impl->spawn(std::move(cr), smaller_limit);
   }
 
   /// Wait for all associated coroutines to complete. If any of these coroutines
-  /// return a non-zero error_code, the first of those errors is returned.
-  awaitable<boost::system::error_code> wait()
+  /// exit with an exception, the first of those exceptions is rethrown.
+  auto wait()
+      -> boost::asio::awaitable<void, executor_type>
   {
     return impl->wait();
   }
 
-  /// Cancel all associated coroutines. Callers waiting on spawn() or wait()
-  /// will fail with boost::asio::error::operation_aborted.
+  /// Cancel all associated coroutines.
   void cancel()
   {
     impl->cancel();
   }
-
- private:
-  using impl_type = detail::co_throttle_impl<Executor, size_type>;
-  boost::intrusive_ptr<impl_type> impl;
 };
 
 } // namespace ceph::async
index d1a89db94c0644c4f22748a2902017b108919d8f..43f3691439d7eabb1982ccb29044b33ab4001543 100644 (file)
@@ -33,14 +33,14 @@ namespace ceph::async::detail {
 // co_spawn() completion handlers can extend the implementation's lifetime.
 // This is required for per-op cancellation because the cancellation_signals
 // must outlive their coroutine frames.
-template <boost::asio::execution::executor Executor, typename SizeType>
+template <boost::asio::execution::executor Executor>
 class co_throttle_impl :
-    public boost::intrusive_ref_counter<co_throttle_impl<Executor, SizeType>,
+    public boost::intrusive_ref_counter<co_throttle_impl<Executor>,
         boost::thread_unsafe_counter>,
     public service_list_base_hook
 {
  public:
-  using size_type = SizeType;
+  using size_type = uint16_t;
 
   using executor_type = Executor;
   executor_type get_executor() const { return ex; }
@@ -65,29 +65,19 @@ class co_throttle_impl :
     svc.remove(*this);
   }
 
-  template <typename T>
-  using awaitable = boost::asio::awaitable<T, executor_type>;
-
-  template <typename T> // where T=void or error_code
-  auto spawn(awaitable<T> cr, size_type smaller_limit)
-      -> awaitable<boost::system::error_code>
+  auto spawn(boost::asio::awaitable<void, executor_type> cr,
+             size_type smaller_limit)
+      -> boost::asio::awaitable<void, executor_type>
   {
-    if (unreported_exception) {
+    if (unreported_exception && on_error != cancel_on_error::none) {
       std::rethrow_exception(std::exchange(unreported_exception, nullptr));
     }
-    if (unreported_error && on_error != cancel_on_error::none) {
-      co_return std::exchange(unreported_error, {});
-    }
 
     const size_type current_limit = std::min(smaller_limit, limit);
     if (count >= current_limit) {
-      auto ec = co_await wait_for(current_limit - 1);
-      if (ec) {
-        unreported_error.clear();
-        co_return ec;
-      }
-      if (unreported_error && on_error != cancel_on_error::none) {
-        co_return std::exchange(unreported_error, {});
+      co_await wait_for(current_limit - 1);
+      if (unreported_exception && on_error != cancel_on_error::none) {
+        std::rethrow_exception(std::exchange(unreported_exception, nullptr));
       }
     }
 
@@ -107,32 +97,31 @@ class co_throttle_impl :
         boost::asio::bind_cancellation_slot(c.signal->slot(),
             child_completion{this, c}));
 
-    co_return std::exchange(unreported_error, {});
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
   }
 
-  awaitable<boost::system::error_code> wait()
+  auto wait()
+      -> boost::asio::awaitable<void, executor_type>
   {
     if (count > 0) {
-      auto ec = co_await wait_for(0);
-      if (ec) {
-        unreported_error.clear();
-        co_return ec;
-      }
+      co_await wait_for(0);
+    }
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
     }
-    co_return std::exchange(unreported_error, {});
   }
 
   void cancel()
   {
-    for (child& c : outstanding) {
+    while (!outstanding.empty()) {
+      child& c = outstanding.front();
+      outstanding.pop_front();
+
       c.canceled = true;
       c.signal->emit(boost::asio::cancellation_type::terminal);
     }
-    if (waiter.waiting()) {
-      auto eptr = std::exchange(unreported_exception, nullptr);
-      auto ec = make_error_code(boost::asio::error::operation_aborted);
-      waiter.complete(eptr, ec);
-    }
   }
 
   void service_shutdown()
@@ -149,7 +138,6 @@ class co_throttle_impl :
   size_type count = 0;
   size_type wait_for_count = 0;
 
-  boost::system::error_code unreported_error;
   std::exception_ptr unreported_exception;
 
   // track each spawned coroutine for cancellation. these are stored in an
@@ -165,58 +153,61 @@ class co_throttle_impl :
   child_list outstanding;
   child_list free;
 
-  co_waiter<boost::system::error_code, executor_type> waiter;
+  co_waiter<void, executor_type> waiter;
 
   // return an awaitable that completes once count <= target_count
   auto wait_for(size_type target_count)
-      -> awaitable<boost::system::error_code>
+      -> boost::asio::awaitable<void, executor_type>
   {
     wait_for_count = target_count;
     return waiter.get();
   }
 
-  void on_complete(child& c, std::exception_ptr eptr,
-                   boost::system::error_code ec)
+  void on_complete(child& c, std::exception_ptr eptr)
   {
     --count;
 
     if (c.canceled) {
-      // don't report cancellation errors. cancellation was either requested
-      // by the user, or triggered by another failure that is reported
-      eptr = nullptr;
-      ec = {};
-    }
-
-    if (eptr && !unreported_exception) {
-      unreported_exception = eptr;
-    }
-    if (ec && !unreported_error) {
-      unreported_error = ec;
-    }
-
-    // move back to the free list
-    auto next = outstanding.erase(outstanding.iterator_to(c));
-    c.signal.reset();
-    free.push_back(c);
-
-    // handle cancel_on_error
-    if (eptr || ec) {
-      auto cancel_begin = outstanding.end();
-      if (on_error == cancel_on_error::after) {
-        cancel_begin = next;
-      } else if (on_error == cancel_on_error::all) {
-        cancel_begin = outstanding.begin();
-      }
-      for (auto i = cancel_begin; i != outstanding.end(); ++i) {
-        i->canceled = true;
-        i->signal->emit(boost::asio::cancellation_type::terminal);
+      // if the child was canceled, it was already removed from outstanding
+      ceph_assert(!c.is_linked());
+      c.canceled = false;
+      c.signal.reset();
+      free.push_back(c);
+    } else {
+      // move back to the free list
+      ceph_assert(c.is_linked());
+      auto next = outstanding.erase(outstanding.iterator_to(c));
+      c.signal.reset();
+      free.push_back(c);
+
+      if (eptr) {
+        if (eptr && !unreported_exception) {
+          unreported_exception = eptr;
+        }
+
+        // handle cancel_on_error. cancellation signals may recurse into
+        // on_complete(), so move the entries into a separate list first
+        child_list to_cancel;
+        if (on_error == cancel_on_error::after) {
+          to_cancel.splice(to_cancel.end(), outstanding,
+                           next, outstanding.end());
+        } else if (on_error == cancel_on_error::all) {
+          to_cancel = std::move(outstanding);
+        }
+
+        for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) {
+          child& c = *i;
+          i = to_cancel.erase(i);
+
+          c.canceled = true;
+          c.signal->emit(boost::asio::cancellation_type::terminal);
+        }
       }
     }
 
     // maybe wake the waiter
     if (waiter.waiting() && count <= wait_for_count) {
-      auto eptr = std::exchange(unreported_exception, nullptr);
-      waiter.complete(eptr, {});
+      waiter.complete(nullptr);
     }
   }
 
@@ -224,9 +215,8 @@ class co_throttle_impl :
     boost::intrusive_ptr<co_throttle_impl> impl;
     child& c;
 
-    void operator()(std::exception_ptr eptr,
-                    boost::system::error_code ec = {}) {
-      impl->on_complete(c, eptr, ec);
+    void operator()(std::exception_ptr eptr) {
+      impl->on_complete(c, eptr);
     }
   };
 };
index 7dec81cfc7ed1578d640d53669a7b8330c3e6695..6ecdd9b248f98294191f9c28686fd07d9f4ca6b2 100644 (file)
  */
 
 #include "common/async/co_throttle.h"
-#include <chrono>
-#include <boost/asio/basic_waitable_timer.hpp>
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/co_spawn.hpp>
 #include <boost/asio/io_context.hpp>
 #include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
 
 namespace ceph::async {
 
+namespace asio = boost::asio;
 namespace errc = boost::system::errc;
 using boost::system::error_code;
 
-using executor_type = boost::asio::io_context::executor_type;
+using executor_type = asio::io_context::executor_type;
 
 template <typename T>
-using awaitable = boost::asio::awaitable<T, executor_type>;
-using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
+using awaitable = asio::awaitable<T, executor_type>;
+using use_awaitable_t = asio::use_awaitable_t<executor_type>;
 static constexpr use_awaitable_t use_awaitable{};
 
-using clock_type = std::chrono::steady_clock;
-using timer_type = boost::asio::basic_waitable_timer<clock_type,
-     boost::asio::wait_traits<clock_type>, executor_type>;
+using void_waiter = co_waiter<void, executor_type>;
 
-void rethrow(std::exception_ptr eptr)
+auto capture(std::optional<std::exception_ptr>& eptr)
 {
-  if (eptr) std::rethrow_exception(eptr);
+  return [&eptr] (std::exception_ptr e) { eptr = e; };
 }
 
-using namespace std::chrono_literals;
-
-auto worker(std::chrono::milliseconds delay = 20ms)
-    -> awaitable<void>
-{
-  auto timer = timer_type{co_await boost::asio::this_coro::executor, delay};
-  co_await timer.async_wait(use_awaitable);
-}
-
-auto worker(error_code ec, std::chrono::milliseconds delay = 10ms)
-    -> awaitable<error_code>
-{
-  co_await worker(delay);
-  co_return ec;
-}
-
-auto worker(std::exception_ptr eptr, std::chrono::milliseconds delay = 10ms)
-    -> awaitable<void>
-{
-  co_await worker(delay);
-  std::rethrow_exception(eptr);
-}
-
-auto worker(bool& finished, std::chrono::milliseconds delay = 20ms)
-    -> awaitable<void>
-{
-  co_await worker(delay);
-  finished = true;
-}
-
-auto counting_worker(size_t& count, size_t& max_count)
-    -> awaitable<void>
+auto capture(asio::cancellation_signal& signal,
+             std::optional<std::exception_ptr>& eptr)
 {
-  ++count;
-  if (max_count < count) {
-    max_count = count;
-  }
-  co_await worker();
-  --count;
+  return asio::bind_cancellation_slot(signal.slot(), capture(eptr));
 }
 
-// use a worker that never completes to test cancellation
-awaitable<void> lazy_worker()
+awaitable<void> wait(void_waiter& waiter, bool& completed)
 {
-  for (;;) {
-    co_await worker();
-  }
+  co_await waiter.get();
+  completed = true;
 }
 
 TEST(co_throttle, wait_empty)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
-
-  std::optional<error_code> ec_wait;
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.wait();
+  };
 
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll();
-  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_wait); // wait returns immediately if nothing was spawned
-  EXPECT_FALSE(*ec_wait);
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
 }
 
 TEST(co_throttle, spawn_over_limit)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  size_t count = 0;
-  size_t max_count = 0;
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
 
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_wait;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
-        ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
 
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  EXPECT_FALSE(ec_spawn2);
+  waiter1.complete(nullptr);
 
-  ctx.run_one(); // wait for spawn1's completion
   ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  EXPECT_TRUE(spawn2_completed);
 
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-  EXPECT_FALSE(ec_wait);
-
-  ctx.run(); // run to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait);
+  waiter2.complete(nullptr);
 
-  EXPECT_EQ(max_count, limit); // count never exceeds limit
-  EXPECT_EQ(count, 0);
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
 }
 
 TEST(co_throttle, spawn_over_smaller_limit)
 {
   constexpr size_t limit = 2;
   constexpr size_t smaller_limit = 1;
+  asio::io_context ctx;
 
-  size_t count = 0;
-  size_t max_count = 0;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
 
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_wait;
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get(), smaller_limit);
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
 
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
-        ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count),
-                                            smaller_limit);
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
 
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  EXPECT_FALSE(ec_spawn2);
+  waiter1.complete(nullptr);
 
-  ctx.run_one(); // wait for spawn1's completion
   ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn2_completed);
 
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-  EXPECT_FALSE(ec_wait);
-
-  ctx.run(); // run to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait);
+  waiter2.complete(nullptr);
 
-  EXPECT_EQ(max_count, smaller_limit); // count never exceeds smaller_limit
-  EXPECT_EQ(count, 0);
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
 }
 
 TEST(co_throttle, spawn_cancel)
 {
   constexpr size_t limit = 1;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_wait;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(lazy_worker());
-        ec_spawn2 = co_await throttle.spawn(lazy_worker());
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  asio::io_context ctx;
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
+
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(signal, result));
 
   ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
 
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  EXPECT_FALSE(ec_spawn2);
-  EXPECT_FALSE(ec_wait);
-
-  throttle.cancel();
+  // cancel before spawn2 completes
+  signal.emit(asio::cancellation_type::terminal);
 
   ctx.poll();
   ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait); // wait after cancel succeeds immediately
+  EXPECT_FALSE(spawn2_completed);
+  ASSERT_TRUE(result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
 }
 
 TEST(co_throttle, wait_cancel)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
+  void_waiter waiter;
+  bool spawn_completed = false;
 
-  std::optional<error_code> ec_spawn;
-  std::optional<error_code> ec_wait;
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    spawn_completed = true;
+    co_await throttle.wait();
+  };
 
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn = co_await throttle.spawn(lazy_worker());
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(signal, result));
 
   ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn_completed);
+  EXPECT_FALSE(result);
 
-  ASSERT_TRUE(ec_spawn);
-  EXPECT_FALSE(*ec_spawn);
-  EXPECT_FALSE(ec_wait);
-
-  throttle.cancel();
+  // cancel before wait completes
+  signal.emit(asio::cancellation_type::terminal);
 
   ctx.poll();
   ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+  ASSERT_TRUE(result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
 }
 
 TEST(co_throttle, spawn_shutdown)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
 
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+  };
 
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        auto throttle = co_throttle{ex, limit};
-        ec_spawn1 = co_await throttle.spawn(lazy_worker());
-        ec_spawn2 = co_await throttle.spawn(lazy_worker());
-      }, rethrow);
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
-  ctx.run_one(); // call spawn1 and spawn2
-
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  EXPECT_FALSE(ec_spawn2);
-
-  // shut down io_context before spawn2 unblocks
+  ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(result);
+  // shut down before spawn2 completes
 }
 
 TEST(co_throttle, wait_shutdown)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-
-  std::optional<error_code> ec_spawn;
-  std::optional<error_code> ec_wait;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        auto throttle = co_throttle{ex, limit};
-        ec_spawn = co_await throttle.spawn(lazy_worker());
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
-
-  ctx.run_one(); // call spawn and wait
-
-  ASSERT_TRUE(ec_spawn);
-  EXPECT_FALSE(*ec_spawn);
-  EXPECT_FALSE(ec_wait);
-
-  // shut down io_context before wait unblocks
-}
-
-TEST(co_throttle, spawn_destroy)
-{
-  constexpr size_t limit = 1;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-
-  {
-    auto throttle = co_throttle{ex, limit};
-
-    boost::asio::co_spawn(ex,
-        [&] () -> awaitable<void> {
-          ec_spawn1 = co_await throttle.spawn(lazy_worker());
-          ec_spawn2 = co_await throttle.spawn(lazy_worker());
-        }, rethrow);
-
-    ctx.poll(); // run until spawn2 blocks
-
-    ASSERT_TRUE(ec_spawn1);
-    EXPECT_FALSE(*ec_spawn1);
-    EXPECT_FALSE(ec_spawn2);
-    // throttle canceled/destroyed
-  }
-
-  ctx.poll();
-  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
-}
-
-TEST(co_throttle, wait_destroy)
-{
-  constexpr size_t limit = 1;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-
-  std::optional<error_code> ec_spawn;
-  std::optional<error_code> ec_wait;
-
-  {
-    auto throttle = co_throttle{ex, limit};
+  void_waiter waiter;
+  bool spawn_completed = false;
 
-    boost::asio::co_spawn(ex,
-        [&] () -> awaitable<void> {
-          ec_spawn = co_await throttle.spawn(lazy_worker());
-          ec_wait = co_await throttle.wait();
-        }, rethrow);
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    spawn_completed = true;
+    co_await throttle.wait();
+  };
 
-    ctx.poll(); // run until wait blocks
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
-    ASSERT_TRUE(ec_spawn);
-    EXPECT_FALSE(*ec_spawn);
-    EXPECT_FALSE(ec_wait);
-    // throttle canceled/destroyed
-  }
-
-  ctx.poll();
-  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn_completed);
+  EXPECT_FALSE(result);
+  // shut down before wait completes
 }
 
 TEST(co_throttle, spawn_error)
 {
   constexpr size_t limit = 2;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_spawn3;
-  std::optional<error_code> ec_wait;
-  bool spawn1_finished = false;
-  bool spawn3_finished = false;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
-        auto ec = make_error_code(errc::invalid_argument);
-        ec_spawn2 = co_await throttle.spawn(worker(ec));
-        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  asio::io_context ctx;
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until spawn3 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(cr1_completed);
+  EXPECT_FALSE(cr2_completed);
+  EXPECT_FALSE(cr3_completed);
+
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
 
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-  EXPECT_FALSE(ec_spawn3);
-  EXPECT_FALSE(spawn1_finished);
-  EXPECT_FALSE(spawn3_finished);
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+  EXPECT_FALSE(result);
 
-  ctx.run_one(); // wait for spawn2's completion
-  ctx.poll(); // run until wait() blocks
+  waiter1.complete(nullptr);
 
-  ASSERT_TRUE(ec_spawn3);
-  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
-  EXPECT_FALSE(ec_wait);
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // wait still blocked
 
-  ctx.run(); // run to completion
+  waiter3.complete(nullptr);
 
-  EXPECT_TRUE(spawn3_finished); // spawn3 isn't canceled by spawn2's error
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait);
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_TRUE(cr1_completed);
+  EXPECT_FALSE(cr2_completed);
+  EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error
 }
 
 TEST(co_throttle, wait_error)
 {
   constexpr size_t limit = 1;
+  asio::io_context ctx;
 
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
+  void_waiter waiter;
 
-  std::optional<error_code> ec_spawn;
-  std::optional<error_code> ec_wait;
+  auto cr = [&] () -> awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    co_await throttle.wait();
+  };
 
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        auto ec = make_error_code(errc::invalid_argument);
-        ec_spawn = co_await throttle.spawn(worker(ec));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
 
-  ASSERT_TRUE(ec_spawn);
-  EXPECT_FALSE(*ec_spawn);
-  EXPECT_FALSE(ec_wait);
-
-  ctx.run(); // run to completion
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
 
-  ASSERT_TRUE(ec_wait);
-  EXPECT_EQ(*ec_wait, errc::invalid_argument);
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
 }
 
 TEST(co_throttle, spawn_cancel_on_error_after)
 {
   constexpr size_t limit = 2;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit, cancel_on_error::after};
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_spawn3;
-  std::optional<error_code> ec_spawn4;
-  std::optional<error_code> ec_wait;
-  bool spawn1_finished = false;
-  bool spawn3_finished = false;
-  bool spawn4_finished = false;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
-        auto ec = make_error_code(errc::invalid_argument);
-        ec_spawn2 = co_await throttle.spawn(worker(ec));
-        // spawn3 expects invalid_argument error and cancellation
-        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
-        // spawn4 expects success
-        ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  asio::io_context ctx;
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  void_waiter waiter4;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  bool cr4_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> awaitable<void> {
+    auto ex = co_await asio::this_coro::executor;
+    auto throttle = co_throttle{ex, limit, cancel_on_error::after};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.spawn(wait(waiter4, cr4_completed));
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until spawn3 blocks
-
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-  EXPECT_FALSE(spawn1_finished);
-
-  ctx.run_one(); // wait for spawn2's completion
-  ctx.poll();
   ASSERT_FALSE(ctx.stopped());
 
-  ASSERT_TRUE(ec_spawn3);
-  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
-  ASSERT_TRUE(ec_spawn4);
-  EXPECT_FALSE(*ec_spawn4);
-  EXPECT_FALSE(spawn1_finished);
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
 
-  ctx.run_one(); // wait for spawn1's completion
   ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(cr1_completed);
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
 
-  EXPECT_FALSE(ec_wait);
-  EXPECT_TRUE(spawn1_finished); // spawn1 not canceled
+  waiter1.complete(nullptr);
 
-  ctx.run_one(); // wait for spawn4's completion
   ctx.poll();
-  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait);
-  EXPECT_FALSE(spawn3_finished); // spawn3 canceled
-  EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
+  ASSERT_FALSE(ctx.stopped()); // wait still blocked
+  EXPECT_FALSE(result);
+  EXPECT_TRUE(cr1_completed);
+  EXPECT_FALSE(cr4_completed);
+
+  waiter4.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_FALSE(cr2_completed); // exited by exception
+  EXPECT_FALSE(cr3_completed); // cr3 canceled
+  EXPECT_TRUE(cr4_completed); // cr4 not canceled
 }
 
 TEST(co_throttle, spawn_cancel_on_error_all)
 {
   constexpr size_t limit = 2;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit, cancel_on_error::all};
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_spawn3;
-  std::optional<error_code> ec_spawn4;
-  std::optional<error_code> ec_wait;
-  bool spawn1_finished = false;
-  bool spawn3_finished = false;
-  bool spawn4_finished = false;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
-        auto ec = make_error_code(errc::invalid_argument);
-        ec_spawn2 = co_await throttle.spawn(worker(ec));
-        // spawn3 expects invalid_argument error and cancellation
-        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
-        // spawn3 expects success
-        ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
-
-  ctx.poll(); // run until spawn3 blocks
-
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-  EXPECT_FALSE(ec_spawn3);
-  EXPECT_FALSE(ec_spawn4);
-
-  ctx.run_one(); // wait for spawn2's completion
-  ctx.poll(); // run until wait blocks
-
-  ASSERT_TRUE(ec_spawn3);
-  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
-  ASSERT_TRUE(ec_spawn4);
-  EXPECT_FALSE(*ec_spawn4);
-  EXPECT_FALSE(ec_wait);
-  EXPECT_FALSE(spawn1_finished); // spawn1 canceled
-
-  ctx.run_one(); // wait for spawn4's completion
-  ctx.poll();
-  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
-  ASSERT_TRUE(ec_wait);
-  EXPECT_FALSE(*ec_wait);
-  EXPECT_FALSE(spawn3_finished); // spawn3 canceled
-  EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
-}
-
-TEST(co_throttle, spawn_exception)
-{
-  constexpr size_t limit = 2;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
-
-  std::optional<error_code> ec_spawn1;
-  std::optional<error_code> ec_spawn2;
-  std::optional<error_code> ec_spawn3;
-  bool spawn1_finished = false;
-  bool spawn3_finished = false;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
-        auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
-        ec_spawn2 = co_await throttle.spawn(worker(eptr));
-        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
-      }, rethrow);
+  asio::io_context ctx;
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  void_waiter waiter4;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  bool cr4_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> awaitable<void> {
+    auto ex = co_await asio::this_coro::executor;
+    auto throttle = co_throttle{ex, limit, cancel_on_error::all};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.spawn(wait(waiter4, cr4_completed));
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, cr(), capture(result));
 
   ctx.poll(); // run until spawn3 blocks
+  ASSERT_FALSE(ctx.stopped());
 
-  ASSERT_TRUE(ec_spawn1);
-  EXPECT_FALSE(*ec_spawn1);
-  ASSERT_TRUE(ec_spawn2);
-  EXPECT_FALSE(*ec_spawn2);
-
-  EXPECT_THROW(ctx.run_one(), std::runtime_error);
-
-  ASSERT_FALSE(ec_spawn3);
-  EXPECT_FALSE(spawn1_finished);
-  EXPECT_FALSE(spawn3_finished);
-}
-
-TEST(co_throttle, wait_exception)
-{
-  constexpr size_t limit = 1;
-
-  boost::asio::io_context ctx;
-  auto ex = ctx.get_executor();
-  auto throttle = co_throttle{ex, limit};
-
-  std::optional<error_code> ec_spawn;
-  std::optional<error_code> ec_wait;
-
-  boost::asio::co_spawn(ex,
-      [&] () -> awaitable<void> {
-        auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
-        ec_spawn = co_await throttle.spawn(worker(eptr));
-        ec_wait = co_await throttle.wait();
-      }, rethrow);
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
 
   ctx.poll(); // run until wait blocks
-
-  ASSERT_TRUE(ec_spawn);
-  EXPECT_FALSE(*ec_spawn);
-
-  EXPECT_THROW(ctx.run(), std::runtime_error);
-
-  ASSERT_FALSE(ec_wait);
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+  EXPECT_FALSE(cr4_completed);
+
+  waiter4.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_FALSE(cr1_completed); // cr1 canceled
+  EXPECT_FALSE(cr2_completed); // exited by exception
+  EXPECT_FALSE(cr3_completed); // cr3 canceled
+  EXPECT_TRUE(cr4_completed); // cr4 not canceled
 }
 
 } // namespace ceph::async