From 7e5d28f442375ecd3e13cccf5cb8c097f7498b80 Mon Sep 17 00:00:00 2001
From: Casey Bodley <cbodley@redhat.com>
Date: Sun, 5 Feb 2023 17:08:49 -0500
Subject: [PATCH] common/async: add parallel_for_each() algorithm

Signed-off-by: Casey Bodley <cbodley@redhat.com>
---
 src/common/async/parallel_for_each.h          |  86 ++++++
 src/test/common/CMakeLists.txt                |   4 +
 .../common/test_async_parallel_for_each.cc    | 258 ++++++++++++++++++
 3 files changed, 348 insertions(+)
 create mode 100644 src/common/async/parallel_for_each.h
 create mode 100644 src/test/common/test_async_parallel_for_each.cc

diff --git a/src/common/async/parallel_for_each.h b/src/common/async/parallel_for_each.h
new file mode 100644
index 0000000000000..cb4970378e3a0
--- /dev/null
+++ b/src/common/async/parallel_for_each.h
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <iterator>
+#include <ranges>
+#include <type_traits>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/asio/this_coro.hpp>
+#include "co_spawn_group.h"
+
+namespace ceph::async {
+
+/// Call a coroutine with each element in the given range then wait for all
+/// of them to complete. The first exception is rethrown to the caller. The
+/// cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   co_await parallel_for_each(tasks.begin(), tasks.end(), child);
+/// }
+/// \endcode
+template <typename Iterator, typename Sentinel,
+          typename VoidAwaitableFactory,
+          typename Value = std::iter_value_t<Iterator>,
+          typename VoidAwaitable = std::invoke_result_t<
+              VoidAwaitableFactory, Value>,
+          typename AwaitableT = typename VoidAwaitable::value_type,
+          typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+  requires (std::input_iterator<Iterator> &&
+            std::sentinel_for<Sentinel, Iterator> &&
+            std::same_as<AwaitableT, void> &&
+            boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Iterator begin, Sentinel end,
+                       VoidAwaitableFactory&& factory,
+                       cancel_on_error on_error = cancel_on_error::none)
+    -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+  const size_t count = std::ranges::distance(begin, end);
+  if (!count) {
+    co_return;
+  }
+  auto ex = co_await boost::asio::this_coro::executor;
+  auto group = co_spawn_group{ex, count, on_error};
+  for (Iterator i = begin; i != end; ++i) {
+    group.spawn(factory(*i));
+  }
+  co_await group.wait();
+}
+
+/// \overload
+template <typename Range, typename VoidAwaitableFactory,
+          typename Value = std::ranges::range_value_t<Range>,
+          typename VoidAwaitable = std::invoke_result_t<
+              VoidAwaitableFactory, Value>,
+          typename AwaitableT = typename VoidAwaitable::value_type,
+          typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+  requires (std::ranges::range<Range> &&
+            std::same_as<AwaitableT, void> &&
+            boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Range&& range, VoidAwaitableFactory&& factory,
+                       cancel_on_error on_error = cancel_on_error::none)
+    -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+  return parallel_for_each(std::begin(range), std::end(range),
+                           std::move(factory), on_error);
+}
+
+} // namespace ceph::async
diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt
index 96bbb35f73bc9..33ff38b932df4 100644
--- a/src/test/common/CMakeLists.txt
+++ b/src/test/common/CMakeLists.txt
@@ -388,6 +388,10 @@ add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc)
 add_ceph_unittest(unittest_async_yield_waiter)
 target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context)
 
+add_executable(unittest_async_parallel_for_each test_async_parallel_for_each.cc)
+add_ceph_unittest(unittest_async_parallel_for_each)
+target_link_libraries(unittest_async_parallel_for_each ceph-common Boost::system)
+
 add_executable(unittest_cdc test_cdc.cc $<TARGET_OBJECTS:unit-main>)
 target_link_libraries(unittest_cdc global ceph-common)
 
diff --git a/src/test/common/test_async_parallel_for_each.cc b/src/test/common/test_async_parallel_for_each.cc
new file mode 100644
index 0000000000000..be04221ceb55b
--- /dev/null
+++ b/src/test/common/test_async_parallel_for_each.cc
@@ -0,0 +1,258 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/parallel_for_each.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = asio::awaitable<T, executor_type>;
+
+using void_waiter = co_waiter<void, executor_type>;
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+TEST(parallel_for_each, empty)
+{
+  asio::io_context ctx;
+
+  int* end = nullptr;
+  auto cr = [] (int i) -> awaitable<void> { co_return; };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(end, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(parallel_for_each, shutdown)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before any waiters complete
+}
+
+TEST(parallel_for_each, cancel)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before any waiters complete
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(parallel_for_each, complete_shutdown)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before final waiter completes
+}
+
+TEST(parallel_for_each, complete_cancel)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before final waiter completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(parallel_for_each, complete_complete)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[1].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+struct null_sentinel {};
+bool operator==(const char* c, null_sentinel) { return !*c; }
+static_assert(std::sentinel_for<null_sentinel, const char*>);
+
+TEST(parallel_for_each, sentinel)
+{
+  asio::io_context ctx;
+
+  const char* begin = "hello";
+  null_sentinel end;
+
+  size_t count = 0;
+  auto cr = [&count] (char c) -> awaitable<void> {
+    ++count;
+    co_return;
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_EQ(count, 5);
+}
+
+TEST(parallel_for_each, move_iterator)
+{
+  asio::io_context ctx;
+
+  using value_type = std::unique_ptr<int>;
+  value_type values[] = {
+    std::make_unique<int>(42),
+    std::make_unique<int>(43),
+  };
+
+  auto begin = std::make_move_iterator(std::begin(values));
+  auto end = std::make_move_iterator(std::end(values));
+
+  auto cr = [] (value_type v) -> awaitable<void> {
+    if (!v) {
+      throw std::invalid_argument("empty");
+    }
+    co_return;
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+
+  EXPECT_FALSE(values[0]);
+  EXPECT_FALSE(values[1]);
+}
+
+} // namespace ceph::async
-- 
2.39.5