From 25017d6196b662e13ea7fb00e975834dfd5c6f06 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 9 May 2024 13:00:57 -0400 Subject: [PATCH] common/async: add max_concurrent_for_each() algorithm inspired by seastar's max_concurrent_for_each(), implemented for optional_yield in terms of yield_throttle can also be overloaded for co_await() and co_throttle (not part of this branch) Signed-off-by: Casey Bodley --- src/common/async/max_concurrent_for_each.h | 89 +++++++ src/test/common/CMakeLists.txt | 4 + .../test_async_max_concurrent_for_each.cc | 225 ++++++++++++++++++ 3 files changed, 318 insertions(+) create mode 100644 src/common/async/max_concurrent_for_each.h create mode 100644 src/test/common/test_async_max_concurrent_for_each.cc diff --git a/src/common/async/max_concurrent_for_each.h b/src/common/async/max_concurrent_for_each.h new file mode 100644 index 0000000000000..c0789a1606431 --- /dev/null +++ b/src/common/async/max_concurrent_for_each.h @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "cancel_on_error.h" +#include "yield_context.h" +#include "spawn_throttle.h" + +namespace ceph::async { + +/// Call a coroutine with each element in the given range then wait for all of +/// them to complete. The first exception is rethrown to the caller. The +/// cancel_on_error option controls whether these exceptions trigger the +/// cancellation of other children. The number of outstanding coroutines +/// is limited by the max_concurrent argument. +/// +/// Example: +/// \code +/// void child(task& t, boost::asio::yield_context yield); +/// +/// void parent(std::span tasks, optional_yield y) +/// { +/// // process all tasks, up to 10 at a time +/// max_concurrent_for_each(tasks, 10, y, child); +/// } +/// \endcode +template > + requires (std::input_iterator && + std::sentinel_for && + std::invocable) +void max_concurrent_for_each(Iterator begin, + Sentinel end, + size_t max_concurrent, + optional_yield y, + Func&& func, + cancel_on_error on_error = cancel_on_error::none) +{ + const size_t count = std::ranges::distance(begin, end); + if (!count) { + return; + } + auto throttle = spawn_throttle{y, max_concurrent, on_error}; + for (Iterator i = begin; i != end; ++i) { + boost::asio::spawn(throttle.get_executor(), + [&func, &val = *i] (boost::asio::yield_context yield) { + func(val, yield); + }, throttle); + } + throttle.wait(); +} + +/// \overload +template > + requires (std::ranges::range && + std::invocable) +auto max_concurrent_for_each(Range&& range, + size_t max_concurrent, + optional_yield y, + Func&& func, + cancel_on_error on_error = cancel_on_error::none) +{ + return max_concurrent_for_each(std::begin(range), std::end(range), + max_concurrent, y, std::forward(func), + on_error); +} + +// TODO: overloads for co_spawn() + +} // namespace ceph::async diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index d54ea127cc968..428ef7b01470f 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -357,6 +357,10 @@ add_executable(unittest_async_completion test_async_completion.cc) add_ceph_unittest(unittest_async_completion) target_link_libraries(unittest_async_completion ceph-common Boost::system) +add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc) +add_ceph_unittest(unittest_async_max_concurrent_for_each) +target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context) + add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc) add_ceph_unittest(unittest_async_shared_mutex) target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system) diff --git a/src/test/common/test_async_max_concurrent_for_each.cc b/src/test/common/test_async_max_concurrent_for_each.cc new file mode 100644 index 0000000000000..2e6f919a39485 --- /dev/null +++ b/src/test/common/test_async_max_concurrent_for_each.cc @@ -0,0 +1,225 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/max_concurrent_for_each.h" + +#include +#include +#include +#include +#include +#include + +namespace ceph::async { + +namespace asio = boost::asio; + +void rethrow(std::exception_ptr eptr) +{ + if (eptr) std::rethrow_exception(eptr); +} + +using namespace std::chrono_literals; + +void wait_for(std::chrono::milliseconds dur, asio::yield_context yield) +{ + auto timer = asio::steady_timer{yield.get_executor(), dur}; + timer.async_wait(yield); +} + +struct null_sentinel {}; +bool operator==(const char* c, null_sentinel) { return !*c; } +static_assert(std::sentinel_for); + +TEST(iterator_null_yield, empty) +{ + int* end = nullptr; + auto cr = [] (int, asio::yield_context) {}; + max_concurrent_for_each(end, end, 10, null_yield, cr); +} + +TEST(iterator_null_yield, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int, asio::yield_context yield) { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + wait_for(1ms, yield); + + --concurrent; + ++completed; + }; + + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + max_concurrent_for_each(begin(arr), end(arr), 2, null_yield, cr); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + +TEST(iterator_null_yield, sentinel) +{ + const char* begin = "hello"; + null_sentinel end; + + size_t completed = 0; + auto cr = [&completed] (char c, asio::yield_context) { ++completed; }; + max_concurrent_for_each(begin, end, 10, null_yield, cr); + EXPECT_EQ(completed, 5); +} + +TEST(range_null_yield, empty) +{ + constexpr std::array arr{}; + auto cr = [] (int, asio::yield_context) {}; + max_concurrent_for_each(arr, 10, null_yield, cr); +} + +TEST(range_null_yield, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int, asio::yield_context yield) { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + wait_for(1ms, yield); + + --concurrent; + ++completed; + }; + + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + max_concurrent_for_each(arr, 2, null_yield, cr); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + + +TEST(iterator_yield, empty) +{ + int* end = nullptr; + auto cr = [] (int, asio::yield_context) {}; + + asio::io_context ctx; + asio::spawn(ctx, [&] (asio::yield_context yield) { + max_concurrent_for_each(end, end, 10, yield, cr); + }, rethrow); + ctx.run(); +} + +TEST(iterator_yield, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int, asio::yield_context yield) { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + wait_for(1ms, yield); + + --concurrent; + ++completed; + }; + + asio::io_context ctx; + asio::spawn(ctx, [&] (asio::yield_context yield) { + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + max_concurrent_for_each(begin(arr), end(arr), 2, yield, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + +TEST(iterator_yield, sentinel) +{ + const char* begin = "hello"; + null_sentinel end; + + size_t completed = 0; + auto cr = [&completed] (char c, asio::yield_context) { ++completed; }; + + asio::io_context ctx; + asio::spawn(ctx, [&] (asio::yield_context yield) { + max_concurrent_for_each(begin, end, 10, yield, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(completed, 5); +} + +TEST(range_yield, empty) +{ + constexpr std::array arr{}; + auto cr = [] (int, asio::yield_context) {}; + + asio::io_context ctx; + asio::spawn(ctx, [&] (asio::yield_context yield) { + max_concurrent_for_each(arr, 10, yield, cr); + }, rethrow); + ctx.run(); +} + +TEST(range_yield, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int, asio::yield_context yield) { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + wait_for(1ms, yield); + + --concurrent; + ++completed; + }; + + asio::io_context ctx; + asio::spawn(ctx, [&] (asio::yield_context yield) { + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + max_concurrent_for_each(arr, 2, yield, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + +} // namespace ceph::async -- 2.39.5