--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <iterator>
+#include <ranges>
+#include <type_traits>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/asio/this_coro.hpp>
+#include "co_spawn_group.h"
+
+namespace ceph::async {
+
+/// Call a coroutine with each element in the given range then wait for all
+/// of them to complete. The first exception is rethrown to the caller. The
+/// cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+/// co_await parallel_for_each(tasks.begin(), tasks.end(), child);
+/// }
+/// \endcode
+template <typename Iterator, typename Sentinel, typename VoidAwaitableFactory,
+ typename Value = std::iter_reference_t<Iterator>,
+ typename VoidAwaitable = std::invoke_result_t<
+ VoidAwaitableFactory, Value>,
+ typename AwaitableT = typename VoidAwaitable::value_type,
+ typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+ requires (std::input_iterator<Iterator> &&
+ std::sentinel_for<Sentinel, Iterator> &&
+ std::same_as<AwaitableT, void> &&
+ boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Iterator begin, Sentinel end,
+ VoidAwaitableFactory&& factory,
+ cancel_on_error on_error = cancel_on_error::none)
+ -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+ const size_t count = std::ranges::distance(begin, end);
+ if (!count) {
+ co_return;
+ }
+ auto ex = co_await boost::asio::this_coro::executor;
+ auto group = co_spawn_group{ex, count, on_error};
+ for (Iterator i = begin; i != end; ++i) {
+ group.spawn(factory(*i));
+ }
+ co_await group.wait();
+}
+
+/// \overload
+template <typename Range, typename VoidAwaitableFactory,
+ typename Value = std::ranges::range_reference_t<Range>,
+ typename VoidAwaitable = std::invoke_result_t<
+ VoidAwaitableFactory, Value>,
+ typename AwaitableT = typename VoidAwaitable::value_type,
+ typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+ requires (std::ranges::range<Range> &&
+ std::same_as<AwaitableT, void> &&
+ boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Range&& range, VoidAwaitableFactory&& factory,
+ cancel_on_error on_error = cancel_on_error::none)
+ -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+ return parallel_for_each(std::begin(range), std::end(range),
+ std::move(factory), on_error);
+}
+
+} // namespace ceph::async
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/parallel_for_each.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = asio::awaitable<T, executor_type>;
+
+using void_waiter = co_waiter<void, executor_type>;
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+ return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+ return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+TEST(parallel_for_each, empty)
+{
+ asio::io_context ctx;
+
+ int* end = nullptr;
+ auto cr = [] (int i) -> awaitable<void> { co_return; };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(end, end, cr), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(parallel_for_each, shutdown)
+{
+ asio::io_context ctx;
+
+ void_waiter waiters[2];
+ auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+ // shut down before any waiters complete
+}
+
+TEST(parallel_for_each, cancel)
+{
+ asio::io_context ctx;
+
+ void_waiter waiters[2];
+ auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ // cancel before any waiters complete
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+TEST(parallel_for_each, complete_shutdown)
+{
+ asio::io_context ctx;
+
+ void_waiter waiters[2];
+ auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiters[0].complete(nullptr);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+ // shut down before final waiter completes
+}
+
+TEST(parallel_for_each, complete_cancel)
+{
+ asio::io_context ctx;
+
+ void_waiter waiters[2];
+ auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiters[0].complete(nullptr);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ // cancel before final waiter completes
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+TEST(parallel_for_each, complete_complete)
+{
+ asio::io_context ctx;
+
+ void_waiter waiters[2];
+ auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(result));
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiters[0].complete(nullptr);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiters[1].complete(nullptr);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+struct null_sentinel {};
+bool operator==(const char* c, null_sentinel) { return !*c; }
+static_assert(std::sentinel_for<null_sentinel, const char*>);
+
+TEST(parallel_for_each, sentinel)
+{
+ asio::io_context ctx;
+
+ const char* begin = "hello";
+ null_sentinel end;
+
+ size_t count = 0;
+ auto cr = [&count] (char c) -> awaitable<void> {
+ ++count;
+ co_return;
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_EQ(count, 5);
+}
+
+TEST(parallel_for_each, move_iterator)
+{
+ asio::io_context ctx;
+
+ using value_type = std::unique_ptr<int>;
+ value_type values[] = {
+ std::make_unique<int>(42),
+ std::make_unique<int>(43),
+ };
+
+ auto begin = std::make_move_iterator(std::begin(values));
+ auto end = std::make_move_iterator(std::end(values));
+
+ auto cr = [] (value_type v) -> awaitable<void> {
+ if (!v) {
+ throw std::invalid_argument("empty");
+ }
+ co_return;
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+
+ EXPECT_FALSE(values[0]);
+ EXPECT_FALSE(values[1]);
+}
+
+} // namespace ceph::async