]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add co_throttle for bounded concurrency with c++20 coroutines
authorCasey Bodley <cbodley@redhat.com>
Wed, 11 Jan 2023 15:45:49 +0000 (10:45 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/cancel_on_error.h [new file with mode: 0644]
src/common/async/co_throttle.h [new file with mode: 0644]
src/common/async/detail/co_throttle_impl.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_co_throttle.cc [new file with mode: 0644]

diff --git a/src/common/async/cancel_on_error.h b/src/common/async/cancel_on_error.h
new file mode 100644 (file)
index 0000000..a4c05c2
--- /dev/null
@@ -0,0 +1,28 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace ceph::async {
+
+/// Error handling strategy for co_throttle.
+enum class cancel_on_error : uint8_t {
+  none, //< No spawned coroutines are canceled on failure.
+  after, //< Cancel coroutines spawned after the failed coroutine.
+  all, //< Cancel all spawned coroutines on failure.
+};
+
+} // namespace ceph::async
diff --git a/src/common/async/co_throttle.h b/src/common/async/co_throttle.h
new file mode 100644 (file)
index 0000000..bf1e968
--- /dev/null
@@ -0,0 +1,127 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <boost/intrusive_ptr.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/detail/co_throttle_impl.h"
+
+namespace ceph::async {
+
+/// A coroutine throttle that allows a parent coroutine to spawn and manage
+/// multiple child coroutines, while enforcing an upper bound on concurrency.
+///
+/// Child coroutines can be of type awaitable<void> or awaitable<error_code>.
+/// Error codes returned by children are reported to the parent on its next call
+/// to spawn() or wait(). The cancel_on_error option controls whether these
+/// errors trigger the cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or co_throttle destruction.
+/// This allows the parent coroutine to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe, so a strand executor should be used in
+/// multi-threaded contexts.
+///
+/// Example:
+/// @code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   // process all tasks, up to 10 at a time
+///   auto ex = co_await boost::asio::this_coro::executor;
+///   auto throttle = co_throttle{ex, 10};
+///
+///   for (auto& t : tasks) {
+///     co_await throttle.spawn(child(t));
+///   }
+///   co_await throttle.wait();
+/// }
+/// @endcode
+template <boost::asio::execution::executor Executor>
+class co_throttle {
+ public:
+  using executor_type = Executor;
+  executor_type get_executor() const { return impl->get_executor(); }
+
+  using size_type = uint16_t;
+  static constexpr size_type max_limit = std::numeric_limits<size_type>::max();
+
+  co_throttle(const executor_type& ex, size_type limit,
+              cancel_on_error on_error = cancel_on_error::none)
+    : impl(new impl_type(ex, limit, on_error))
+  {
+  }
+
+  ~co_throttle()
+  {
+    cancel();
+  }
+
+  co_throttle(const co_throttle&) = delete;
+  co_throttle& operator=(const co_throttle&) = delete;
+
+  template <typename T>
+  using awaitable = boost::asio::awaitable<T, executor_type>;
+
+  /// Try to spawn the given coroutine. If this would exceed the concurrency
+  /// limit, wait for another coroutine to complete first. This default
+  /// limit can be overridden with the optional `smaller_limit` argument.
+  ///
+  /// If any spawned coroutines of type awaitable<error_code> return a non-zero
+  /// error, the first such error is reported by the next call to spawn() or
+  /// wait(). When spawn() reports these errors, the given coroutine given will
+  /// only be spawned in the case of cancel_on_error::none. New coroutines can
+  /// be spawned by later calls to spawn() regardless of cancel_on_error.
+  ///
+  /// If a spawned coroutine exits by an uncaught exception, that exception is
+  /// rethrown by the next call to spawn() or wait().
+  auto spawn(awaitable<boost::system::error_code> cr,
+             size_type smaller_limit = max_limit)
+      -> awaitable<boost::system::error_code>
+  {
+    return impl->spawn(std::move(cr), smaller_limit);
+  }
+
+  /// \overload
+  auto spawn(awaitable<void> cr, size_type smaller_limit = max_limit)
+      -> awaitable<boost::system::error_code>
+  {
+    return impl->spawn(std::move(cr), smaller_limit);
+  }
+
+  /// Wait for all associated coroutines to complete. If any of these coroutines
+  /// return a non-zero error_code, the first of those errors is returned.
+  awaitable<boost::system::error_code> wait()
+  {
+    return impl->wait();
+  }
+
+  /// Cancel all associated coroutines. Callers waiting on spawn() or wait()
+  /// will fail with boost::asio::error::operation_aborted.
+  void cancel()
+  {
+    impl->cancel();
+  }
+
+ private:
+  using impl_type = detail::co_throttle_impl<Executor, size_type>;
+  boost::intrusive_ptr<impl_type> impl;
+};
+
+} // namespace ceph::async
diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h
new file mode 100644 (file)
index 0000000..c031745
--- /dev/null
@@ -0,0 +1,251 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/asio/append.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/detail/service.h"
+#include "include/ceph_assert.h"
+
+namespace ceph::async::detail {
+
+// Coroutine throttle implementation. This is reference-counted so the
+// co_spawn() completion handlers can extend the implementation's lifetime.
+// This is required for per-op cancellation because the cancellation_signals
+// must outlive their coroutine frames.
+template <boost::asio::execution::executor Executor, typename SizeType>
+class co_throttle_impl :
+    public boost::intrusive_ref_counter<co_throttle_impl<Executor, SizeType>,
+        boost::thread_unsafe_counter>,
+    public service_list_base_hook
+{
+ public:
+  using size_type = SizeType;
+
+  using executor_type = Executor;
+  executor_type get_executor() const { return ex; }
+
+  co_throttle_impl(const executor_type& ex, size_type limit,
+                   cancel_on_error on_error)
+    : svc(boost::asio::use_service<service<co_throttle_impl>>(
+            boost::asio::query(ex, boost::asio::execution::context))),
+      ex(ex), limit(limit), on_error(on_error),
+      children(new child[limit])
+  {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+
+    // initialize the free list
+    for (size_type i = 0; i < limit; i++) {
+      free.push_back(children[i]);
+    }
+  }
+  ~co_throttle_impl()
+  {
+    svc.remove(*this);
+  }
+
+  template <typename T>
+  using awaitable = boost::asio::awaitable<T, executor_type>;
+
+  template <typename T> // where T=void or error_code
+  auto spawn(awaitable<T> cr, size_type smaller_limit)
+      -> awaitable<boost::system::error_code>
+  {
+    if (unreported_exception) {
+      std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+    }
+    if (unreported_error && on_error != cancel_on_error::none) {
+      co_return std::exchange(unreported_error, {});
+    }
+
+    const size_type current_limit = std::min(smaller_limit, limit);
+    if (count >= current_limit) {
+      auto ec = co_await wait_for(current_limit - 1);
+      if (ec) {
+        unreported_error.clear();
+        co_return ec;
+      }
+      if (unreported_error && on_error != cancel_on_error::none) {
+        co_return std::exchange(unreported_error, {});
+      }
+    }
+
+    ++count;
+
+    // move a free child to the outstanding list
+    ceph_assert(!free.empty());
+    child& c = free.front();
+    free.pop_front();
+    outstanding.push_back(c);
+
+    // spawn the coroutine with its associated cancellation signal
+    c.signal.emplace();
+    c.canceled = false;
+
+    boost::asio::co_spawn(get_executor(), std::move(cr),
+        boost::asio::bind_cancellation_slot(c.signal->slot(),
+            child_completion{this, c}));
+
+    co_return std::exchange(unreported_error, {});
+  }
+
+  awaitable<boost::system::error_code> wait()
+  {
+    if (count > 0) {
+      auto ec = co_await wait_for(0);
+      if (ec) {
+        unreported_error.clear();
+        co_return ec;
+      }
+    }
+    co_return std::exchange(unreported_error, {});
+  }
+
+  void cancel()
+  {
+    for (child& c : outstanding) {
+      c.canceled = true;
+      c.signal->emit(boost::asio::cancellation_type::terminal);
+    }
+    if (wait_handler) {
+      wait_complete(make_error_code(boost::asio::error::operation_aborted));
+    }
+  }
+
+  void service_shutdown()
+  {
+    wait_handler.reset();
+  }
+
+ private:
+  service<co_throttle_impl>& svc;
+  executor_type ex;
+  const size_type limit;
+  const cancel_on_error on_error;
+
+  size_type count = 0;
+  size_type wait_for_count = 0;
+
+  boost::system::error_code unreported_error;
+  std::exception_ptr unreported_exception;
+
+  // track each spawned coroutine for cancellation. these are stored in an
+  // array, and recycled after each use via the free list
+  struct child : boost::intrusive::list_base_hook<> {
+    std::optional<boost::asio::cancellation_signal> signal;
+    bool canceled = false;
+  };
+  std::unique_ptr<child[]> children;
+
+  using child_list = boost::intrusive::list<child,
+        boost::intrusive::constant_time_size<false>>;
+  child_list outstanding;
+  child_list free;
+
+  using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
+
+  using wait_signature = void(std::exception_ptr, boost::system::error_code);
+  using wait_handler_type = typename boost::asio::async_result<
+      use_awaitable_t, wait_signature>::handler_type;
+  std::optional<wait_handler_type> wait_handler;
+
+  // return an awaitable that completes once count <= target_count
+  auto wait_for(size_type target_count)
+      -> awaitable<boost::system::error_code>
+  {
+    ceph_assert(!wait_handler); // one waiter at a time
+    wait_for_count = target_count;
+
+    use_awaitable_t token;
+    return boost::asio::async_initiate<use_awaitable_t, wait_signature>(
+        [this] (wait_handler_type h) {
+          wait_handler.emplace(std::move(h));
+        }, token);
+  }
+
+  void on_complete(child& c, std::exception_ptr eptr,
+                   boost::system::error_code ec)
+  {
+    --count;
+
+    if (c.canceled) {
+      // don't report cancellation errors. cancellation was either requested
+      // by the user, or triggered by another failure that is reported
+      eptr = nullptr;
+      ec = {};
+    }
+
+    if (eptr && !unreported_exception) {
+      unreported_exception = eptr;
+    }
+    if (ec && !unreported_error) {
+      unreported_error = ec;
+    }
+
+    // move back to the free list
+    auto next = outstanding.erase(outstanding.iterator_to(c));
+    c.signal.reset();
+    free.push_back(c);
+
+    // handle cancel_on_error
+    if (eptr || ec) {
+      auto cancel_begin = outstanding.end();
+      if (on_error == cancel_on_error::after) {
+        cancel_begin = next;
+      } else if (on_error == cancel_on_error::all) {
+        cancel_begin = outstanding.begin();
+      }
+      for (auto i = cancel_begin; i != outstanding.end(); ++i) {
+        i->canceled = true;
+        i->signal->emit(boost::asio::cancellation_type::terminal);
+      }
+    }
+
+    // maybe wake the waiter
+    if (wait_handler && count <= wait_for_count) {
+      wait_complete({});
+    }
+  }
+
+  void wait_complete(boost::system::error_code ec)
+  {
+    // bind arguments to the handler for dispatch
+    auto eptr = std::exchange(unreported_exception, nullptr);
+    auto c = boost::asio::append(std::move(*wait_handler), eptr, ec);
+    wait_handler.reset();
+
+    boost::asio::dispatch(std::move(c));
+  }
+
+  struct child_completion {
+    boost::intrusive_ptr<co_throttle_impl> impl;
+    child& c;
+
+    void operator()(std::exception_ptr eptr,
+                    boost::system::error_code ec = {}) {
+      impl->on_complete(c, eptr, ec);
+    }
+  };
+};
+
+} // namespace ceph::async::detail
index c044daf662ab7f25341ddd1e060905d491f25ef4..b993ee661d0d7175a788cedfb769916965ca5404 100644 (file)
@@ -361,6 +361,10 @@ add_executable(unittest_async_completion test_async_completion.cc)
 add_ceph_unittest(unittest_async_completion)
 target_link_libraries(unittest_async_completion ceph-common Boost::system)
 
+add_executable(unittest_async_co_throttle test_async_co_throttle.cc)
+add_ceph_unittest(unittest_async_co_throttle)
+target_link_libraries(unittest_async_co_throttle ceph-common Boost::system)
+
 add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
 add_ceph_unittest(unittest_async_shared_mutex)
 target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
diff --git a/src/test/common/test_async_co_throttle.cc b/src/test/common/test_async_co_throttle.cc
new file mode 100644 (file)
index 0000000..7dec81c
--- /dev/null
@@ -0,0 +1,655 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/co_throttle.h"
+#include <chrono>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+
+namespace ceph::async {
+
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = boost::asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = boost::asio::awaitable<T, executor_type>;
+using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
+static constexpr use_awaitable_t use_awaitable{};
+
+using clock_type = std::chrono::steady_clock;
+using timer_type = boost::asio::basic_waitable_timer<clock_type,
+     boost::asio::wait_traits<clock_type>, executor_type>;
+
+void rethrow(std::exception_ptr eptr)
+{
+  if (eptr) std::rethrow_exception(eptr);
+}
+
+using namespace std::chrono_literals;
+
+auto worker(std::chrono::milliseconds delay = 20ms)
+    -> awaitable<void>
+{
+  auto timer = timer_type{co_await boost::asio::this_coro::executor, delay};
+  co_await timer.async_wait(use_awaitable);
+}
+
+auto worker(error_code ec, std::chrono::milliseconds delay = 10ms)
+    -> awaitable<error_code>
+{
+  co_await worker(delay);
+  co_return ec;
+}
+
+auto worker(std::exception_ptr eptr, std::chrono::milliseconds delay = 10ms)
+    -> awaitable<void>
+{
+  co_await worker(delay);
+  std::rethrow_exception(eptr);
+}
+
+auto worker(bool& finished, std::chrono::milliseconds delay = 20ms)
+    -> awaitable<void>
+{
+  co_await worker(delay);
+  finished = true;
+}
+
+auto counting_worker(size_t& count, size_t& max_count)
+    -> awaitable<void>
+{
+  ++count;
+  if (max_count < count) {
+    max_count = count;
+  }
+  co_await worker();
+  --count;
+}
+
+// use a worker that never completes to test cancellation
+awaitable<void> lazy_worker()
+{
+  for (;;) {
+    co_await worker();
+  }
+}
+
+TEST(co_throttle, wait_empty)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_wait); // wait returns immediately if nothing was spawned
+  EXPECT_FALSE(*ec_wait);
+}
+
+TEST(co_throttle, spawn_over_limit)
+{
+  constexpr size_t limit = 1;
+
+  size_t count = 0;
+  size_t max_count = 0;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
+        ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn2 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  EXPECT_FALSE(ec_spawn2);
+
+  ctx.run_one(); // wait for spawn1's completion
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+  EXPECT_FALSE(ec_wait);
+
+  ctx.run(); // run to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait);
+
+  EXPECT_EQ(max_count, limit); // count never exceeds limit
+  EXPECT_EQ(count, 0);
+}
+
+TEST(co_throttle, spawn_over_smaller_limit)
+{
+  constexpr size_t limit = 2;
+  constexpr size_t smaller_limit = 1;
+
+  size_t count = 0;
+  size_t max_count = 0;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
+        ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count),
+                                            smaller_limit);
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn2 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  EXPECT_FALSE(ec_spawn2);
+
+  ctx.run_one(); // wait for spawn1's completion
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+  EXPECT_FALSE(ec_wait);
+
+  ctx.run(); // run to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait);
+
+  EXPECT_EQ(max_count, smaller_limit); // count never exceeds smaller_limit
+  EXPECT_EQ(count, 0);
+}
+
+TEST(co_throttle, spawn_cancel)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(lazy_worker());
+        ec_spawn2 = co_await throttle.spawn(lazy_worker());
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn2 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  EXPECT_FALSE(ec_spawn2);
+  EXPECT_FALSE(ec_wait);
+
+  throttle.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait); // wait after cancel succeeds immediately
+}
+
+TEST(co_throttle, wait_cancel)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn = co_await throttle.spawn(lazy_worker());
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn);
+  EXPECT_FALSE(*ec_spawn);
+  EXPECT_FALSE(ec_wait);
+
+  throttle.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+}
+
+TEST(co_throttle, spawn_shutdown)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        auto throttle = co_throttle{ex, limit};
+        ec_spawn1 = co_await throttle.spawn(lazy_worker());
+        ec_spawn2 = co_await throttle.spawn(lazy_worker());
+      }, rethrow);
+
+  ctx.run_one(); // call spawn1 and spawn2
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  EXPECT_FALSE(ec_spawn2);
+
+  // shut down io_context before spawn2 unblocks
+}
+
+TEST(co_throttle, wait_shutdown)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  std::optional<error_code> ec_spawn;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        auto throttle = co_throttle{ex, limit};
+        ec_spawn = co_await throttle.spawn(lazy_worker());
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.run_one(); // call spawn and wait
+
+  ASSERT_TRUE(ec_spawn);
+  EXPECT_FALSE(*ec_spawn);
+  EXPECT_FALSE(ec_wait);
+
+  // shut down io_context before wait unblocks
+}
+
+TEST(co_throttle, spawn_destroy)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+
+  {
+    auto throttle = co_throttle{ex, limit};
+
+    boost::asio::co_spawn(ex,
+        [&] () -> awaitable<void> {
+          ec_spawn1 = co_await throttle.spawn(lazy_worker());
+          ec_spawn2 = co_await throttle.spawn(lazy_worker());
+        }, rethrow);
+
+    ctx.poll(); // run until spawn2 blocks
+
+    ASSERT_TRUE(ec_spawn1);
+    EXPECT_FALSE(*ec_spawn1);
+    EXPECT_FALSE(ec_spawn2);
+    // throttle canceled/destroyed
+  }
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
+}
+
+TEST(co_throttle, wait_destroy)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  std::optional<error_code> ec_spawn;
+  std::optional<error_code> ec_wait;
+
+  {
+    auto throttle = co_throttle{ex, limit};
+
+    boost::asio::co_spawn(ex,
+        [&] () -> awaitable<void> {
+          ec_spawn = co_await throttle.spawn(lazy_worker());
+          ec_wait = co_await throttle.wait();
+        }, rethrow);
+
+    ctx.poll(); // run until wait blocks
+
+    ASSERT_TRUE(ec_spawn);
+    EXPECT_FALSE(*ec_spawn);
+    EXPECT_FALSE(ec_wait);
+    // throttle canceled/destroyed
+  }
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+}
+
+TEST(co_throttle, spawn_error)
+{
+  constexpr size_t limit = 2;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_spawn3;
+  std::optional<error_code> ec_wait;
+  bool spawn1_finished = false;
+  bool spawn3_finished = false;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
+        auto ec = make_error_code(errc::invalid_argument);
+        ec_spawn2 = co_await throttle.spawn(worker(ec));
+        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn3 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+  EXPECT_FALSE(ec_spawn3);
+  EXPECT_FALSE(spawn1_finished);
+  EXPECT_FALSE(spawn3_finished);
+
+  ctx.run_one(); // wait for spawn2's completion
+  ctx.poll(); // run until wait() blocks
+
+  ASSERT_TRUE(ec_spawn3);
+  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
+  EXPECT_FALSE(ec_wait);
+
+  ctx.run(); // run to completion
+
+  EXPECT_TRUE(spawn3_finished); // spawn3 isn't canceled by spawn2's error
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait);
+}
+
+TEST(co_throttle, wait_error)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        auto ec = make_error_code(errc::invalid_argument);
+        ec_spawn = co_await throttle.spawn(worker(ec));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn);
+  EXPECT_FALSE(*ec_spawn);
+  EXPECT_FALSE(ec_wait);
+
+  ctx.run(); // run to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_EQ(*ec_wait, errc::invalid_argument);
+}
+
+TEST(co_throttle, spawn_cancel_on_error_after)
+{
+  constexpr size_t limit = 2;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit, cancel_on_error::after};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_spawn3;
+  std::optional<error_code> ec_spawn4;
+  std::optional<error_code> ec_wait;
+  bool spawn1_finished = false;
+  bool spawn3_finished = false;
+  bool spawn4_finished = false;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
+        auto ec = make_error_code(errc::invalid_argument);
+        ec_spawn2 = co_await throttle.spawn(worker(ec));
+        // spawn3 expects invalid_argument error and cancellation
+        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
+        // spawn4 expects success
+        ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn3 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+  EXPECT_FALSE(spawn1_finished);
+
+  ctx.run_one(); // wait for spawn2's completion
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  ASSERT_TRUE(ec_spawn3);
+  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
+  ASSERT_TRUE(ec_spawn4);
+  EXPECT_FALSE(*ec_spawn4);
+  EXPECT_FALSE(spawn1_finished);
+
+  ctx.run_one(); // wait for spawn1's completion
+  ctx.poll(); // run until wait blocks
+
+  EXPECT_FALSE(ec_wait);
+  EXPECT_TRUE(spawn1_finished); // spawn1 not canceled
+
+  ctx.run_one(); // wait for spawn4's completion
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait);
+  EXPECT_FALSE(spawn3_finished); // spawn3 canceled
+  EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
+}
+
+TEST(co_throttle, spawn_cancel_on_error_all)
+{
+  constexpr size_t limit = 2;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit, cancel_on_error::all};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_spawn3;
+  std::optional<error_code> ec_spawn4;
+  std::optional<error_code> ec_wait;
+  bool spawn1_finished = false;
+  bool spawn3_finished = false;
+  bool spawn4_finished = false;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
+        auto ec = make_error_code(errc::invalid_argument);
+        ec_spawn2 = co_await throttle.spawn(worker(ec));
+        // spawn3 expects invalid_argument error and cancellation
+        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
+        // spawn3 expects success
+        ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until spawn3 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+  EXPECT_FALSE(ec_spawn3);
+  EXPECT_FALSE(ec_spawn4);
+
+  ctx.run_one(); // wait for spawn2's completion
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn3);
+  EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
+  ASSERT_TRUE(ec_spawn4);
+  EXPECT_FALSE(*ec_spawn4);
+  EXPECT_FALSE(ec_wait);
+  EXPECT_FALSE(spawn1_finished); // spawn1 canceled
+
+  ctx.run_one(); // wait for spawn4's completion
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+
+  ASSERT_TRUE(ec_wait);
+  EXPECT_FALSE(*ec_wait);
+  EXPECT_FALSE(spawn3_finished); // spawn3 canceled
+  EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
+}
+
+TEST(co_throttle, spawn_exception)
+{
+  constexpr size_t limit = 2;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn1;
+  std::optional<error_code> ec_spawn2;
+  std::optional<error_code> ec_spawn3;
+  bool spawn1_finished = false;
+  bool spawn3_finished = false;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
+        auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
+        ec_spawn2 = co_await throttle.spawn(worker(eptr));
+        ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
+      }, rethrow);
+
+  ctx.poll(); // run until spawn3 blocks
+
+  ASSERT_TRUE(ec_spawn1);
+  EXPECT_FALSE(*ec_spawn1);
+  ASSERT_TRUE(ec_spawn2);
+  EXPECT_FALSE(*ec_spawn2);
+
+  EXPECT_THROW(ctx.run_one(), std::runtime_error);
+
+  ASSERT_FALSE(ec_spawn3);
+  EXPECT_FALSE(spawn1_finished);
+  EXPECT_FALSE(spawn3_finished);
+}
+
+TEST(co_throttle, wait_exception)
+{
+  constexpr size_t limit = 1;
+
+  boost::asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto throttle = co_throttle{ex, limit};
+
+  std::optional<error_code> ec_spawn;
+  std::optional<error_code> ec_wait;
+
+  boost::asio::co_spawn(ex,
+      [&] () -> awaitable<void> {
+        auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
+        ec_spawn = co_await throttle.spawn(worker(eptr));
+        ec_wait = co_await throttle.wait();
+      }, rethrow);
+
+  ctx.poll(); // run until wait blocks
+
+  ASSERT_TRUE(ec_spawn);
+  EXPECT_FALSE(*ec_spawn);
+
+  EXPECT_THROW(ctx.run(), std::runtime_error);
+
+  ASSERT_FALSE(ec_wait);
+}
+
+} // namespace ceph::async