]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add co_throttle for bounded concurrency with c++20 coroutines
author Casey Bodley <cbodley@redhat.com>
Wed, 11 Jan 2023 15:45:49 +0000 (10:45 -0500)
committer Casey Bodley <cbodley@redhat.com>
Wed, 24 Jul 2024 16:51:05 +0000 (12:51 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/co_throttle.h [new file with mode: 0644]
src/common/async/detail/co_throttle_impl.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_co_throttle.cc [new file with mode: 0644]

diff --git a/src/common/async/co_throttle.h b/src/common/async/co_throttle.h
new file mode 100644 (file)
index 0000000..880ffc9
--- /dev/null
@@ -0,0 +1,113 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <boost/intrusive_ptr.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/detail/co_throttle_impl.h"
+
+namespace ceph::async {
+
+/// A coroutine throttle that allows a parent coroutine to spawn and manage
+/// multiple child coroutines, while enforcing an upper bound on concurrency.
+///
+/// Child coroutines must be of type awaitable<void>. Exceptions thrown by
+/// children are rethrown to the parent on its next call to spawn() or wait().
+/// The cancel_on_error option controls whether these exceptions trigger
+/// the cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or co_throttle destruction.
+/// This allows the parent coroutine to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe, so a strand executor should be used in
+/// multi-threaded contexts.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   // process all tasks, up to 10 at a time
+///   auto ex = co_await boost::asio::this_coro::executor;
+///   auto throttle = co_throttle{ex, 10};
+///
+///   for (auto& t : tasks) {
+///     co_await throttle.spawn(child(t));
+///   }
+///   co_await throttle.wait();
+/// }
+/// \endcode
+template <boost::asio::execution::executor Executor>
+class co_throttle {
+  using impl_type = detail::co_throttle_impl<Executor>;
+  boost::intrusive_ptr<impl_type> impl;
+
+ public:
+  using executor_type = Executor;
+  executor_type get_executor() const noexcept { return impl->get_executor(); }
+
+  static constexpr size_t max_limit = std::numeric_limits<size_t>::max();
+
+  co_throttle(const executor_type& ex, size_t limit,
+              cancel_on_error on_error = cancel_on_error::none)
+    : impl(new impl_type(ex, limit, on_error))
+  {
+  }
+
+  ~co_throttle()
+  {
+    cancel();
+  }
+
+  co_throttle(const co_throttle&) = delete;
+  co_throttle& operator=(const co_throttle&) = delete;
+
+  /// Try to spawn the given coroutine \ref cr. If this would exceed the
+  /// concurrency limit, wait for another coroutine to complete first. This
+  /// default limit can be overridden with the optional \ref smaller_limit
+  /// argument.
+  ///
+  /// If any spawned coroutines exit with an exception, the first exception is
+  /// rethrown by the next call to spawn() or wait(). If spawn() has an
+  /// exception to rethrow, it will spawn \cr first only in the case of
+  /// cancel_on_error::none. New coroutines can be spawned by later calls to
+  /// spawn() regardless of cancel_on_error.
+  auto spawn(boost::asio::awaitable<void, executor_type> cr,
+             size_t smaller_limit = max_limit)
+      -> boost::asio::awaitable<void, executor_type>
+  {
+    return impl->spawn(std::move(cr), smaller_limit);
+  }
+
+  /// Wait for all associated coroutines to complete. If any of these coroutines
+  /// exit with an exception, the first of those exceptions is rethrown.
+  auto wait()
+      -> boost::asio::awaitable<void, executor_type>
+  {
+    return impl->wait();
+  }
+
+  /// Cancel all associated coroutines.
+  void cancel()
+  {
+    impl->cancel();
+  }
+};
+
+} // namespace ceph::async
diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h
new file mode 100644 (file)
index 0000000..f2f17a0
--- /dev/null
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/asio/append.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "include/ceph_assert.h"
+
+namespace ceph::async::detail {
+
+// Coroutine throttle implementation. This is reference-counted so the
+// co_spawn() completion handlers can extend the implementation's lifetime.
+// This is required for per-op cancellation because the cancellation_signals
+// must outlive their coroutine frames.
+template <boost::asio::execution::executor Executor>
+class co_throttle_impl :
+    public boost::intrusive_ref_counter<co_throttle_impl<Executor>,
+        boost::thread_unsafe_counter>,
+    public service_list_base_hook
+{
+ public:
+  using executor_type = Executor;
+  executor_type get_executor() const { return ex; }
+
+  co_throttle_impl(const executor_type& ex, size_t limit,
+                   cancel_on_error on_error)
+    : svc(boost::asio::use_service<service<co_throttle_impl>>(
+            boost::asio::query(ex, boost::asio::execution::context))),
+      ex(ex), limit(limit), on_error(on_error),
+      children(new child[limit])
+  {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+
+    // initialize the free list
+    for (size_t i = 0; i < limit; i++) {
+      free.push_back(children[i]);
+    }
+  }
+  ~co_throttle_impl()
+  {
+    svc.remove(*this);
+  }
+
+  auto spawn(boost::asio::awaitable<void, executor_type> cr,
+             size_t smaller_limit)
+      -> boost::asio::awaitable<void, executor_type>
+  {
+    if (unreported_exception && on_error != cancel_on_error::none) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
+
+    const size_t current_limit = std::min(smaller_limit, limit);
+    if (count >= current_limit) {
+      co_await wait_for(current_limit - 1);
+      if (unreported_exception && on_error != cancel_on_error::none) {
+        std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+      }
+    }
+
+    ++count;
+
+    // move a free child to the outstanding list
+    ceph_assert(!free.empty());
+    child& c = free.front();
+    free.pop_front();
+    outstanding.push_back(c);
+
+    // spawn the coroutine with its associated cancellation signal
+    c.signal.emplace();
+    c.canceled = false;
+
+    boost::asio::co_spawn(get_executor(), std::move(cr),
+        boost::asio::bind_cancellation_slot(c.signal->slot(),
+            child_completion{this, c}));
+
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
+  }
+
+  auto wait()
+      -> boost::asio::awaitable<void, executor_type>
+  {
+    if (count > 0) {
+      co_await wait_for(0);
+    }
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
+  }
+
+  void cancel()
+  {
+    while (!outstanding.empty()) {
+      child& c = outstanding.front();
+      outstanding.pop_front();
+
+      c.canceled = true;
+      c.signal->emit(boost::asio::cancellation_type::terminal);
+    }
+  }
+
+  void service_shutdown()
+  {
+    waiter.shutdown();
+  }
+
+ private:
+  service<co_throttle_impl>& svc;
+  executor_type ex;
+  const size_t limit;
+  const cancel_on_error on_error;
+
+  size_t count = 0;
+  size_t wait_for_count = 0;
+
+  std::exception_ptr unreported_exception;
+
+  // track each spawned coroutine for cancellation. these are stored in an
+  // array, and recycled after each use via the free list
+  struct child : boost::intrusive::list_base_hook<> {
+    std::optional<boost::asio::cancellation_signal> signal;
+    bool canceled = false;
+  };
+  std::unique_ptr<child[]> children;
+
+  using child_list = boost::intrusive::list<child,
+        boost::intrusive::constant_time_size<false>>;
+  child_list outstanding;
+  child_list free;
+
+  co_waiter<void, executor_type> waiter;
+
+  // return an awaitable that completes once count <= target_count
+  auto wait_for(size_t target_count)
+      -> boost::asio::awaitable<void, executor_type>
+  {
+    wait_for_count = target_count;
+    return waiter.get();
+  }
+
+  void on_complete(child& c, std::exception_ptr eptr)
+  {
+    --count;
+
+    if (c.canceled) {
+      // if the child was canceled, it was already removed from outstanding
+      ceph_assert(!c.is_linked());
+      c.canceled = false;
+      c.signal.reset();
+      free.push_back(c);
+    } else {
+      // move back to the free list
+      ceph_assert(c.is_linked());
+      auto next = outstanding.erase(outstanding.iterator_to(c));
+      c.signal.reset();
+      free.push_back(c);
+
+      if (eptr) {
+        if (eptr && !unreported_exception) {
+          unreported_exception = eptr;
+        }
+
+        // handle cancel_on_error. cancellation signals may recurse into
+        // on_complete(), so move the entries into a separate list first
+        child_list to_cancel;
+        if (on_error == cancel_on_error::after) {
+          to_cancel.splice(to_cancel.end(), outstanding,
+                           next, outstanding.end());
+        } else if (on_error == cancel_on_error::all) {
+          to_cancel = std::move(outstanding);
+        }
+
+        for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) {
+          child& c = *i;
+          i = to_cancel.erase(i);
+
+          c.canceled = true;
+          c.signal->emit(boost::asio::cancellation_type::terminal);
+        }
+      }
+    }
+
+    // maybe wake the waiter
+    if (waiter.waiting() && count <= wait_for_count) {
+      waiter.complete(nullptr);
+    }
+  }
+
+  struct child_completion {
+    boost::intrusive_ptr<co_throttle_impl> impl;
+    child& c;
+
+    void operator()(std::exception_ptr eptr) {
+      impl->on_complete(c, eptr);
+    }
+  };
+};
+
+} // namespace ceph::async::detail
index 11a7ea0e20c036a7bb41099d552bb872a9dafe1c..88fb5142995d6876fa13ed408d1f7ac69a71477d 100644 (file)
@@ -361,10 +361,17 @@ add_executable(unittest_async_completion test_async_completion.cc)
 add_ceph_unittest(unittest_async_completion)
 target_link_libraries(unittest_async_completion ceph-common Boost::system)
 
+
 add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc)
 add_ceph_unittest(unittest_async_max_concurrent_for_each)
 target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
 
+# the co_throttle unit test is only built when not targeting Windows
+if(NOT WIN32)
+add_executable(unittest_async_co_throttle test_async_co_throttle.cc)
+add_ceph_unittest(unittest_async_co_throttle)
+target_link_libraries(unittest_async_co_throttle ceph-common Boost::system)
+endif(NOT WIN32)
+
 add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
 add_ceph_unittest(unittest_async_shared_mutex)
 target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
diff --git a/src/test/common/test_async_co_throttle.cc b/src/test/common/test_async_co_throttle.cc
new file mode 100644 (file)
index 0000000..31988c7
--- /dev/null
@@ -0,0 +1,548 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/co_throttle.h"
+
+#include <latch>
+#include <optional>
+#include <boost/asio/any_io_executor.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/defer.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/thread_pool.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::any_io_executor;
+
+using void_waiter = co_waiter<void, executor_type>;
+
+// Build a completion handler that stores the exception_ptr it is called
+// with into 'eptr'. The optional stays empty until the handler has run.
+auto capture(std::optional<std::exception_ptr>& eptr)
+{
+  return [&out = eptr] (std::exception_ptr e) { out = e; };
+}
+
+// Same as above, but the handler is additionally bound to the cancellation
+// slot of 'signal'.
+auto capture(asio::cancellation_signal& signal,
+             std::optional<std::exception_ptr>& eptr)
+{
+  auto handler = capture(eptr);
+  return asio::bind_cancellation_slot(signal.slot(), std::move(handler));
+}
+
+// Child coroutine used by the tests: suspend on 'waiter', then set
+// 'completed'. The flag is only set if co_await returns without throwing.
+asio::awaitable<void> wait(void_waiter& waiter, bool& completed)
+{
+  co_await waiter.get();
+  completed = true;
+}
+
+// wait() on a throttle that never spawned anything completes without error
+// in a single poll().
+TEST(co_throttle, wait_empty)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result); // the parent coroutine completed
+  EXPECT_FALSE(*result); // and did so without an exception
+}
+
+// With a limit of 1, the second spawn() does not complete until the first
+// child has completed.
+TEST(co_throttle, spawn_over_limit)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
+
+  // completing the first child makes room for the second
+  waiter1.complete(nullptr);
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  EXPECT_TRUE(spawn2_completed);
+
+  waiter2.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+// The throttle's limit is 2, but the second spawn() passes a smaller limit
+// of 1 and so has to wait for the first child to complete.
+TEST(co_throttle, spawn_over_smaller_limit)
+{
+  constexpr size_t limit = 2;
+  constexpr size_t smaller_limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get(), smaller_limit);
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
+
+  // completing the first child brings the count under smaller_limit
+  waiter1.complete(nullptr);
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn2_completed);
+
+  waiter2.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(co_throttle, spawn_cancel)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+  bool spawn2_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+    spawn2_completed = true;
+    co_await throttle.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(signal, result));
+
+  ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(spawn2_completed);
+
+  // cancel before spawn2 completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+  EXPECT_FALSE(spawn2_completed);
+  ASSERT_TRUE(result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(co_throttle, wait_cancel)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter;
+  bool spawn_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    spawn_completed = true;
+    co_await throttle.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(signal, result));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn_completed);
+  EXPECT_FALSE(result);
+
+  // cancel before wait completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+  ASSERT_TRUE(result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+// Leave the parent coroutine suspended in its second spawn() when the test
+// returns, so the io_context is destroyed with the coroutine still pending.
+TEST(co_throttle, spawn_shutdown)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  bool spawn1_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter1.get());
+    spawn1_completed = true;
+    co_await throttle.spawn(waiter2.get());
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn2 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn1_completed);
+  EXPECT_FALSE(result);
+  // shut down before spawn2 completes
+}
+
+// Leave the parent coroutine suspended in wait() when the test returns, so
+// the io_context is destroyed with the coroutine still pending.
+TEST(co_throttle, wait_shutdown)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter;
+  bool spawn_completed = false;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    spawn_completed = true;
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_TRUE(spawn_completed);
+  EXPECT_FALSE(result);
+  // shut down before wait completes
+}
+
+// With the default cancel_on_error::none, a child's exception is rethrown
+// from the blocked spawn() but no other child is canceled, and the third
+// child still runs to completion.
+TEST(co_throttle, spawn_error)
+{
+  constexpr size_t limit = 2;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      // remember the exception rethrown by spawn() for the checks below
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn3 blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(cr1_completed);
+  EXPECT_FALSE(cr2_completed);
+  EXPECT_FALSE(cr3_completed);
+
+  // fail the second child
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+  EXPECT_FALSE(result);
+
+  waiter1.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // wait still blocked
+
+  waiter3.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_TRUE(cr1_completed);
+  EXPECT_FALSE(cr2_completed);
+  EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error
+}
+
+// A child's exception is rethrown by wait() and propagates out of the
+// parent coroutine.
+TEST(co_throttle, wait_error)
+{
+  constexpr size_t limit = 1;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+    co_await throttle.spawn(waiter.get());
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // fail the only child
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // the parent exited with an exception
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+// With cancel_on_error::after, the failure of the second child leaves the
+// first child running, and a later spawn() can still start a new child.
+TEST(co_throttle, spawn_cancel_on_error_after)
+{
+  constexpr size_t limit = 2;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  void_waiter waiter4;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  bool cr4_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto ex = co_await asio::this_coro::executor;
+    auto throttle = co_throttle{ex, limit, cancel_on_error::after};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      // remember the exception rethrown by spawn() for the checks below
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.spawn(wait(waiter4, cr4_completed));
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn3 blocks
+  ASSERT_FALSE(ctx.stopped());
+
+  // fail the second child
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(cr1_completed);
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+
+  waiter1.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // wait still blocked
+  EXPECT_FALSE(result);
+  EXPECT_TRUE(cr1_completed);
+  EXPECT_FALSE(cr4_completed);
+
+  waiter4.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_FALSE(cr2_completed); // exited by exception
+  EXPECT_FALSE(cr3_completed); // cr3 canceled
+  EXPECT_TRUE(cr4_completed); // cr4 not canceled
+}
+
+// With cancel_on_error::all, the failure of the second child also cancels
+// the first child, and a later spawn() can still start a new child.
+TEST(co_throttle, spawn_cancel_on_error_all)
+{
+  constexpr size_t limit = 2;
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  void_waiter waiter1;
+  void_waiter waiter2;
+  void_waiter waiter3;
+  void_waiter waiter4;
+  bool cr1_completed = false;
+  bool cr2_completed = false;
+  bool cr3_completed = false;
+  bool cr4_completed = false;
+  std::exception_ptr spawn3_eptr;
+
+  auto cr = [&] () -> asio::awaitable<void> {
+    auto ex = co_await asio::this_coro::executor;
+    auto throttle = co_throttle{ex, limit, cancel_on_error::all};
+    co_await throttle.spawn(wait(waiter1, cr1_completed));
+    co_await throttle.spawn(wait(waiter2, cr2_completed));
+    try {
+      co_await throttle.spawn(wait(waiter3, cr3_completed));
+    } catch (const std::exception&) {
+      // remember the exception rethrown by spawn() for the checks below
+      spawn3_eptr = std::current_exception();
+    }
+    co_await throttle.spawn(wait(waiter4, cr4_completed));
+    co_await throttle.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll(); // run until spawn3 blocks
+  ASSERT_FALSE(ctx.stopped());
+
+  // fail the second child
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run until wait blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_TRUE(spawn3_eptr);
+  EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+  EXPECT_FALSE(cr4_completed);
+
+  waiter4.complete(nullptr);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_FALSE(cr1_completed); // cr1 canceled
+  EXPECT_FALSE(cr2_completed); // exited by exception
+  EXPECT_FALSE(cr3_completed); // cr3 canceled
+  EXPECT_TRUE(cr4_completed); // cr4 not canceled
+}
+
+// Run the parent coroutine on a single-threaded pool and cancel it from the
+// test thread while it is suspended in wait(). The parent is expected to
+// exit with operation_aborted.
+TEST(co_throttle, cross_thread_cancel)
+{
+  constexpr size_t limit = 1;
+  // run the coroutine in a background thread
+  asio::thread_pool ctx{1};
+  executor_type ex = ctx.get_executor();
+
+  // counted down once the coroutine has reached throttle.wait()
+  std::latch waiting{1};
+
+  auto cr = [ex, &waiting] () -> asio::awaitable<void> {
+    auto throttle = co_throttle{ex, limit};
+    co_waiter<void, executor_type> waiter;
+    co_await throttle.spawn(waiter.get());
+    // decrement the latch after throttle.wait() suspends
+    asio::defer(ex, [&waiting] { waiting.count_down(); });
+    co_await throttle.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  // without bind_executor(), tsan identifies a data race on signal.emit()
+  asio::co_spawn(ex, cr(), bind_executor(ex, capture(signal, result)));
+
+  waiting.wait(); // wait until we've suspended in throttle.wait()
+
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.join();
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // rethrow_exception() needs a non-null exception_ptr
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    // any other exception type is reported as a test failure
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+} // namespace ceph::async