]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add spawn_group template for fork-join parallelism
authorCasey Bodley <cbodley@redhat.com>
Sun, 5 Feb 2023 15:10:34 +0000 (10:10 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/cancel_on_error.h
src/common/async/detail/spawn_group.h [new file with mode: 0644]
src/common/async/spawn_group.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_spawn_group.cc [new file with mode: 0644]

index a4c05c2a7bc31cf4534f97a45a5fde74c3f5ed53..e9f116c4894ed7a6b96970f8aa151fe644d0684b 100644 (file)
@@ -18,7 +18,7 @@
 
 namespace ceph::async {
 
-/// Error handling strategy for co_throttle.
+/// Error handling strategy for concurrent operations.
 enum class cancel_on_error : uint8_t {
   none, //< No spawned coroutines are canceled on failure.
   after, //< Cancel coroutines spawned after the failed coroutine.
diff --git a/src/common/async/detail/spawn_group.h b/src/common/async/detail/spawn_group.h
new file mode 100644 (file)
index 0000000..c121b7d
--- /dev/null
@@ -0,0 +1,176 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <exception>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "include/scope_guard.h"
+
+namespace ceph::async::detail {
+
+template <boost::asio::execution::executor Executor>
+class spawn_group_impl;
+
+// A cancellable co_spawn() completion handler that notifies the spawn_group
+// upon completion. This holds a reference to the implementation in order to
+// extend its lifetime. This is required for per-op cancellation because the
+// cancellation_signals must outlive these coroutine frames.
+template <typename Executor>
+class spawn_group_handler {
+  using impl_type = spawn_group_impl<Executor>;
+  using size_type = typename impl_type::size_type;
+  boost::intrusive_ptr<impl_type> impl;
+  boost::asio::cancellation_slot slot;
+  size_type index;
+ public:
+  spawn_group_handler(boost::intrusive_ptr<impl_type> impl,
+                      boost::asio::cancellation_slot slot, size_type index)
+      : impl(std::move(impl)), slot(std::move(slot)), index(index)
+  {}
+
+  using executor_type = typename impl_type::executor_type;
+  executor_type get_executor() const noexcept
+  {
+    return impl->get_executor();
+  }
+
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+  cancellation_slot_type get_cancellation_slot() const noexcept
+  {
+    return slot;
+  }
+
+  void operator()(std::exception_ptr eptr)
+  {
+    impl->child_complete(index, eptr);
+  }
+};
+
+// Reference-counted spawn group implementation.
+template <boost::asio::execution::executor Executor>
+class spawn_group_impl :
+    public boost::intrusive_ref_counter<spawn_group_impl<Executor>,
+        boost::thread_unsafe_counter>,
+    public service_list_base_hook
+{
+ public:
+  using size_type = uint16_t;
+
+  spawn_group_impl(Executor ex, size_type limit,
+                   cancel_on_error on_error)
+    : svc(boost::asio::use_service<service<spawn_group_impl>>(
+            boost::asio::query(ex, boost::asio::execution::context))),
+      ex(ex),
+      signals(std::make_unique<boost::asio::cancellation_signal[]>(limit)),
+      limit(limit), on_error(on_error)
+  {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+  ~spawn_group_impl()
+  {
+    svc.remove(*this);
+  }
+
+  using executor_type = Executor;
+  executor_type get_executor() const noexcept
+  {
+    return ex;
+  }
+
+  spawn_group_handler<executor_type> completion()
+  {
+    if (spawned >= limit) {
+      throw std::length_error("spawn group maximum size exceeded");
+    }
+    const size_type index = spawned++;
+    return {boost::intrusive_ptr{this}, signals[index].slot(), index};
+  }
+
+  void child_complete(size_type index, std::exception_ptr e)
+  {
+    if (e) {
+      if (!eptr) {
+        eptr = e;
+      }
+      if (on_error == cancel_on_error::all) {
+        cancel_from(0);
+      } else if (on_error == cancel_on_error::after) {
+        cancel_from(index + 1);
+      }
+    }
+    if (++completed == spawned) {
+      complete();
+    }
+  }
+
+  boost::asio::awaitable<void, executor_type> wait()
+  {
+    if (completed < spawned) {
+      co_await waiter.get();
+    }
+
+    // clear for reuse
+    completed = 0;
+    spawned = 0;
+
+    if (eptr) {
+      std::rethrow_exception(std::exchange(eptr, nullptr));
+    }
+  }
+
+  void cancel()
+  {
+    cancel_from(0);
+  }
+
+  void service_shutdown()
+  {
+    waiter.shutdown();
+  }
+
+ private:
+  service<spawn_group_impl>& svc;
+  co_waiter<void, executor_type> waiter;
+  executor_type ex;
+  std::unique_ptr<boost::asio::cancellation_signal[]> signals;
+  std::exception_ptr eptr;
+  const size_type limit;
+  size_type spawned = 0;
+  size_type completed = 0;
+  const cancel_on_error on_error;
+
+  void cancel_from(size_type begin)
+  {
+    for (size_type i = begin; i < spawned; i++) {
+      signals[i].emit(boost::asio::cancellation_type::terminal);
+    }
+  }
+
+  void complete()
+  {
+    if (waiter.waiting()) {
+      waiter.complete(nullptr);
+    }
+  }
+};
+
+} // namespace ceph::async::detail
diff --git a/src/common/async/spawn_group.h b/src/common/async/spawn_group.h
new file mode 100644 (file)
index 0000000..dbe2953
--- /dev/null
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include "cancel_on_error.h"
+#include "detail/spawn_group.h"
+
+namespace ceph::async {
+
+/// \brief Tracks a group of coroutines to await all of their completions.
+///
+/// This class functions as a CompletionToken for calls to co_spawn(), attaching
+/// a handler that notifies the group upon the child coroutine's completion.
+///
+/// The wait() function can be used to await the completion of all children.
+/// If any child coroutines exit with an exception, the first such exception
+/// is rethrown by wait(). The cancel_on_error option controls whether these
+/// exceptions trigger the cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or spawn_group destruction.
+/// This allows the parent coroutine to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe, so a strand executor should be used in
+/// multi-threaded contexts.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   // process all tasks in parallel
+///   auto ex = co_await boost::asio::this_coro::executor;
+///   auto group = spawn_group{ex, tasks.size()};
+///
+///   for (auto& t : tasks) {
+///     boost::asio::co_spawn(ex, child(t), group);
+///   }
+///   co_await group.wait();
+/// }
+/// \endcode
+template <boost::asio::execution::executor Executor>
+class spawn_group {
+  using impl_type = detail::spawn_group_impl<Executor>;
+  boost::intrusive_ptr<impl_type> impl;
+
+ public:
+  spawn_group(Executor ex, size_t limit,
+              cancel_on_error on_error = cancel_on_error::none)
+    : impl(new impl_type(ex, limit, on_error))
+  {
+  }
+
+  ~spawn_group()
+  {
+    impl->cancel();
+  }
+
+  using executor_type = Executor;
+  executor_type get_executor() const
+  {
+    return impl->get_executor();
+  }
+
+  /// Return a cancellable co_spawn() completion handler with signature
+  /// void(std::exception_ptr). Throws a std::length_error exception if the
+  /// number of outstanding completion handlers would exceed the group's limit.
+  ///
+  /// As a convenience, you can avoid calling this function by using the
+  /// spawn_group itself as a CompletionToken for co_spawn().
+  auto completion()
+  {
+    return impl->completion();
+  }
+
+  /// Wait for all outstanding completion handlers before returning. If any
+  /// of the spawned coroutines exit with an exception, the first exception
+  /// is rethrown.
+  ///
+  /// After wait() completes, whether by exception or co_return, the spawn
+  /// group can be reused to spawn and await additional coroutines.
+  boost::asio::awaitable<void, executor_type> wait()
+  {
+    return impl->wait();
+  }
+
+  /// Cancel all outstanding coroutines.
+  void cancel()
+  {
+    impl->cancel();
+  }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// Allow spawn_group to be used as a CompletionToken.
+template <typename Executor, typename Signature>
+struct async_result<ceph::async::spawn_group<Executor>, Signature>
+{
+  using completion_handler_type =
+      ceph::async::detail::spawn_group_handler<Executor>;
+  async_result(completion_handler_type&) {}
+
+  using return_type = void;
+  return_type get() {}
+
+  template <typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                              ceph::async::spawn_group<Executor>& group,
+                              Args&& ...args)
+  {
+    return std::move(init)(group.completion(), std::forward<Args>(args)...);
+  }
+};
+
+} // namespace boost::asio
index b993ee661d0d7175a788cedfb769916965ca5404..9b982a03ed138cb5925ef56affd88c4aa41a789c 100644 (file)
@@ -369,6 +369,10 @@ add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
 add_ceph_unittest(unittest_async_shared_mutex)
 target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
 
+add_executable(unittest_async_spawn_group test_async_spawn_group.cc)
+add_ceph_unittest(unittest_async_spawn_group)
+target_link_libraries(unittest_async_spawn_group ceph-common Boost::system)
+
 add_executable(unittest_cdc test_cdc.cc
   $<TARGET_OBJECTS:unit-main>)
 target_link_libraries(unittest_cdc global ceph-common)
diff --git a/src/test/common/test_async_spawn_group.cc b/src/test/common/test_async_spawn_group.cc
new file mode 100644 (file)
index 0000000..2810c68
--- /dev/null
@@ -0,0 +1,471 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/spawn_group.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = asio::awaitable<T, executor_type>;
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+TEST(spawn_group, spawn_limit)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  asio::co_spawn(ex, cr(), group);
+  EXPECT_THROW(asio::co_spawn(ex, cr(), group), std::length_error);
+}
+
+TEST(spawn_group, wait_empty)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_shutdown)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 10};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  // shut down before wait()
+}
+
+TEST(spawn_group, spawn_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 10};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_shutdown)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  co_waiter<void, executor_type> waiter;
+  auto cr = [ex, &waiter] () -> awaitable<void> {
+    auto group = spawn_group{ex, 1};
+    asio::co_spawn(ex, waiter.get(), group);
+    co_await group.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before wait() completes
+}
+
+TEST(spawn_group, spawn_wait_cancel)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+
+  co_waiter<void, executor_type> waiter;
+  auto cr = [ex, &waiter] () -> awaitable<void> {
+    auto group = spawn_group{ex, 1};
+    asio::co_spawn(ex, waiter.get(), group);
+    co_await group.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before wait() completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(spawn_group, spawn_wait_exception_order)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 2};
+
+  co_waiter<void, executor_type> waiter1;
+  asio::co_spawn(ex, waiter1.get(), group);
+
+  co_waiter<void, executor_type> waiter2;
+  asio::co_spawn(ex, waiter2.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter1.complete(std::make_exception_ptr(std::logic_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, spawn_complete_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 2};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+  ctx.restart();
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, spawn_wait_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_spawn_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_FALSE(*result);
+
+  asio::co_spawn(ex, waiter.get(), group);
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_cancel_wait_spawn_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  group.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+  ctx.restart();
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+
+  asio::co_spawn(ex, waiter.get(), group);
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_cancel_spawn_wait)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  asio::co_spawn(ex, waiter.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before waiter completes
+  group.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+
+  asio::co_spawn(ex, waiter.get(), group);
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, cancel_on_error_after)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 3, cancel_on_error::after};
+
+  co_waiter<void, executor_type> waiter1;
+  asio::co_spawn(ex, waiter1.get(), group);
+
+  co_waiter<void, executor_type> waiter2;
+  asio::co_spawn(ex, waiter2.get(), group);
+
+  co_waiter<void, executor_type> waiter3;
+  asio::co_spawn(ex, waiter3.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter1.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, cancel_on_error_all)
+{
+  asio::io_context ctx;
+  auto ex = ctx.get_executor();
+  auto group = spawn_group{ex, 3, cancel_on_error::all};
+
+  co_waiter<void, executor_type> waiter1;
+  asio::co_spawn(ex, waiter1.get(), group);
+
+  co_waiter<void, executor_type> waiter2;
+  asio::co_spawn(ex, waiter2.get(), group);
+
+  co_waiter<void, executor_type> waiter3;
+  asio::co_spawn(ex, waiter3.get(), group);
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+} // namespace ceph::async