From: Casey Bodley Date: Sun, 5 Feb 2023 15:10:34 +0000 (-0500) Subject: common/async: add spawn_group template for fork-join parallelism X-Git-Tag: v20.3.0~169^2~20 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a47ddbd6a8601b1d6ad0bd5896574e2b99541658;p=ceph.git common/async: add spawn_group template for fork-join parallelism Signed-off-by: Casey Bodley --- diff --git a/src/common/async/detail/spawn_group.h b/src/common/async/detail/spawn_group.h new file mode 100644 index 0000000000000..c121b7d209b03 --- /dev/null +++ b/src/common/async/detail/spawn_group.h @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/co_waiter.h" +#include "common/async/service.h" +#include "include/scope_guard.h" + +namespace ceph::async::detail { + +template +class spawn_group_impl; + +// A cancellable co_spawn() completion handler that notifies the spawn_group +// upon completion. This holds a reference to the implementation in order to +// extend its lifetime. This is required for per-op cancellation because the +// cancellation_signals must outlive these coroutine frames. +template +class spawn_group_handler { + using impl_type = spawn_group_impl; + using size_type = typename impl_type::size_type; + boost::intrusive_ptr impl; + boost::asio::cancellation_slot slot; + size_type index; + public: + spawn_group_handler(boost::intrusive_ptr impl, + boost::asio::cancellation_slot slot, size_type index) + : impl(std::move(impl)), slot(std::move(slot)), index(index) + {} + + using executor_type = typename impl_type::executor_type; + executor_type get_executor() const noexcept + { + return impl->get_executor(); + } + + using cancellation_slot_type = boost::asio::cancellation_slot; + cancellation_slot_type get_cancellation_slot() const noexcept + { + return slot; + } + + void operator()(std::exception_ptr eptr) + { + impl->child_complete(index, eptr); + } +}; + +// Reference-counted spawn group implementation. +template +class spawn_group_impl : + public boost::intrusive_ref_counter, + boost::thread_unsafe_counter>, + public service_list_base_hook +{ + public: + using size_type = uint16_t; + + spawn_group_impl(Executor ex, size_type limit, + cancel_on_error on_error) + : svc(boost::asio::use_service>( + boost::asio::query(ex, boost::asio::execution::context))), + ex(ex), + signals(std::make_unique(limit)), + limit(limit), on_error(on_error) + { + // register for service_shutdown() notifications + svc.add(*this); + } + ~spawn_group_impl() + { + svc.remove(*this); + } + + using executor_type = Executor; + executor_type get_executor() const noexcept + { + return ex; + } + + spawn_group_handler completion() + { + if (spawned >= limit) { + throw std::length_error("spawn group maximum size exceeded"); + } + const size_type index = spawned++; + return {boost::intrusive_ptr{this}, signals[index].slot(), index}; + } + + void child_complete(size_type index, std::exception_ptr e) + { + if (e) { + if (!eptr) { + eptr = e; + } + if (on_error == cancel_on_error::all) { + cancel_from(0); + } else if (on_error == cancel_on_error::after) { + cancel_from(index + 1); + } + } + if (++completed == spawned) { + complete(); + } + } + + boost::asio::awaitable wait() + { + if (completed < spawned) { + co_await waiter.get(); + } + + // clear for reuse + completed = 0; + spawned = 0; + + if (eptr) { + std::rethrow_exception(std::exchange(eptr, nullptr)); + } + } + + void cancel() + { + cancel_from(0); + } + + void service_shutdown() + { + waiter.shutdown(); + } + + private: + service& svc; + co_waiter waiter; + executor_type ex; + std::unique_ptr signals; + std::exception_ptr eptr; + const size_type limit; + size_type spawned = 0; + size_type completed = 0; + const cancel_on_error on_error; + + void cancel_from(size_type begin) + { + for (size_type i = begin; i < spawned; i++) { + signals[i].emit(boost::asio::cancellation_type::terminal); + } + } + + void complete() + { + if (waiter.waiting()) { + waiter.complete(nullptr); + } + } +}; + +} // namespace ceph::async::detail diff --git a/src/common/async/spawn_group.h b/src/common/async/spawn_group.h new file mode 100644 index 0000000000000..dbe29535b11ac --- /dev/null +++ b/src/common/async/spawn_group.h @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include "cancel_on_error.h" +#include "detail/spawn_group.h" + +namespace ceph::async { + +/// \brief Tracks a group of coroutines to await all of their completions. +/// +/// This class functions as a CompletionToken for calls to co_spawn(), attaching +/// a handler that notifies the group upon the child coroutine's completion. +/// +/// The wait() function can be used to await the completion of all children. +/// If any child coroutines exit with an exception, the first such exception +/// is rethrown by wait(). The cancel_on_error option controls whether these +/// exceptions trigger the cancellation of other children. +/// +/// All child coroutines are canceled by cancel() or spawn_group destruction. +/// This allows the parent coroutine to share memory with its child coroutines +/// without fear of dangling references. +/// +/// This class is not thread-safe, so a strand executor should be used in +/// multi-threaded contexts. +/// +/// Example: +/// \code +/// awaitable child(task& t); +/// +/// awaitable parent(std::span tasks) +/// { +/// // process all tasks in parallel +/// auto ex = co_await boost::asio::this_coro::executor; +/// auto group = spawn_group{ex, tasks.size()}; +/// +/// for (auto& t : tasks) { +/// boost::asio::co_spawn(ex, child(t), group); +/// } +/// co_await group.wait(); +/// } +/// \endcode +template +class spawn_group { + using impl_type = detail::spawn_group_impl; + boost::intrusive_ptr impl; + + public: + spawn_group(Executor ex, size_t limit, + cancel_on_error on_error = cancel_on_error::none) + : impl(new impl_type(ex, limit, on_error)) + { + } + + ~spawn_group() + { + impl->cancel(); + } + + using executor_type = Executor; + executor_type get_executor() const + { + return impl->get_executor(); + } + + /// Return a cancellable co_spawn() completion handler with signature + /// void(std::exception_ptr). Throws a std::length_error exception if the + /// number of outstanding completion handlers would exceed the group's limit. + /// + /// As a convenience, you can avoid calling this function by using the + /// spawn_group itself as a CompletionToken for co_spawn(). + auto completion() + { + return impl->completion(); + } + + /// Wait for all outstanding completion handlers before returning. If any + /// of the spawned coroutines exit with an exception, the first exception + /// is rethrown. + /// + /// After wait() completes, whether by exception or co_return, the spawn + /// group can be reused to spawn and await additional coroutines. + boost::asio::awaitable wait() + { + return impl->wait(); + } + + /// Cancel all outstanding coroutines. + void cancel() + { + impl->cancel(); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// Allow spawn_group to be used as a CompletionToken. +template +struct async_result, Signature> +{ + using completion_handler_type = + ceph::async::detail::spawn_group_handler; + async_result(completion_handler_type&) {} + + using return_type = void; + return_type get() {} + + template + static return_type initiate(Initiation&& init, + ceph::async::spawn_group& group, + Args&& ...args) + { + return std::move(init)(group.completion(), std::forward(args)...); + } +}; + +} // namespace boost::asio diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 7182904ea9a29..953b8a29c4293 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -400,6 +400,7 @@ add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc) add_ceph_unittest(unittest_async_shared_mutex) target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system) + add_executable(unittest_async_spawn_throttle test_async_spawn_throttle.cc) add_ceph_unittest(unittest_async_spawn_throttle) target_link_libraries(unittest_async_spawn_throttle ceph-common Boost::system Boost::context) @@ -408,10 +409,15 @@ add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc) add_ceph_unittest(unittest_async_yield_waiter) target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context) + add_executable(unittest_async_parallel_for_each test_async_parallel_for_each.cc) add_ceph_unittest(unittest_async_parallel_for_each) target_link_libraries(unittest_async_parallel_for_each ceph-common Boost::system) +add_executable(unittest_async_spawn_group test_async_spawn_group.cc) +add_ceph_unittest(unittest_async_spawn_group) +target_link_libraries(unittest_async_spawn_group ceph-common Boost::system) + add_executable(unittest_cdc test_cdc.cc $) target_link_libraries(unittest_cdc global ceph-common) diff --git a/src/test/common/test_async_spawn_group.cc b/src/test/common/test_async_spawn_group.cc new file mode 100644 index 0000000000000..2810c68d8b35e --- /dev/null +++ b/src/test/common/test_async_spawn_group.cc @@ -0,0 +1,471 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/spawn_group.h" + +#include +#include +#include +#include +#include +#include "common/async/co_waiter.h" + +namespace ceph::async { + +namespace asio = boost::asio; +namespace errc = boost::system::errc; +using boost::system::error_code; + +using executor_type = asio::io_context::executor_type; + +template +using awaitable = asio::awaitable; + +template +auto capture(std::optional& opt) +{ + return [&opt] (T value) { opt = std::move(value); }; +} + +template +auto capture(asio::cancellation_signal& signal, std::optional& opt) +{ + return asio::bind_cancellation_slot(signal.slot(), capture(opt)); +} + +TEST(spawn_group, spawn_limit) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + auto cr = [] () -> awaitable { co_return; }; + + asio::co_spawn(ex, cr(), group); + EXPECT_THROW(asio::co_spawn(ex, cr(), group), std::length_error); +} + +TEST(spawn_group, wait_empty) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, spawn_shutdown) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 10}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + // shut down before wait() +} + +TEST(spawn_group, spawn_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 10}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, spawn_wait_shutdown) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + + co_waiter waiter; + auto cr = [ex, &waiter] () -> awaitable { + auto group = spawn_group{ex, 1}; + asio::co_spawn(ex, waiter.get(), group); + co_await group.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + // shut down before wait() completes +} + +TEST(spawn_group, spawn_wait_cancel) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + + co_waiter waiter; + auto cr = [ex, &waiter] () -> awaitable { + auto group = spawn_group{ex, 1}; + asio::co_spawn(ex, waiter.get(), group); + co_await group.wait(); + }; + + asio::cancellation_signal signal; + std::optional result; + asio::co_spawn(ex, cr(), capture(signal, result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + // cancel before wait() completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(spawn_group, spawn_wait_exception_order) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 2}; + + co_waiter waiter1; + asio::co_spawn(ex, waiter1.get(), group); + + co_waiter waiter2; + asio::co_spawn(ex, waiter2.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter1.complete(std::make_exception_ptr(std::logic_error{"oops"})); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(spawn_group, spawn_complete_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 2}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop + ctx.restart(); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(spawn_group, spawn_wait_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); + + result.reset(); + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.restart(); + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, spawn_wait_spawn_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_FALSE(*result); + + asio::co_spawn(ex, waiter.get(), group); + + result.reset(); + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.restart(); + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, spawn_cancel_wait_spawn_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + group.cancel(); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // no waiter means ctx can stop + ctx.restart(); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } + + asio::co_spawn(ex, waiter.get(), group); + + result.reset(); + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.restart(); + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, spawn_wait_cancel_spawn_wait) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 1}; + + co_waiter waiter; + asio::co_spawn(ex, waiter.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + // cancel before waiter completes + group.cancel(); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } + + asio::co_spawn(ex, waiter.get(), group); + + result.reset(); + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.restart(); + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(spawn_group, cancel_on_error_after) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 3, cancel_on_error::after}; + + co_waiter waiter1; + asio::co_spawn(ex, waiter1.get(), group); + + co_waiter waiter2; + asio::co_spawn(ex, waiter2.get(), group); + + co_waiter waiter3; + asio::co_spawn(ex, waiter3.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter1.complete(nullptr); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(spawn_group, cancel_on_error_all) +{ + asio::io_context ctx; + auto ex = ctx.get_executor(); + auto group = spawn_group{ex, 3, cancel_on_error::all}; + + co_waiter waiter1; + asio::co_spawn(ex, waiter1.get(), group); + + co_waiter waiter2; + asio::co_spawn(ex, waiter2.get(), group); + + co_waiter waiter3; + asio::co_spawn(ex, waiter3.get(), group); + + std::optional result; + asio::co_spawn(ex, group.wait(), capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +} // namespace ceph::async