git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
common/async: add co_spawn_group template for fork-join parallelism
author Casey Bodley <cbodley@redhat.com>
Sun, 5 Feb 2023 15:10:34 +0000 (10:10 -0500)
committer Casey Bodley <cbodley@redhat.com>
Wed, 24 Jul 2024 16:51:41 +0000 (12:51 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/co_spawn_group.h [new file with mode: 0644]
src/common/async/detail/co_spawn_group.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_co_spawn_group.cc [new file with mode: 0644]

diff --git a/src/common/async/co_spawn_group.h b/src/common/async/co_spawn_group.h
new file mode 100644 (file)
index 0000000..e30d20c
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <limits>
+#include <stdexcept>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include "cancel_on_error.h"
+#include "detail/co_spawn_group.h"
+
+namespace ceph::async {
+
+/// \brief Tracks a group of coroutines to await all of their completions.
+///
+/// The wait() function can be used to await the completion of all children.
+/// If any child coroutines exit with an exception, the first such exception
+/// is rethrown by wait(). The cancel_on_error option controls whether these
+/// exceptions trigger the cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or co_spawn_group destruction.
+/// This allows the parent coroutine to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe, so a strand executor should be used in
+/// multi-threaded contexts.
+///
+/// \note Copies of a co_spawn_group share one implementation, and the
+/// destructor of any copy cancels every outstanding child.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   // process all tasks in parallel
+///   auto ex = co_await boost::asio::this_coro::executor;
+///   auto group = co_spawn_group{ex, tasks.size()};
+///
+///   for (auto& t : tasks) {
+///     group.spawn(child(t));
+///   }
+///   co_await group.wait();
+/// }
+/// \endcode
+template <boost::asio::execution::executor Executor>
+class co_spawn_group {
+  using impl_type = detail::co_spawn_group_impl<Executor>;
+  boost::intrusive_ptr<impl_type> impl;
+
+  // Converts \p limit to the implementation's narrower size_type. Throws
+  // std::length_error instead of silently truncating a value that does not
+  // fit (e.g. 65536 would otherwise wrap to a limit of 0).
+  static typename impl_type::size_type checked_limit(size_t limit)
+  {
+    using size_type = typename impl_type::size_type;
+    constexpr size_t max_limit = std::numeric_limits<size_type>::max();
+    if (limit > max_limit) {
+      throw std::length_error("spawn group limit exceeds maximum size");
+    }
+    return static_cast<size_type>(limit);
+  }
+
+ public:
+  /// Construct a group whose children run on executor \p ex.
+  ///
+  /// \param ex executor used to spawn the child coroutines
+  /// \param limit maximum number of coroutines that may be spawned before
+  ///        wait() resets the group
+  /// \param on_error which other children to cancel when a child exits with
+  ///        an exception
+  /// \throws std::length_error if \p limit cannot be represented by the
+  ///         implementation's size_type
+  co_spawn_group(Executor ex, size_t limit,
+                 cancel_on_error on_error = cancel_on_error::none)
+    : impl(new impl_type(ex, checked_limit(limit), on_error))
+  {
+  }
+
+  /// Cancels any children that are still outstanding. The implementation
+  /// itself is reference-counted and is released with its last reference.
+  ~co_spawn_group()
+  {
+    impl->cancel();
+  }
+
+  using executor_type = Executor;
+  /// Return the executor that child coroutines are spawned on.
+  executor_type get_executor() const
+  {
+    return impl->get_executor();
+  }
+
+  /// Spawn the given coroutine \p cr on the group's executor. Throws a
+  /// std::length_error exception if the number of outstanding coroutines
+  /// would exceed the group's limit.
+  void spawn(boost::asio::awaitable<void, executor_type> cr)
+  {
+    impl->spawn(std::move(cr));
+  }
+
+  /// Wait for all outstanding coroutines before returning. If any of the
+  /// spawned coroutines exit with an exception, the first exception is
+  /// rethrown.
+  ///
+  /// After wait() completes, whether by exception or co_return, the spawn
+  /// group can be reused to spawn and await additional coroutines.
+  boost::asio::awaitable<void, executor_type> wait()
+  {
+    return impl->wait();
+  }
+
+  /// Cancel all outstanding coroutines.
+  void cancel()
+  {
+    impl->cancel();
+  }
+};
+
+} // namespace ceph::async
diff --git a/src/common/async/detail/co_spawn_group.h b/src/common/async/detail/co_spawn_group.h
new file mode 100644 (file)
index 0000000..bfdb2de
--- /dev/null
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <exception>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "include/scope_guard.h"
+
+namespace ceph::async::detail {
+
+template <boost::asio::execution::executor Executor>
+class co_spawn_group_impl; // forward declaration: co_spawn_group_handler refers to it before its definition
+
+// Completion handler handed to co_spawn() for each child of a spawn group.
+// It exposes the child's cancellation slot and reports the child's result
+// back to the group. Each handler keeps its own reference to the group
+// implementation, because the cancellation_signals owned by that
+// implementation must outlive the child coroutine frames.
+template <typename Executor>
+class co_spawn_group_handler {
+  using impl_type = co_spawn_group_impl<Executor>;
+  using size_type = typename impl_type::size_type;
+
+ public:
+  using executor_type = typename impl_type::executor_type;
+  using cancellation_slot_type = boost::asio::cancellation_slot;
+
+  // \param group       implementation to notify on completion
+  // \param child_slot  cancellation slot associated with this child
+  // \param child_index position of this child within the group
+  co_spawn_group_handler(boost::intrusive_ptr<impl_type> group,
+                         cancellation_slot_type child_slot,
+                         size_type child_index)
+      : impl{std::move(group)},
+        slot{std::move(child_slot)},
+        index{child_index}
+  {}
+
+  // The handler runs on the group's executor.
+  executor_type get_executor() const noexcept
+  {
+    return impl->get_executor();
+  }
+
+  // Slot through which the group can cancel this particular child.
+  cancellation_slot_type get_cancellation_slot() const noexcept
+  {
+    return slot;
+  }
+
+  // Invoked when the child coroutine finishes; forwards the child's index
+  // and its exception (if any) to the group.
+  void operator()(std::exception_ptr eptr)
+  {
+    impl->child_complete(index, eptr);
+  }
+
+ private:
+  boost::intrusive_ptr<impl_type> impl;
+  cancellation_slot_type slot;
+  size_type index;
+};
+
+// Reference-counted spawn group implementation: tracks spawned/completed children, their cancellation signals and the first exception.
+template <boost::asio::execution::executor Executor>
+class co_spawn_group_impl :
+    public boost::intrusive_ref_counter<co_spawn_group_impl<Executor>,
+        boost::thread_unsafe_counter>, // non-atomic reference count
+    public service_list_base_hook // hook used for the svc.add()/svc.remove() registration
+{
+ public:
+  using size_type = uint16_t; // type of the limit, the counters and child indices
+  // Allocate `limit` cancellation signals (one per child) and register with the execution context's service.
+  co_spawn_group_impl(Executor ex, size_type limit,
+                      cancel_on_error on_error)
+    : svc(boost::asio::use_service<service<co_spawn_group_impl>>(
+            boost::asio::query(ex, boost::asio::execution::context))),
+      ex(ex),
+      signals(std::make_unique<boost::asio::cancellation_signal[]>(limit)),
+      limit(limit), on_error(on_error)
+  {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+  }
+  ~co_spawn_group_impl()
+  {
+    svc.remove(*this); // undo the registration made in the constructor
+  }
+  // Executor that spawn() starts children on.
+  using executor_type = Executor;
+  executor_type get_executor() const noexcept
+  {
+    return ex;
+  }
+  // Called by co_spawn_group_handler when child `index` finishes; `e` is non-null if it exited with an exception.
+  void child_complete(size_type index, std::exception_ptr e)
+  {
+    if (e) {
+      if (!eptr) {
+        eptr = e; // keep only the first exception; wait() rethrows it
+      }
+      if (on_error == cancel_on_error::all) {
+        cancel_from(0); // signal every child spawned so far
+      } else if (on_error == cancel_on_error::after) {
+        cancel_from(index + 1); // signal only children with a higher index
+      }
+    }
+    if (++completed == spawned) {
+      complete(); // every spawned child has reported back
+    }
+  }
+  // Start `cr` on the group's executor; completion() throws std::length_error once `limit` children were spawned.
+  void spawn(boost::asio::awaitable<void, executor_type> cr)
+  {
+    boost::asio::co_spawn(get_executor(), std::move(cr), completion());
+  }
+  // Await completion of all spawned children, reset the counters, then rethrow the first stored exception (if any).
+  boost::asio::awaitable<void, executor_type> wait()
+  {
+    if (completed < spawned) {
+      co_await waiter.get(); // children still outstanding: suspend until complete()
+    }
+
+    // clear for reuse
+    completed = 0;
+    spawned = 0;
+
+    if (eptr) {
+      std::rethrow_exception(std::exchange(eptr, nullptr)); // also clears eptr for the next round
+    }
+  }
+  // Emit terminal cancellation for every child spawned since the counters were last reset.
+  void cancel()
+  {
+    cancel_from(0);
+  }
+  // Shutdown notification (see the svc.add() registration above); forwards to waiter.shutdown().
+  void service_shutdown()
+  {
+    waiter.shutdown();
+  }
+
+ private:
+  service<co_spawn_group_impl>& svc; // service this object registered with
+  co_waiter<void, executor_type> waiter; // what wait() suspends on
+  executor_type ex;
+  std::unique_ptr<boost::asio::cancellation_signal[]> signals; // `limit` entries, indexed by child index
+  std::exception_ptr eptr; // first child exception, consumed by wait()
+  const size_type limit; // maximum children between counter resets
+  size_type spawned = 0; // children started since the last reset in wait()
+  size_type completed = 0; // children finished since the last reset in wait()
+  const cancel_on_error on_error;
+  // Emit terminal cancellation on the signals with index in [begin, spawned).
+  void cancel_from(size_type begin)
+  {
+    for (size_type i = begin; i < spawned; i++) {
+      signals[i].emit(boost::asio::cancellation_type::terminal);
+    }
+  }
+  // Complete the waiter, without an exception, if something is currently waiting on it.
+  void complete()
+  {
+    if (waiter.waiting()) {
+      waiter.complete(nullptr);
+    }
+  }
+  // Reserve the next child index and build its completion handler; throws std::length_error at the limit.
+  co_spawn_group_handler<executor_type> completion()
+  {
+    if (spawned >= limit) {
+      throw std::length_error("spawn group maximum size exceeded");
+    }
+    const size_type index = spawned++; // index also selects the child's cancellation signal
+    return {boost::intrusive_ptr{this}, signals[index].slot(), index};
+  }
+};
+
+} // namespace ceph::async::detail
index 88fb5142995d6876fa13ed408d1f7ac69a71477d..96bbb35f73bc9b01fa8655e17a172fa235092694 100644 (file)
@@ -367,6 +367,10 @@ add_ceph_unittest(unittest_async_max_concurrent_for_each)
 target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
 
 if(NOT WIN32)
+add_executable(unittest_async_co_spawn_group test_async_co_spawn_group.cc) # gtest cases for ceph::async::co_spawn_group
+add_ceph_unittest(unittest_async_co_spawn_group)
+target_link_libraries(unittest_async_co_spawn_group ceph-common Boost::system)
+
 add_executable(unittest_async_co_throttle test_async_co_throttle.cc)
 add_ceph_unittest(unittest_async_co_throttle)
 target_link_libraries(unittest_async_co_throttle ceph-common Boost::system)
diff --git a/src/test/common/test_async_co_spawn_group.cc b/src/test/common/test_async_co_spawn_group.cc
new file mode 100644 (file)
index 0000000..237b887
--- /dev/null
@@ -0,0 +1,511 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/co_spawn_group.h"
+
+#include <latch>
+#include <optional>
+#include <boost/asio/any_io_executor.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/defer.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/thread_pool.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::any_io_executor;
+
+// Build a completion handler that stores the value it is called with in
+// \a opt. The handler only references \a opt, so \a opt must outlive it.
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  auto store_result = [&opt] (T result) {
+    opt = std::move(result);
+  };
+  return store_result;
+}
+
+// Same as capture(opt), but the returned handler is additionally bound to the
+// cancellation slot of \a signal so the operation can be canceled through it.
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  auto handler = capture(opt);
+  auto slot = signal.slot();
+  return asio::bind_cancellation_slot(slot, std::move(handler));
+}
+
+TEST(co_spawn_group, spawn_limit) // spawning beyond the configured limit must throw
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1}; // limit of a single child
+
+  auto cr = [] () -> asio::awaitable<void> { co_return; };
+
+  group.spawn(cr()); // first spawn fits within the limit
+  EXPECT_THROW(group.spawn(cr()), std::length_error); // second one exceeds it
+}
+
+TEST(co_spawn_group, wait_empty) // wait() on a group with no children
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1};
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result); // wait() finished during the first poll()
+  EXPECT_FALSE(*result); // and reported no exception
+}
+
+TEST(co_spawn_group, spawn_shutdown) // tear down while a child is still suspended and wait() was never called
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 10};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get()); // this child is never completed by the test
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped()); // the child is still outstanding
+  // shut down before wait()
+}
+
+TEST(co_spawn_group, spawn_wait) // wait() completes once the only child completes successfully
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 10};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result); // wait() has not finished while the child is pending
+
+  waiter.complete(nullptr); // let the child finish without an exception
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result); // wait() reported no exception
+}
+
+TEST(co_spawn_group, spawn_wait_shutdown) // tear down while a parent coroutine is suspended in wait()
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  co_waiter<void, executor_type> waiter;
+  auto cr = [ex, &waiter] () -> asio::awaitable<void> {
+    auto group = co_spawn_group{ex, 1}; // the group lives in the parent coroutine's frame
+    group.spawn(waiter.get());
+    co_await group.wait();
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result); // the parent has not completed
+  // shut down before wait() completes
+}
+
+TEST(co_spawn_group, spawn_wait_cancel) // cancel the parent coroutine while it is suspended in wait()
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+
+  co_waiter<void, executor_type> waiter;
+  auto cr = [ex, &waiter] () -> asio::awaitable<void> {
+    auto group = co_spawn_group{ex, 1};
+    group.spawn(waiter.get());
+    co_await group.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, cr(), capture(signal, result)); // handler is bound to signal's slot
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before wait() completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // the parent exited with an exception
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // rethrow so gtest reports the unexpected exception type
+  }
+}
+
+TEST(co_spawn_group, spawn_wait_exception_order) // wait() rethrows the first exception to occur, not the first child's
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 2};
+
+  co_waiter<void, executor_type> waiter1;
+  group.spawn(waiter1.get());
+
+  co_waiter<void, executor_type> waiter2;
+  group.spawn(waiter2.get());
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // the second child fails first, with runtime_error
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result); // default cancel_on_error: still waiting on the first child
+  // the first child fails later, with a different exception type
+  waiter1.complete(std::make_exception_ptr(std::logic_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); // the earlier exception wins
+}
+
+TEST(co_spawn_group, spawn_complete_wait) // a child's exception is kept until a later wait() collects it
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 2};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  // the child fails before anyone calls wait()
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+  ctx.restart();
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); // the stored exception is rethrown
+}
+
+TEST(co_spawn_group, spawn_wait_wait) // an exception is reported by one wait() only; the next wait() is clean
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+  // wait again without spawning anything new
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result); // the earlier exception is not reported a second time
+}
+
+TEST(co_spawn_group, spawn_wait_spawn_wait) // the group can be reused after a successful wait()
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1}; // limit 1: the second spawn only fits if wait() reset the group
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_FALSE(*result);
+  // second round on the same group
+  group.spawn(waiter.get());
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(co_spawn_group, spawn_cancel_wait_spawn_wait) // cancel() before wait(), then reuse the group
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  // cancel the child before anyone waits on the group
+  group.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+  ctx.restart();
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // the canceled child's exception is reported by wait()
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // rethrow so gtest reports the unexpected exception type
+  }
+  // second round on the same group
+  group.spawn(waiter.get());
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(co_spawn_group, spawn_wait_cancel_spawn_wait) // cancel() while wait() is pending, then reuse the group
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 1};
+
+  co_waiter<void, executor_type> waiter;
+  group.spawn(waiter.get());
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before waiter completes
+  group.cancel();
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // wait() finished with the canceled child's exception
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // rethrow so gtest reports the unexpected exception type
+  }
+  // second round on the same group
+  group.spawn(waiter.get());
+
+  result.reset();
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.restart();
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiter.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(co_spawn_group, cancel_on_error_after) // an error cancels only the children spawned after the failing one
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 3, cancel_on_error::after};
+
+  co_waiter<void, executor_type> waiter1;
+  group.spawn(waiter1.get());
+
+  co_waiter<void, executor_type> waiter2;
+  group.spawn(waiter2.get());
+
+  co_waiter<void, executor_type> waiter3;
+  group.spawn(waiter3.get()); // never completed explicitly by this test
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // the middle child fails
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result); // the earlier child is untouched, so wait() is still pending
+
+  waiter1.complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // wait() finished without the test ever completing waiter3
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(co_spawn_group, cancel_on_error_all) // an error cancels every other child in the group
+{
+  asio::io_context ctx;
+  executor_type ex = ctx.get_executor();
+  auto group = co_spawn_group{ex, 3, cancel_on_error::all};
+
+  co_waiter<void, executor_type> waiter1;
+  group.spawn(waiter1.get()); // never completed explicitly by this test
+
+  co_waiter<void, executor_type> waiter2;
+  group.spawn(waiter2.get());
+
+  co_waiter<void, executor_type> waiter3;
+  group.spawn(waiter3.get()); // never completed explicitly by this test
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ex, group.wait(), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // the middle child fails
+  waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped()); // wait() finished without the test completing waiter1 or waiter3
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); // the original error, not a cancellation
+}
+
+TEST(co_spawn_group, cross_thread_cancel) // cancellation emitted from a different thread than the one running the coroutine
+{
+  // run the coroutine in a background thread
+  asio::thread_pool ctx{1};
+  executor_type ex = ctx.get_executor();
+
+  std::latch waiting{1};
+
+  auto cr = [ex, &waiting] () -> asio::awaitable<void> {
+    auto group = co_spawn_group{ex, 1};
+    co_waiter<void, executor_type> waiter;
+    group.spawn(waiter.get());
+    // decrement the latch after group.wait() suspends
+    asio::defer(ex, [&waiting] { waiting.count_down(); });
+    co_await group.wait();
+  };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  // without bind_executor(), tsan identifies a data race on signal.emit()
+  asio::co_spawn(ex, cr(), bind_executor(ex, capture(signal, result)));
+
+  waiting.wait(); // wait until we've suspended in group.wait()
+  // emit from the test thread while the coroutine belongs to the pool thread
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.join(); // wait for the pool to finish before inspecting result
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error); // rethrow so gtest reports the unexpected exception type
+  }
+}
+
+} // namespace ceph::async