From d192ca7986228360a7ff5ef4951468f9e65929f3 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 27 Jun 2024 16:53:01 -0400 Subject: [PATCH] common/async: implement max_concurrent_for_each() for awaitable Signed-off-by: Casey Bodley --- src/common/async/max_concurrent_for_each.h | 54 ++++++++- .../test_async_max_concurrent_for_each.cc | 110 ++++++++++++++++++ 2 files changed, 161 insertions(+), 3 deletions(-) diff --git a/src/common/async/max_concurrent_for_each.h b/src/common/async/max_concurrent_for_each.h index c0789a16064..c99a0fbb048 100644 --- a/src/common/async/max_concurrent_for_each.h +++ b/src/common/async/max_concurrent_for_each.h @@ -21,6 +21,7 @@ #include #include #include "cancel_on_error.h" +#include "co_throttle.h" #include "yield_context.h" #include "spawn_throttle.h" @@ -54,8 +55,7 @@ void max_concurrent_for_each(Iterator begin, Func&& func, cancel_on_error on_error = cancel_on_error::none) { - const size_t count = std::ranges::distance(begin, end); - if (!count) { + if (begin == end) { return; } auto throttle = spawn_throttle{y, max_concurrent, on_error}; @@ -84,6 +84,54 @@ auto max_concurrent_for_each(Range&& range, on_error); } -// TODO: overloads for co_spawn() +// \overload +template , + typename VoidAwaitable = std::invoke_result_t< + VoidAwaitableFactory, Value>, + typename AwaitableT = typename VoidAwaitable::value_type, + typename AwaitableExecutor = typename VoidAwaitable::executor_type> + requires (std::input_iterator && + std::sentinel_for && + std::same_as && + boost::asio::execution::executor) +auto max_concurrent_for_each(Iterator begin, + Sentinel end, + size_t max_concurrent, + VoidAwaitableFactory&& factory, + cancel_on_error on_error = cancel_on_error::none) + -> boost::asio::awaitable +{ + if (begin == end) { + co_return; + } + auto ex = co_await boost::asio::this_coro::executor; + auto throttle = co_throttle{ex, max_concurrent, on_error}; + for (Iterator i = begin; i != end; ++i) { + co_await throttle.spawn(factory(*i)); + } + co_await throttle.wait(); +} + +/// \overload +template , + typename VoidAwaitable = std::invoke_result_t< + VoidAwaitableFactory, Value>, + typename AwaitableT = typename VoidAwaitable::value_type, + typename AwaitableExecutor = typename VoidAwaitable::executor_type> + requires (std::ranges::range && + std::same_as && + boost::asio::execution::executor) +auto max_concurrent_for_each(Range&& range, + size_t max_concurrent, + VoidAwaitableFactory&& factory, + cancel_on_error on_error = cancel_on_error::none) + -> boost::asio::awaitable +{ + return max_concurrent_for_each( + std::begin(range), std::end(range), max_concurrent, + std::forward(factory), on_error); +} } // namespace ceph::async diff --git a/src/test/common/test_async_max_concurrent_for_each.cc b/src/test/common/test_async_max_concurrent_for_each.cc index 2e6f919a394..b0880dfdb85 100644 --- a/src/test/common/test_async_max_concurrent_for_each.cc +++ b/src/test/common/test_async_max_concurrent_for_each.cc @@ -39,6 +39,12 @@ void wait_for(std::chrono::milliseconds dur, asio::yield_context yield) timer.async_wait(yield); } +asio::awaitable wait_for(std::chrono::milliseconds dur) +{ + auto timer = asio::steady_timer{co_await asio::this_coro::executor, dur}; + co_await timer.async_wait(asio::use_awaitable); +} + struct null_sentinel {}; bool operator==(const char* c, null_sentinel) { return !*c; } static_assert(std::sentinel_for); @@ -222,4 +228,108 @@ TEST(range_yield, over_limit) EXPECT_EQ(10, completed); } +TEST(iterator_co, empty) +{ + int* end = nullptr; + auto cr = [] (int) -> asio::awaitable { co_return; }; + + asio::io_context ctx; + asio::co_spawn(ctx, [&] () -> asio::awaitable { + co_await max_concurrent_for_each(end, end, 10, cr); + }, rethrow); + ctx.run(); +} + +TEST(iterator_co, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int) -> asio::awaitable { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + co_await wait_for(1ms); + + --concurrent; + ++completed; + }; + + asio::io_context ctx; + asio::co_spawn(ctx, [&] () -> asio::awaitable { + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + co_await max_concurrent_for_each(begin(arr), end(arr), 2, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + +TEST(iterator_co, sentinel) +{ + const char* begin = "hello"; + null_sentinel end; + + size_t completed = 0; + auto cr = [&completed] (char c) -> asio::awaitable { + ++completed; + co_return; + }; + + asio::io_context ctx; + asio::co_spawn(ctx, [&] () -> asio::awaitable { + co_await max_concurrent_for_each(begin, end, 10, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(completed, 5); +} + +TEST(range_co, empty) +{ + constexpr std::array arr{}; + auto cr = [] (int) -> asio::awaitable { co_return; }; + + asio::io_context ctx; + asio::co_spawn(ctx, [&] () -> asio::awaitable { + co_await max_concurrent_for_each(arr, 10, cr); + }, rethrow); + ctx.run(); +} + +TEST(range_co, over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (int) -> asio::awaitable { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + co_await wait_for(1ms); + + --concurrent; + ++completed; + }; + + asio::io_context ctx; + asio::co_spawn(ctx, [&] () -> asio::awaitable { + constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10}; + co_await max_concurrent_for_each(arr, 2, cr); + }, rethrow); + ctx.run(); + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(10, completed); +} + } // namespace ceph::async -- 2.39.5