--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <exception>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "include/scope_guard.h"
+
+namespace ceph::async::detail {
+
+template <boost::asio::execution::executor Executor>
+class spawn_group_impl;
+
+// A cancellable co_spawn() completion handler that notifies the spawn_group
+// upon completion. This holds a reference to the implementation in order to
+// extend its lifetime. This is required for per-op cancellation because the
+// cancellation_signals must outlive these coroutine frames.
+template <typename Executor>
+class spawn_group_handler {
+ using impl_type = spawn_group_impl<Executor>;
+ using size_type = typename impl_type::size_type;
+ boost::intrusive_ptr<impl_type> impl;
+ boost::asio::cancellation_slot slot;
+ size_type index;
+ public:
+ spawn_group_handler(boost::intrusive_ptr<impl_type> impl,
+ boost::asio::cancellation_slot slot, size_type index)
+ : impl(std::move(impl)), slot(std::move(slot)), index(index)
+ {}
+
+ using executor_type = typename impl_type::executor_type;
+ executor_type get_executor() const noexcept
+ {
+ return impl->get_executor();
+ }
+
+ using cancellation_slot_type = boost::asio::cancellation_slot;
+ cancellation_slot_type get_cancellation_slot() const noexcept
+ {
+ return slot;
+ }
+
+ void operator()(std::exception_ptr eptr)
+ {
+ impl->child_complete(index, eptr);
+ }
+};
+
+// Reference-counted spawn group implementation.
+template <boost::asio::execution::executor Executor>
+class spawn_group_impl :
+ public boost::intrusive_ref_counter<spawn_group_impl<Executor>,
+ boost::thread_unsafe_counter>,
+ public service_list_base_hook
+{
+ public:
+ using size_type = uint16_t;
+
+ spawn_group_impl(Executor ex, size_type limit,
+ cancel_on_error on_error)
+ : svc(boost::asio::use_service<service<spawn_group_impl>>(
+ boost::asio::query(ex, boost::asio::execution::context))),
+ ex(ex),
+ signals(std::make_unique<boost::asio::cancellation_signal[]>(limit)),
+ limit(limit), on_error(on_error)
+ {
+ // register for service_shutdown() notifications
+ svc.add(*this);
+ }
+ ~spawn_group_impl()
+ {
+ svc.remove(*this);
+ }
+
+ using executor_type = Executor;
+ executor_type get_executor() const noexcept
+ {
+ return ex;
+ }
+
+ spawn_group_handler<executor_type> completion()
+ {
+ if (spawned >= limit) {
+ throw std::length_error("spawn group maximum size exceeded");
+ }
+ const size_type index = spawned++;
+ return {boost::intrusive_ptr{this}, signals[index].slot(), index};
+ }
+
+ void child_complete(size_type index, std::exception_ptr e)
+ {
+ if (e) {
+ if (!eptr) {
+ eptr = e;
+ }
+ if (on_error == cancel_on_error::all) {
+ cancel_from(0);
+ } else if (on_error == cancel_on_error::after) {
+ cancel_from(index + 1);
+ }
+ }
+ if (++completed == spawned) {
+ complete();
+ }
+ }
+
+ boost::asio::awaitable<void, executor_type> wait()
+ {
+ if (completed < spawned) {
+ co_await waiter.get();
+ }
+
+ // clear for reuse
+ completed = 0;
+ spawned = 0;
+
+ if (eptr) {
+ std::rethrow_exception(std::exchange(eptr, nullptr));
+ }
+ }
+
+ void cancel()
+ {
+ cancel_from(0);
+ }
+
+ void service_shutdown()
+ {
+ waiter.shutdown();
+ }
+
+ private:
+ service<spawn_group_impl>& svc;
+ co_waiter<void, executor_type> waiter;
+ executor_type ex;
+ std::unique_ptr<boost::asio::cancellation_signal[]> signals;
+ std::exception_ptr eptr;
+ const size_type limit;
+ size_type spawned = 0;
+ size_type completed = 0;
+ const cancel_on_error on_error;
+
+ void cancel_from(size_type begin)
+ {
+ for (size_type i = begin; i < spawned; i++) {
+ signals[i].emit(boost::asio::cancellation_type::terminal);
+ }
+ }
+
+ void complete()
+ {
+ if (waiter.waiting()) {
+ waiter.complete(nullptr);
+ }
+ }
+};
+
+} // namespace ceph::async::detail
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include "cancel_on_error.h"
+#include "detail/spawn_group.h"
+
+namespace ceph::async {
+
+/// \brief Tracks a group of coroutines to await all of their completions.
+///
+/// This class functions as a CompletionToken for calls to co_spawn(), attaching
+/// a handler that notifies the group upon the child coroutine's completion.
+///
+/// The wait() function can be used to await the completion of all children.
+/// If any child coroutines exit with an exception, the first such exception
+/// is rethrown by wait(). The cancel_on_error option controls whether these
+/// exceptions trigger the cancellation of other children.
+///
+/// All child coroutines are canceled by cancel() or spawn_group destruction.
+/// This allows the parent coroutine to share memory with its child coroutines
+/// without fear of dangling references.
+///
+/// This class is not thread-safe, so a strand executor should be used in
+/// multi-threaded contexts.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+/// // process all tasks in parallel
+/// auto ex = co_await boost::asio::this_coro::executor;
+/// auto group = spawn_group{ex, tasks.size()};
+///
+/// for (auto& t : tasks) {
+/// boost::asio::co_spawn(ex, child(t), group);
+/// }
+/// co_await group.wait();
+/// }
+/// \endcode
+template <boost::asio::execution::executor Executor>
+class spawn_group {
+ using impl_type = detail::spawn_group_impl<Executor>;
+ boost::intrusive_ptr<impl_type> impl;
+
+ public:
+ spawn_group(Executor ex, size_t limit,
+ cancel_on_error on_error = cancel_on_error::none)
+ : impl(new impl_type(ex, limit, on_error))
+ {
+ }
+
+ ~spawn_group()
+ {
+ impl->cancel();
+ }
+
+ using executor_type = Executor;
+ executor_type get_executor() const
+ {
+ return impl->get_executor();
+ }
+
+ /// Return a cancellable co_spawn() completion handler with signature
+ /// void(std::exception_ptr). Throws a std::length_error exception if the
+ /// number of outstanding completion handlers would exceed the group's limit.
+ ///
+ /// As a convenience, you can avoid calling this function by using the
+ /// spawn_group itself as a CompletionToken for co_spawn().
+ auto completion()
+ {
+ return impl->completion();
+ }
+
+ /// Wait for all outstanding completion handlers before returning. If any
+ /// of the spawned coroutines exit with an exception, the first exception
+ /// is rethrown.
+ ///
+ /// After wait() completes, whether by exception or co_return, the spawn
+ /// group can be reused to spawn and await additional coroutines.
+ boost::asio::awaitable<void, executor_type> wait()
+ {
+ return impl->wait();
+ }
+
+ /// Cancel all outstanding coroutines.
+ void cancel()
+ {
+ impl->cancel();
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// Allow spawn_group to be used as a CompletionToken.
+template <typename Executor, typename Signature>
+struct async_result<ceph::async::spawn_group<Executor>, Signature>
+{
+ using completion_handler_type =
+ ceph::async::detail::spawn_group_handler<Executor>;
+ async_result(completion_handler_type&) {}
+
+ using return_type = void;
+ return_type get() {}
+
+ template <typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ ceph::async::spawn_group<Executor>& group,
+ Args&& ...args)
+ {
+ return std::move(init)(group.completion(), std::forward<Args>(args)...);
+ }
+};
+
+} // namespace boost::asio
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/spawn_group.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = asio::awaitable<T, executor_type>;
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+ return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+ return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+TEST(spawn_group, spawn_limit)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ auto cr = [] () -> awaitable<void> { co_return; };
+
+ asio::co_spawn(ex, cr(), group);
+ EXPECT_THROW(asio::co_spawn(ex, cr(), group), std::length_error);
+}
+
+TEST(spawn_group, wait_empty)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_shutdown)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 10};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ // shut down before wait()
+}
+
+TEST(spawn_group, spawn_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 10};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_shutdown)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+
+ co_waiter<void, executor_type> waiter;
+ auto cr = [ex, &waiter] () -> awaitable<void> {
+ auto group = spawn_group{ex, 1};
+ asio::co_spawn(ex, waiter.get(), group);
+ co_await group.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+ // shut down before wait() completes
+}
+
+TEST(spawn_group, spawn_wait_cancel)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+
+ co_waiter<void, executor_type> waiter;
+ auto cr = [ex, &waiter] () -> awaitable<void> {
+ auto group = spawn_group{ex, 1};
+ asio::co_spawn(ex, waiter.get(), group);
+ co_await group.wait();
+ };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(signal, result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ // cancel before wait() completes
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+TEST(spawn_group, spawn_wait_exception_order)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 2};
+
+ co_waiter<void, executor_type> waiter1;
+ asio::co_spawn(ex, waiter1.get(), group);
+
+ co_waiter<void, executor_type> waiter2;
+ asio::co_spawn(ex, waiter2.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter1.complete(std::make_exception_ptr(std::logic_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, spawn_complete_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 2};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+
+ waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+ ctx.restart();
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, spawn_wait_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+
+ waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+
+ result.reset();
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.restart();
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_spawn_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_FALSE(*result);
+
+ asio::co_spawn(ex, waiter.get(), group);
+
+ result.reset();
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.restart();
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_cancel_wait_spawn_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+
+ group.cancel();
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop
+ ctx.restart();
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+
+ asio::co_spawn(ex, waiter.get(), group);
+
+ result.reset();
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.restart();
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, spawn_wait_cancel_spawn_wait)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 1};
+
+ co_waiter<void, executor_type> waiter;
+ asio::co_spawn(ex, waiter.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ // cancel before waiter completes
+ group.cancel();
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+
+ asio::co_spawn(ex, waiter.get(), group);
+
+ result.reset();
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.restart();
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(spawn_group, cancel_on_error_after)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 3, cancel_on_error::after};
+
+ co_waiter<void, executor_type> waiter1;
+ asio::co_spawn(ex, waiter1.get(), group);
+
+ co_waiter<void, executor_type> waiter2;
+ asio::co_spawn(ex, waiter2.get(), group);
+
+ co_waiter<void, executor_type> waiter3;
+ asio::co_spawn(ex, waiter3.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter1.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(spawn_group, cancel_on_error_all)
+{
+ asio::io_context ctx;
+ auto ex = ctx.get_executor();
+ auto group = spawn_group{ex, 3, cancel_on_error::all};
+
+ co_waiter<void, executor_type> waiter1;
+ asio::co_spawn(ex, waiter1.get(), group);
+
+ co_waiter<void, executor_type> waiter2;
+ asio::co_spawn(ex, waiter2.get(), group);
+
+ co_waiter<void, executor_type> waiter3;
+ asio::co_spawn(ex, waiter3.get(), group);
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, group.wait(), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+} // namespace ceph::async