From 0da024f92813323b58048947172f7d499701c55d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 8 May 2024 12:42:42 -0400 Subject: [PATCH] common/async: add spawn_throttle for bounded concurrency with optional_yield a primitive for structured concurrency with stackful coroutines from boost::asio::spawn(). this relies on spawn()'s support for per-op cancellation to guarantee that the lifetime of child coroutines won't exceed the lifetime of their spawn_throttle, making it safe for children to access memory from their parent's stack by taking optional_yield in the constructor, spawn_throttle transparently supports synchronous execution (where optional_yield is empty) and asynchronous execution within a stackful coroutine (where optional_yield contains the parent's yield_context) Signed-off-by: Casey Bodley --- src/common/async/detail/spawn_throttle_impl.h | 360 +++++++++ src/common/async/spawn_throttle.h | 146 ++++ src/test/common/CMakeLists.txt | 4 + src/test/common/test_async_spawn_throttle.cc | 751 ++++++++++++++++++ 4 files changed, 1261 insertions(+) create mode 100644 src/common/async/detail/spawn_throttle_impl.h create mode 100644 src/common/async/spawn_throttle.h create mode 100644 src/test/common/test_async_spawn_throttle.cc diff --git a/src/common/async/detail/spawn_throttle_impl.h b/src/common/async/detail/spawn_throttle_impl.h new file mode 100644 index 00000000000..9030f266233 --- /dev/null +++ b/src/common/async/detail/spawn_throttle_impl.h @@ -0,0 +1,360 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/service.h" +#include "common/async/yield_context.h" + +namespace ceph::async::detail { + +struct spawn_throttle_handler; + +// Reference-counted spawn throttle interface. +class spawn_throttle_impl : + public boost::intrusive_ref_counter +{ + public: + spawn_throttle_impl(size_t limit, cancel_on_error on_error) + : limit(limit), on_error(on_error), + children(std::make_unique(limit)) + { + // initialize the free list + for (size_t i = 0; i < limit; i++) { + free.push_back(children[i]); + } + } + virtual ~spawn_throttle_impl() {} + + // factory function + static auto create(optional_yield y, size_t limit, cancel_on_error on_error) + -> boost::intrusive_ptr; + + // return the completion handler for a new child. may block due to throttling + // or rethrow an exception from a previously-spawned child + spawn_throttle_handler get(); + + // track each spawned coroutine for cancellation. these are stored in an + // array, and recycled after each use via the free list + struct child : boost::intrusive::list_base_hook<> { + std::optional signal; + }; + + using executor_type = boost::asio::any_io_executor; + virtual executor_type get_executor() = 0; + + // wait until count <= target_count + virtual void wait_for(size_t target_count) = 0; + + // cancel outstanding coroutines + virtual void cancel(bool shutdown) + { + cancel_outstanding_from(outstanding.begin()); + } + + // complete the given child coroutine + virtual void on_complete(child& c, std::exception_ptr eptr) + { + --count; + + // move back to the free list + auto next = outstanding.erase(outstanding.iterator_to(c)); + c.signal.reset(); + free.push_back(c); + + if (eptr && !unreported_exception) { + // hold on to the first child exception until we can report it in wait() + // or completion() + unreported_exception = eptr; + + // handle cancel_on_error + auto cancel_from = outstanding.end(); + if (on_error == cancel_on_error::after) { + cancel_from = next; + } else if (on_error == cancel_on_error::all) { + cancel_from = outstanding.begin(); + } + cancel_outstanding_from(cancel_from); + } + } + + protected: + const size_t limit; + const cancel_on_error on_error; + size_t count = 0; + + void report_exception() + { + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + } + + private: + std::exception_ptr unreported_exception; + std::unique_ptr children; + + using child_list = boost::intrusive::list>; + child_list outstanding; + child_list free; + + void cancel_outstanding_from(child_list::iterator i) + { + while (i != outstanding.end()) { + // increment before cancellation, which may invoke on_complete() + // directly and remove the child from this list + child& c = *i++; + c.signal->emit(boost::asio::cancellation_type::terminal); + } + } +}; + +// A cancellable spawn() completion handler that notifies the spawn_throttle +// upon completion. This holds a reference to the implementation in order to +// extend its lifetime. This is required for per-op cancellation because the +// cancellation_signals must outlive these coroutine stacks. +struct spawn_throttle_handler { + boost::intrusive_ptr impl; + spawn_throttle_impl::child& c; + boost::asio::cancellation_slot slot; + + spawn_throttle_handler(boost::intrusive_ptr impl, + spawn_throttle_impl::child& c) + : impl(std::move(impl)), c(c), slot(c.signal->slot()) + {} + + using executor_type = spawn_throttle_impl::executor_type; + executor_type get_executor() const noexcept + { + return impl->get_executor(); + } + + using cancellation_slot_type = boost::asio::cancellation_slot; + cancellation_slot_type get_cancellation_slot() const noexcept + { + return slot; + } + + void operator()(std::exception_ptr eptr) + { + impl->on_complete(c, eptr); + } +}; + +spawn_throttle_handler spawn_throttle_impl::get() +{ + report_exception(); // throw unreported exception + + if (count >= limit) { + wait_for(limit - 1); + } + + ++count; + + // move a free child to the outstanding list + child& c = free.front(); + free.pop_front(); + outstanding.push_back(c); + + // spawn the coroutine with its associated cancellation signal + c.signal.emplace(); + return {this, c}; +} + + +// Spawn throttle implementation for use in synchronous contexts where wait() +// blocks the calling thread until completion. +class sync_spawn_throttle_impl final : public spawn_throttle_impl { + static constexpr int concurrency = 1; // only run from a single thread + public: + sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error) + : spawn_throttle_impl(limit, on_error), + ctx(std::in_place, concurrency) + {} + + executor_type get_executor() override + { + return ctx->get_executor(); + } + + void wait_for(size_t target_count) override + { + while (count > target_count) { + if (ctx->stopped()) { + ctx->restart(); + } + ctx->run_one(); + } + + report_exception(); // throw unreported exception + } + + void cancel(bool shutdown) override + { + spawn_throttle_impl::cancel(shutdown); + + if (shutdown) { + // destroy the io_context to trigger two-phase shutdown which + // destroys any completion handlers with a reference to 'this' + ctx.reset(); + count = 0; + } + } + + private: + std::optional ctx; +}; + +// Spawn throttle implementation for use in asynchronous contexts where wait() +// suspends the calling stackful coroutine. +class async_spawn_throttle_impl final : + public spawn_throttle_impl, + public service_list_base_hook +{ + public: + async_spawn_throttle_impl(boost::asio::yield_context yield, + size_t limit, cancel_on_error on_error) + : spawn_throttle_impl(limit, on_error), + svc(boost::asio::use_service>( + boost::asio::query(yield.get_executor(), + boost::asio::execution::context))), + yield(yield) + { + // register for service_shutdown() notifications + svc.add(*this); + } + + ~async_spawn_throttle_impl() + { + svc.remove(*this); + } + + executor_type get_executor() override + { + return yield.get_executor(); + } + + void service_shutdown() + { + waiter.reset(); + } + + private: + service& svc; + boost::asio::yield_context yield; + + using WaitSignature = void(boost::system::error_code); + struct wait_state { + using Work = boost::asio::executor_work_guard< + boost::asio::any_io_executor>; + using Handler = typename boost::asio::async_result< + boost::asio::yield_context, WaitSignature>::handler_type; + + Work work; + Handler handler; + + explicit wait_state(Handler&& h) + : work(make_work_guard(h)), + handler(std::move(h)) + {} + }; + std::optional waiter; + size_t wait_for_count = 0; + + struct op_cancellation { + async_spawn_throttle_impl* self; + explicit op_cancellation(async_spawn_throttle_impl* self) noexcept + : self(self) {} + void operator()(boost::asio::cancellation_type type) { + if (type != boost::asio::cancellation_type::none) { + self->cancel(false); + } + } + }; + + void wait_for(size_t target_count) override + { + if (count > target_count) { + wait_for_count = target_count; + + boost::asio::async_initiate( + [this] (auto handler) { + auto slot = get_associated_cancellation_slot(handler); + if (slot.is_connected()) { + slot.template emplace(this); + } + waiter.emplace(std::move(handler)); + }, yield); + // this is a coroutine, so the wait has completed by this point + } + + report_exception(); // throw unreported exception + } + + void wait_complete(boost::system::error_code ec) + { + auto w = std::move(*waiter); + waiter.reset(); + boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec)); + } + + void on_complete(child& c, std::exception_ptr eptr) override + { + spawn_throttle_impl::on_complete(c, eptr); + + if (waiter && count <= wait_for_count) { + wait_complete({}); + } + } + + void cancel(bool shutdown) override + { + spawn_throttle_impl::cancel(shutdown); + + if (waiter) { + wait_complete(make_error_code(boost::asio::error::operation_aborted)); + } + } +}; + +auto spawn_throttle_impl::create(optional_yield y, size_t limit, + cancel_on_error on_error) + -> boost::intrusive_ptr +{ + if (y) { + auto yield = y.get_yield_context(); + return new async_spawn_throttle_impl(yield, limit, on_error); + } else { + return new sync_spawn_throttle_impl(limit, on_error); + } +} + +} // namespace ceph::async::detail diff --git a/src/common/async/spawn_throttle.h b/src/common/async/spawn_throttle.h new file mode 100644 index 00000000000..02e07ca3deb --- /dev/null +++ b/src/common/async/spawn_throttle.h @@ -0,0 +1,146 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "detail/spawn_throttle_impl.h" + +#include +#include "cancel_on_error.h" +#include "yield_context.h" + +namespace ceph::async { + +/// A coroutine throttle that allows a thread of execution to spawn and manage +/// multiple child coroutines, while enforcing an upper bound on concurrency. +/// The parent may either be a synchronous function or a stackful coroutine, +/// depending on the optional_yield constructor argument. +/// +/// Child coroutines are spawned by calling boost::asio::spawn() and using the +/// spawn_throttle object as the CompletionToken argument. Exceptions thrown +/// by children are reported to the caller on its next call to get() or wait(). +/// The cancel_on_error option controls whether these exceptions trigger the +/// cancellation of other children. +/// +/// All child coroutines are canceled by cancel() or spawn_throttle destruction. +/// This allows a parent function to share memory with its child coroutines +/// without fear of dangling references. +/// +/// This class is not thread-safe. Member functions should be called from the +/// parent thread of execution only. +/// +/// Example: +/// @code +/// void child(boost::asio::yield_context yield); +/// +/// void parent(size_t count, optional_yield y) +/// { +/// // spawn all children, up to 10 at a time +/// auto throttle = ceph::async::spawn_throttle{y, 10}; +/// +/// for (size_t i = 0; i < count; i++) { +/// boost::asio::spawn(throttle.get_executor(), child, throttle); +/// } +/// throttle.wait(); +/// } +/// @endcode +class spawn_throttle { + using impl_type = detail::spawn_throttle_impl; + boost::intrusive_ptr impl; + + public: + spawn_throttle(optional_yield y, size_t limit, + cancel_on_error on_error = cancel_on_error::none) + : impl(detail::spawn_throttle_impl::create(y, limit, on_error)) + {} + + spawn_throttle(spawn_throttle&&) = default; + spawn_throttle& operator=(spawn_throttle&&) = default; + // disable copy for unique ownership + spawn_throttle(const spawn_throttle&) = delete; + spawn_throttle& operator=(const spawn_throttle&) = delete; + + /// Cancel outstanding coroutines on destruction. + ~spawn_throttle() + { + if (impl) { + impl->cancel(true); + } + } + + using executor_type = impl_type::executor_type; + executor_type get_executor() + { + return impl->get_executor(); + } + + /// Return a cancellable spawn() completion handler with signature + /// void(std::exception_ptr). + /// + /// This function may block until a throttle unit becomes available. If one or + /// more previously-spawned coroutines exit with an exception, the first such + /// exception is rethrown here. + /// + /// As a convenience, you can avoid calling this function by using the + /// spawn_throttle itself as a CompletionToken for spawn(). + auto get() + -> detail::spawn_throttle_handler + { + return impl->get(); + } + + /// Wait for all outstanding completions before returning. If any + /// of the spawned coroutines exits with an exception, the first exception + /// is rethrown. + /// + /// After wait() completes, whether successfully or by exception, the yield + /// throttle can be reused to spawn and await additional coroutines. + void wait() + { + impl->wait_for(0); + } + + /// Cancel all outstanding coroutines. + void cancel() + { + impl->cancel(false); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// Allow spawn_throttle to be used as a CompletionToken. +template +struct async_result +{ + using completion_handler_type = + ceph::async::detail::spawn_throttle_handler; + async_result(completion_handler_type&) {} + + using return_type = void; + return_type get() {} + + template + static return_type initiate(Initiation&& init, + ceph::async::spawn_throttle& throttle, + Args&& ...args) + { + return std::move(init)(throttle.get(), std::forward(args)...); + } +}; + +} // namespace boost::asio diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 08f77a03894..d54ea127cc9 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -361,6 +361,10 @@ add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc) add_ceph_unittest(unittest_async_shared_mutex) target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system) +add_executable(unittest_async_spawn_throttle test_async_spawn_throttle.cc) +add_ceph_unittest(unittest_async_spawn_throttle) +target_link_libraries(unittest_async_spawn_throttle ceph-common Boost::system Boost::context) + add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc) add_ceph_unittest(unittest_async_yield_waiter) target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context) diff --git a/src/test/common/test_async_spawn_throttle.cc b/src/test/common/test_async_spawn_throttle.cc new file mode 100644 index 00000000000..6f306a13918 --- /dev/null +++ b/src/test/common/test_async_spawn_throttle.cc @@ -0,0 +1,751 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/spawn_throttle.h" + +#include +#include +#include +#include +#include +#include +#include +#include "common/async/yield_waiter.h" + +namespace ceph::async { + +namespace asio = boost::asio; +using error_code = boost::system::error_code; + +void rethrow(std::exception_ptr eptr) +{ + if (eptr) std::rethrow_exception(eptr); +} + +auto capture(std::optional& eptr) +{ + return [&eptr] (std::exception_ptr e) { eptr = e; }; +} + +auto capture(asio::cancellation_signal& signal, + std::optional& eptr) +{ + return asio::bind_cancellation_slot(signal.slot(), capture(eptr)); +} + +using namespace std::chrono_literals; + +void wait_for(std::chrono::milliseconds dur, asio::yield_context yield) +{ + auto timer = asio::steady_timer{yield.get_executor(), dur}; + timer.async_wait(yield); +} + +auto wait_for(std::chrono::milliseconds dur) +{ + return [dur] (asio::yield_context yield) { wait_for(dur, yield); }; +} + +auto wait_on(yield_waiter& handler) +{ + return [&handler] (asio::yield_context yield) { + handler.async_wait(yield); + }; +} + + +TEST(YieldGroupSync, wait_empty) +{ + auto throttle = spawn_throttle{null_yield, 2}; + throttle.wait(); +} + +TEST(YieldGroupSync, spawn_wait) +{ + int completed = 0; + auto cr = [&] (asio::yield_context yield) { + wait_for(1ms, yield); + ++completed; + }; + + auto throttle = spawn_throttle{null_yield, 2}; + asio::spawn(throttle.get_executor(), cr, throttle); + throttle.wait(); + + EXPECT_EQ(1, completed); +} + +TEST(YieldGroupSync, spawn_shutdown) +{ + auto throttle = spawn_throttle{null_yield, 2}; + asio::spawn(throttle.get_executor(), wait_for(1s), throttle); +} + +TEST(YieldGroupSync, spawn_cancel_wait) +{ + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + wait_for(1s, yield); + ++completed; + }; + + auto throttle = spawn_throttle{null_yield, 2}; + asio::spawn(throttle.get_executor(), cr, throttle); + throttle.cancel(); + EXPECT_THROW(throttle.wait(), boost::system::system_error); + + EXPECT_EQ(0, completed); +} + +TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait) +{ + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + wait_for(1ms, yield); + ++completed; + }; + + auto throttle = spawn_throttle{null_yield, 2}; + asio::spawn(throttle.get_executor(), cr, throttle); + throttle.cancel(); + EXPECT_THROW(throttle.wait(), boost::system::system_error); + asio::spawn(throttle.get_executor(), cr, throttle); + throttle.wait(); + + EXPECT_EQ(1, completed); +} + +TEST(YieldGroupSync, spawn_over_limit) +{ + int concurrent = 0; + int max_concurrent = 0; + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + ++concurrent; + if (max_concurrent < concurrent) { + max_concurrent = concurrent; + } + + wait_for(1ms, yield); + + --concurrent; + ++completed; + }; + + auto throttle = spawn_throttle{null_yield, 2}; + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); // blocks + asio::spawn(throttle.get_executor(), cr, throttle); // blocks + throttle.wait(); // blocks + + EXPECT_EQ(0, concurrent); + EXPECT_EQ(2, max_concurrent); + EXPECT_EQ(4, completed); +} + +TEST(YieldGroupSync, spawn_cancel_on_error_none) +{ + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + wait_for(10ms, yield); + ++completed; + }; + auto err = [] (asio::yield_context yield) { + wait_for(0ms, yield); + throw std::logic_error{"err"}; + }; + + auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none}; + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), err, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + EXPECT_THROW(throttle.wait(), std::logic_error); + + EXPECT_EQ(3, completed); +} + +TEST(YieldGroupSync, spawn_cancel_on_error_after) +{ + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + wait_for(10ms, yield); + ++completed; + }; + auto err = [] (asio::yield_context yield) { + wait_for(0ms, yield); + throw std::logic_error{"err"}; + }; + + auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after}; + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), err, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + EXPECT_THROW(throttle.wait(), std::logic_error); + + EXPECT_EQ(2, completed); +} + +TEST(YieldGroupSync, spawn_cancel_on_error_all) +{ + int completed = 0; + + auto cr = [&] (asio::yield_context yield) { + wait_for(1s, yield); + ++completed; + }; + auto err = [] (asio::yield_context yield) { + wait_for(0ms, yield); + throw std::logic_error{"err"}; + }; + + auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all}; + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + asio::spawn(throttle.get_executor(), err, throttle); + asio::spawn(throttle.get_executor(), cr, throttle); + EXPECT_THROW(throttle.wait(), std::logic_error); + + EXPECT_EQ(0, completed); +} + + +TEST(YieldGroupAsync, wait_empty) +{ + asio::io_context ctx; + asio::spawn(ctx, [] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 2}; + throttle.wait(); + }, rethrow); + + ctx.run(); +} + +TEST(YieldGroupAsync, spawn_wait) +{ + asio::io_context ctx; + yield_waiter waiter; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 2}; + asio::spawn(yield, wait_on(waiter), throttle); + throttle.wait(); // blocks + }, rethrow); + + ASSERT_FALSE(waiter); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter); + + waiter.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); +} + +TEST(YieldGroupAsync, spawn_over_limit) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + yield_waiter waiter3; + yield_waiter waiter4; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 2}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); + asio::spawn(yield, wait_on(waiter3), throttle); // blocks + asio::spawn(yield, wait_on(waiter4), throttle); // blocks + throttle.wait(); // blocks + }, rethrow); + + ASSERT_FALSE(waiter1); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_TRUE(waiter2); + ASSERT_FALSE(waiter3); + + waiter1.complete(error_code{}); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter3); + ASSERT_FALSE(waiter4); + + waiter3.complete(error_code{}); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter4); + + waiter2.complete(error_code{}); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + waiter4.complete(error_code{}); + + ctx.poll(); + EXPECT_TRUE(ctx.stopped()); +} + +TEST(YieldGroupAsync, spawn_shutdown) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 2}; + asio::spawn(yield, wait_on(waiter1), throttle); + waiter2.async_wait(yield); // blocks + // shut down while there's an outstanding child but throttle is not + // waiting on spawn() or wait() + }, rethrow); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(waiter1); + EXPECT_TRUE(waiter2); +} + +TEST(YieldGroupAsync, spawn_throttled_shutdown) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); // blocks + // shut down while we're throttled on the second spawn + }, rethrow); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(waiter1); + EXPECT_FALSE(waiter2); +} + +TEST(YieldGroupAsync, spawn_wait_shutdown) +{ + asio::io_context ctx; + yield_waiter waiter; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter), throttle); + throttle.wait(); // blocks + // shut down while we're wait()ing + }, rethrow); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(waiter); +} + +TEST(YieldGroupAsync, spawn_throttled_error) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); // blocks + }, capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_FALSE(waiter2); + + waiter1.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + EXPECT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_throttled_signal) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::cancellation_signal signal; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); // blocks + }, capture(signal, result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_FALSE(waiter2); + + signal.emit(boost::asio::cancellation_type::terminal); + + ctx.poll(); + EXPECT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_wait_error) +{ + asio::io_context ctx; + yield_waiter waiter; + + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter), throttle); + throttle.wait(); // blocks + }, capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter); + + waiter.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + EXPECT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_wait_signal) +{ + asio::io_context ctx; + yield_waiter waiter; + + asio::cancellation_signal signal; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter), throttle); + throttle.wait(); // blocks + }, capture(signal, result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter); + ASSERT_FALSE(result); + + signal.emit(boost::asio::cancellation_type::terminal); + + ctx.poll(); + EXPECT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_cancel_wait) +{ + asio::io_context ctx; + yield_waiter waiter; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 2}; + asio::spawn(yield, wait_on(waiter), throttle); + throttle.cancel(); + throttle.wait(); + }, capture(result)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_cancel_on_error_none) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + yield_waiter waiter3; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 4, cancel_on_error::none}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); + asio::spawn(yield, wait_on(waiter3), throttle); + throttle.wait(); // blocks + }, capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_TRUE(waiter2); + ASSERT_TRUE(waiter3); + + waiter2.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + waiter1.complete(error_code{}); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + waiter3.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_cancel_on_error_after) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + yield_waiter waiter3; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 4, cancel_on_error::after}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); + asio::spawn(yield, wait_on(waiter3), throttle); + throttle.wait(); // blocks + }, capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_TRUE(waiter2); + ASSERT_TRUE(waiter3); + + waiter2.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + // if the waiter3 cr was canceled, completing waiter1 should unblock wait() + waiter1.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_cancel_on_error_all) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + yield_waiter waiter3; + std::optional result; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 4, cancel_on_error::all}; + asio::spawn(yield, wait_on(waiter1), throttle); + asio::spawn(yield, wait_on(waiter2), throttle); + asio::spawn(yield, wait_on(waiter3), throttle); + throttle.wait(); // blocks + }, capture(result)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_TRUE(waiter2); + ASSERT_TRUE(waiter3); + + // should cancel the other crs and unblock throttle.wait() + waiter2.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(YieldGroupAsync, spawn_wait_spawn_wait) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + throttle.wait(); // blocks + asio::spawn(yield, wait_on(waiter2), throttle); + throttle.wait(); // blocks + }, rethrow); + + ASSERT_FALSE(waiter1); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_FALSE(waiter2); + + waiter1.complete(error_code{}); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter2); + + waiter2.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); +} + +TEST(YieldGroupAsync, spawn_cancel_wait_spawn_wait) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + throttle.cancel(); + EXPECT_THROW(throttle.wait(), boost::system::system_error); + asio::spawn(yield, wait_on(waiter2), throttle); + throttle.wait(); // blocks + }, rethrow); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter2); + + waiter2.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); +} + +TEST(YieldGroupAsync, spawn_error_wait_spawn_wait) +{ + asio::io_context ctx; + yield_waiter waiter1; + yield_waiter waiter2; + + asio::spawn(ctx, [&] (asio::yield_context yield) { + auto throttle = spawn_throttle{yield, 1}; + asio::spawn(yield, wait_on(waiter1), throttle); + EXPECT_THROW(throttle.wait(), boost::system::system_error); + asio::spawn(yield, wait_on(waiter2), throttle); + throttle.wait(); // blocks + }, rethrow); + + ASSERT_FALSE(waiter1); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter1); + ASSERT_FALSE(waiter2); + + waiter1.complete(make_error_code(std::errc::no_such_file_or_directory)); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(waiter2); + + waiter2.complete(error_code{}); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); +} + +} // namespace ceph::async -- 2.39.5