--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <iterator>
+#include <ranges>
+#include <utility>
+#include <boost/asio/spawn.hpp>
+#include "cancel_on_error.h"
+#include "yield_context.h"
+#include "spawn_throttle.h"
+
+namespace ceph::async {
+
+/// Call a coroutine with each element in the given range then wait for all of
+/// them to complete. The first exception is rethrown to the caller. The
+/// cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children. The number of outstanding coroutines
+/// is limited by the max_concurrent argument.
+///
+/// Example:
+/// \code
+/// void child(task& t, boost::asio::yield_context yield);
+///
+/// void parent(std::span<task> tasks, optional_yield y)
+/// {
+/// // process all tasks, up to 10 at a time
+/// max_concurrent_for_each(tasks, 10, y, child);
+/// }
+/// \endcode
+template <typename Iterator, typename Sentinel, typename Func,
+ typename Reference = std::iter_reference_t<Iterator>>
+ requires (std::input_iterator<Iterator> &&
+ std::sentinel_for<Sentinel, Iterator> &&
+ std::invocable<Func, Reference, boost::asio::yield_context>)
+void max_concurrent_for_each(Iterator begin,
+ Sentinel end,
+ size_t max_concurrent,
+ optional_yield y,
+ Func&& func,
+ cancel_on_error on_error = cancel_on_error::none)
+{
+ const size_t count = std::ranges::distance(begin, end);
+ if (!count) {
+ return;
+ }
+ auto throttle = spawn_throttle{y, max_concurrent, on_error};
+ for (Iterator i = begin; i != end; ++i) {
+ boost::asio::spawn(throttle.get_executor(),
+ [&func, &val = *i] (boost::asio::yield_context yield) {
+ func(val, yield);
+ }, throttle);
+ }
+ throttle.wait();
+}
+
+/// \overload
+template <typename Range, typename Func,
+ typename Reference = std::ranges::range_reference_t<Range>>
+ requires (std::ranges::range<Range> &&
+ std::invocable<Func, Reference, boost::asio::yield_context>)
+auto max_concurrent_for_each(Range&& range,
+ size_t max_concurrent,
+ optional_yield y,
+ Func&& func,
+ cancel_on_error on_error = cancel_on_error::none)
+{
+ return max_concurrent_for_each(std::begin(range), std::end(range),
+ max_concurrent, y, std::forward<Func>(func),
+ on_error);
+}
+
+// TODO: overloads for co_spawn()
+
+} // namespace ceph::async
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/max_concurrent_for_each.h"
+
+#include <chrono>
+#include <exception>
+#include <optional>
+#include <boost/asio/spawn.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <gtest/gtest.h>
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+
+void rethrow(std::exception_ptr eptr)
+{
+ if (eptr) std::rethrow_exception(eptr);
+}
+
+using namespace std::chrono_literals;
+
+void wait_for(std::chrono::milliseconds dur, asio::yield_context yield)
+{
+ auto timer = asio::steady_timer{yield.get_executor(), dur};
+ timer.async_wait(yield);
+}
+
+struct null_sentinel {};
+bool operator==(const char* c, null_sentinel) { return !*c; }
+static_assert(std::sentinel_for<null_sentinel, const char*>);
+
+TEST(iterator_null_yield, empty)
+{
+ int* end = nullptr;
+ auto cr = [] (int, asio::yield_context) {};
+ max_concurrent_for_each(end, end, 10, null_yield, cr);
+}
+
+TEST(iterator_null_yield, over_limit)
+{
+ int concurrent = 0;
+ int max_concurrent = 0;
+ int completed = 0;
+
+ auto cr = [&] (int, asio::yield_context yield) {
+ ++concurrent;
+ if (max_concurrent < concurrent) {
+ max_concurrent = concurrent;
+ }
+
+ wait_for(1ms, yield);
+
+ --concurrent;
+ ++completed;
+ };
+
+ constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+ max_concurrent_for_each(begin(arr), end(arr), 2, null_yield, cr);
+
+ EXPECT_EQ(0, concurrent);
+ EXPECT_EQ(2, max_concurrent);
+ EXPECT_EQ(10, completed);
+}
+
+TEST(iterator_null_yield, sentinel)
+{
+ const char* begin = "hello";
+ null_sentinel end;
+
+ size_t completed = 0;
+ auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
+ max_concurrent_for_each(begin, end, 10, null_yield, cr);
+ EXPECT_EQ(completed, 5);
+}
+
+TEST(range_null_yield, empty)
+{
+ constexpr std::array<int, 0> arr{};
+ auto cr = [] (int, asio::yield_context) {};
+ max_concurrent_for_each(arr, 10, null_yield, cr);
+}
+
+TEST(range_null_yield, over_limit)
+{
+ int concurrent = 0;
+ int max_concurrent = 0;
+ int completed = 0;
+
+ auto cr = [&] (int, asio::yield_context yield) {
+ ++concurrent;
+ if (max_concurrent < concurrent) {
+ max_concurrent = concurrent;
+ }
+
+ wait_for(1ms, yield);
+
+ --concurrent;
+ ++completed;
+ };
+
+ constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+ max_concurrent_for_each(arr, 2, null_yield, cr);
+
+ EXPECT_EQ(0, concurrent);
+ EXPECT_EQ(2, max_concurrent);
+ EXPECT_EQ(10, completed);
+}
+
+
+TEST(iterator_yield, empty)
+{
+ int* end = nullptr;
+ auto cr = [] (int, asio::yield_context) {};
+
+ asio::io_context ctx;
+ asio::spawn(ctx, [&] (asio::yield_context yield) {
+ max_concurrent_for_each(end, end, 10, yield, cr);
+ }, rethrow);
+ ctx.run();
+}
+
+TEST(iterator_yield, over_limit)
+{
+ int concurrent = 0;
+ int max_concurrent = 0;
+ int completed = 0;
+
+ auto cr = [&] (int, asio::yield_context yield) {
+ ++concurrent;
+ if (max_concurrent < concurrent) {
+ max_concurrent = concurrent;
+ }
+
+ wait_for(1ms, yield);
+
+ --concurrent;
+ ++completed;
+ };
+
+ asio::io_context ctx;
+ asio::spawn(ctx, [&] (asio::yield_context yield) {
+ constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+ max_concurrent_for_each(begin(arr), end(arr), 2, yield, cr);
+ }, rethrow);
+ ctx.run();
+
+ EXPECT_EQ(0, concurrent);
+ EXPECT_EQ(2, max_concurrent);
+ EXPECT_EQ(10, completed);
+}
+
+TEST(iterator_yield, sentinel)
+{
+ const char* begin = "hello";
+ null_sentinel end;
+
+ size_t completed = 0;
+ auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
+
+ asio::io_context ctx;
+ asio::spawn(ctx, [&] (asio::yield_context yield) {
+ max_concurrent_for_each(begin, end, 10, yield, cr);
+ }, rethrow);
+ ctx.run();
+
+ EXPECT_EQ(completed, 5);
+}
+
+TEST(range_yield, empty)
+{
+ constexpr std::array<int, 0> arr{};
+ auto cr = [] (int, asio::yield_context) {};
+
+ asio::io_context ctx;
+ asio::spawn(ctx, [&] (asio::yield_context yield) {
+ max_concurrent_for_each(arr, 10, yield, cr);
+ }, rethrow);
+ ctx.run();
+}
+
+TEST(range_yield, over_limit)
+{
+ int concurrent = 0;
+ int max_concurrent = 0;
+ int completed = 0;
+
+ auto cr = [&] (int, asio::yield_context yield) {
+ ++concurrent;
+ if (max_concurrent < concurrent) {
+ max_concurrent = concurrent;
+ }
+
+ wait_for(1ms, yield);
+
+ --concurrent;
+ ++completed;
+ };
+
+ asio::io_context ctx;
+ asio::spawn(ctx, [&] (asio::yield_context yield) {
+ constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+ max_concurrent_for_each(arr, 2, yield, cr);
+ }, rethrow);
+ ctx.run();
+
+ EXPECT_EQ(0, concurrent);
+ EXPECT_EQ(2, max_concurrent);
+ EXPECT_EQ(10, completed);
+}
+
+} // namespace ceph::async