From 1c39ad5995b1195e0b89b4361fe54f2d64d41c2e Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 11 Jan 2023 10:45:49 -0500 Subject: [PATCH] common/async: add co_throttle for bounded concurrency with c++20 coroutines Signed-off-by: Casey Bodley --- src/common/async/cancel_on_error.h | 28 + src/common/async/co_throttle.h | 127 ++++ src/common/async/detail/co_throttle_impl.h | 251 ++++++++ src/test/common/CMakeLists.txt | 4 + src/test/common/test_async_co_throttle.cc | 655 +++++++++++++++++++++ 5 files changed, 1065 insertions(+) create mode 100644 src/common/async/cancel_on_error.h create mode 100644 src/common/async/co_throttle.h create mode 100644 src/common/async/detail/co_throttle_impl.h create mode 100644 src/test/common/test_async_co_throttle.cc diff --git a/src/common/async/cancel_on_error.h b/src/common/async/cancel_on_error.h new file mode 100644 index 0000000000000..a4c05c2a7bc31 --- /dev/null +++ b/src/common/async/cancel_on_error.h @@ -0,0 +1,28 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +namespace ceph::async { + +/// Error handling strategy for co_throttle. +enum class cancel_on_error : uint8_t { + none, //< No spawned coroutines are canceled on failure. + after, //< Cancel coroutines spawned after the failed coroutine. + all, //< Cancel all spawned coroutines on failure. +}; + +} // namespace ceph::async diff --git a/src/common/async/co_throttle.h b/src/common/async/co_throttle.h new file mode 100644 index 0000000000000..bf1e9685c6725 --- /dev/null +++ b/src/common/async/co_throttle.h @@ -0,0 +1,127 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/detail/co_throttle_impl.h" + +namespace ceph::async { + +/// A coroutine throttle that allows a parent coroutine to spawn and manage +/// multiple child coroutines, while enforcing an upper bound on concurrency. +/// +/// Child coroutines can be of type awaitable or awaitable. +/// Error codes returned by children are reported to the parent on its next call +/// to spawn() or wait(). The cancel_on_error option controls whether these +/// errors trigger the cancellation of other children. +/// +/// All child coroutines are canceled by cancel() or co_throttle destruction. +/// This allows the parent coroutine to share memory with its child coroutines +/// without fear of dangling references. +/// +/// This class is not thread-safe, so a strand executor should be used in +/// multi-threaded contexts. +/// +/// Example: +/// @code +/// awaitable child(task& t); +/// +/// awaitable parent(std::span tasks) +/// { +/// // process all tasks, up to 10 at a time +/// auto ex = co_await boost::asio::this_coro::executor; +/// auto throttle = co_throttle{ex, 10}; +/// +/// for (auto& t : tasks) { +/// co_await throttle.spawn(child(t)); +/// } +/// co_await throttle.wait(); +/// } +/// @endcode +template +class co_throttle { + public: + using executor_type = Executor; + executor_type get_executor() const { return impl->get_executor(); } + + using size_type = uint16_t; + static constexpr size_type max_limit = std::numeric_limits::max(); + + co_throttle(const executor_type& ex, size_type limit, + cancel_on_error on_error = cancel_on_error::none) + : impl(new impl_type(ex, limit, on_error)) + { + } + + ~co_throttle() + { + cancel(); + } + + co_throttle(const co_throttle&) = delete; + co_throttle& operator=(const co_throttle&) = delete; + + template + using awaitable = boost::asio::awaitable; + + /// Try to spawn the given coroutine. If this would exceed the concurrency + /// limit, wait for another coroutine to complete first. This default + /// limit can be overridden with the optional `smaller_limit` argument. + /// + /// If any spawned coroutines of type awaitable return a non-zero + /// error, the first such error is reported by the next call to spawn() or + /// wait(). When spawn() reports these errors, the given coroutine given will + /// only be spawned in the case of cancel_on_error::none. New coroutines can + /// be spawned by later calls to spawn() regardless of cancel_on_error. + /// + /// If a spawned coroutine exits by an uncaught exception, that exception is + /// rethrown by the next call to spawn() or wait(). + auto spawn(awaitable cr, + size_type smaller_limit = max_limit) + -> awaitable + { + return impl->spawn(std::move(cr), smaller_limit); + } + + /// \overload + auto spawn(awaitable cr, size_type smaller_limit = max_limit) + -> awaitable + { + return impl->spawn(std::move(cr), smaller_limit); + } + + /// Wait for all associated coroutines to complete. If any of these coroutines + /// return a non-zero error_code, the first of those errors is returned. + awaitable wait() + { + return impl->wait(); + } + + /// Cancel all associated coroutines. Callers waiting on spawn() or wait() + /// will fail with boost::asio::error::operation_aborted. + void cancel() + { + impl->cancel(); + } + + private: + using impl_type = detail::co_throttle_impl; + boost::intrusive_ptr impl; +}; + +} // namespace ceph::async diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h new file mode 100644 index 0000000000000..c031745e96ce2 --- /dev/null +++ b/src/common/async/detail/co_throttle_impl.h @@ -0,0 +1,251 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/detail/service.h" +#include "include/ceph_assert.h" + +namespace ceph::async::detail { + +// Coroutine throttle implementation. This is reference-counted so the +// co_spawn() completion handlers can extend the implementation's lifetime. +// This is required for per-op cancellation because the cancellation_signals +// must outlive their coroutine frames. +template +class co_throttle_impl : + public boost::intrusive_ref_counter, + boost::thread_unsafe_counter>, + public service_list_base_hook +{ + public: + using size_type = SizeType; + + using executor_type = Executor; + executor_type get_executor() const { return ex; } + + co_throttle_impl(const executor_type& ex, size_type limit, + cancel_on_error on_error) + : svc(boost::asio::use_service>( + boost::asio::query(ex, boost::asio::execution::context))), + ex(ex), limit(limit), on_error(on_error), + children(new child[limit]) + { + // register for service_shutdown() notifications + svc.add(*this); + + // initialize the free list + for (size_type i = 0; i < limit; i++) { + free.push_back(children[i]); + } + } + ~co_throttle_impl() + { + svc.remove(*this); + } + + template + using awaitable = boost::asio::awaitable; + + template // where T=void or error_code + auto spawn(awaitable cr, size_type smaller_limit) + -> awaitable + { + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + if (unreported_error && on_error != cancel_on_error::none) { + co_return std::exchange(unreported_error, {}); + } + + const size_type current_limit = std::min(smaller_limit, limit); + if (count >= current_limit) { + auto ec = co_await wait_for(current_limit - 1); + if (ec) { + unreported_error.clear(); + co_return ec; + } + if (unreported_error && on_error != cancel_on_error::none) { + co_return std::exchange(unreported_error, {}); + } + } + + ++count; + + // move a free child to the outstanding list + ceph_assert(!free.empty()); + child& c = free.front(); + free.pop_front(); + outstanding.push_back(c); + + // spawn the coroutine with its associated cancellation signal + c.signal.emplace(); + c.canceled = false; + + boost::asio::co_spawn(get_executor(), std::move(cr), + boost::asio::bind_cancellation_slot(c.signal->slot(), + child_completion{this, c})); + + co_return std::exchange(unreported_error, {}); + } + + awaitable wait() + { + if (count > 0) { + auto ec = co_await wait_for(0); + if (ec) { + unreported_error.clear(); + co_return ec; + } + } + co_return std::exchange(unreported_error, {}); + } + + void cancel() + { + for (child& c : outstanding) { + c.canceled = true; + c.signal->emit(boost::asio::cancellation_type::terminal); + } + if (wait_handler) { + wait_complete(make_error_code(boost::asio::error::operation_aborted)); + } + } + + void service_shutdown() + { + wait_handler.reset(); + } + + private: + service& svc; + executor_type ex; + const size_type limit; + const cancel_on_error on_error; + + size_type count = 0; + size_type wait_for_count = 0; + + boost::system::error_code unreported_error; + std::exception_ptr unreported_exception; + + // track each spawned coroutine for cancellation. these are stored in an + // array, and recycled after each use via the free list + struct child : boost::intrusive::list_base_hook<> { + std::optional signal; + bool canceled = false; + }; + std::unique_ptr children; + + using child_list = boost::intrusive::list>; + child_list outstanding; + child_list free; + + using use_awaitable_t = boost::asio::use_awaitable_t; + + using wait_signature = void(std::exception_ptr, boost::system::error_code); + using wait_handler_type = typename boost::asio::async_result< + use_awaitable_t, wait_signature>::handler_type; + std::optional wait_handler; + + // return an awaitable that completes once count <= target_count + auto wait_for(size_type target_count) + -> awaitable + { + ceph_assert(!wait_handler); // one waiter at a time + wait_for_count = target_count; + + use_awaitable_t token; + return boost::asio::async_initiate( + [this] (wait_handler_type h) { + wait_handler.emplace(std::move(h)); + }, token); + } + + void on_complete(child& c, std::exception_ptr eptr, + boost::system::error_code ec) + { + --count; + + if (c.canceled) { + // don't report cancellation errors. cancellation was either requested + // by the user, or triggered by another failure that is reported + eptr = nullptr; + ec = {}; + } + + if (eptr && !unreported_exception) { + unreported_exception = eptr; + } + if (ec && !unreported_error) { + unreported_error = ec; + } + + // move back to the free list + auto next = outstanding.erase(outstanding.iterator_to(c)); + c.signal.reset(); + free.push_back(c); + + // handle cancel_on_error + if (eptr || ec) { + auto cancel_begin = outstanding.end(); + if (on_error == cancel_on_error::after) { + cancel_begin = next; + } else if (on_error == cancel_on_error::all) { + cancel_begin = outstanding.begin(); + } + for (auto i = cancel_begin; i != outstanding.end(); ++i) { + i->canceled = true; + i->signal->emit(boost::asio::cancellation_type::terminal); + } + } + + // maybe wake the waiter + if (wait_handler && count <= wait_for_count) { + wait_complete({}); + } + } + + void wait_complete(boost::system::error_code ec) + { + // bind arguments to the handler for dispatch + auto eptr = std::exchange(unreported_exception, nullptr); + auto c = boost::asio::append(std::move(*wait_handler), eptr, ec); + wait_handler.reset(); + + boost::asio::dispatch(std::move(c)); + } + + struct child_completion { + boost::intrusive_ptr impl; + child& c; + + void operator()(std::exception_ptr eptr, + boost::system::error_code ec = {}) { + impl->on_complete(c, eptr, ec); + } + }; +}; + +} // namespace ceph::async::detail diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index c044daf662ab7..b993ee661d0d7 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -361,6 +361,10 @@ add_executable(unittest_async_completion test_async_completion.cc) add_ceph_unittest(unittest_async_completion) target_link_libraries(unittest_async_completion ceph-common Boost::system) +add_executable(unittest_async_co_throttle test_async_co_throttle.cc) +add_ceph_unittest(unittest_async_co_throttle) +target_link_libraries(unittest_async_co_throttle ceph-common Boost::system) + add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc) add_ceph_unittest(unittest_async_shared_mutex) target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system) diff --git a/src/test/common/test_async_co_throttle.cc b/src/test/common/test_async_co_throttle.cc new file mode 100644 index 0000000000000..7dec81cfc7ed1 --- /dev/null +++ b/src/test/common/test_async_co_throttle.cc @@ -0,0 +1,655 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/co_throttle.h" +#include +#include +#include +#include + +namespace ceph::async { + +namespace errc = boost::system::errc; +using boost::system::error_code; + +using executor_type = boost::asio::io_context::executor_type; + +template +using awaitable = boost::asio::awaitable; +using use_awaitable_t = boost::asio::use_awaitable_t; +static constexpr use_awaitable_t use_awaitable{}; + +using clock_type = std::chrono::steady_clock; +using timer_type = boost::asio::basic_waitable_timer, executor_type>; + +void rethrow(std::exception_ptr eptr) +{ + if (eptr) std::rethrow_exception(eptr); +} + +using namespace std::chrono_literals; + +auto worker(std::chrono::milliseconds delay = 20ms) + -> awaitable +{ + auto timer = timer_type{co_await boost::asio::this_coro::executor, delay}; + co_await timer.async_wait(use_awaitable); +} + +auto worker(error_code ec, std::chrono::milliseconds delay = 10ms) + -> awaitable +{ + co_await worker(delay); + co_return ec; +} + +auto worker(std::exception_ptr eptr, std::chrono::milliseconds delay = 10ms) + -> awaitable +{ + co_await worker(delay); + std::rethrow_exception(eptr); +} + +auto worker(bool& finished, std::chrono::milliseconds delay = 20ms) + -> awaitable +{ + co_await worker(delay); + finished = true; +} + +auto counting_worker(size_t& count, size_t& max_count) + -> awaitable +{ + ++count; + if (max_count < count) { + max_count = count; + } + co_await worker(); + --count; +} + +// use a worker that never completes to test cancellation +awaitable lazy_worker() +{ + for (;;) { + co_await worker(); + } +} + +TEST(co_throttle, wait_empty) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_wait); // wait returns immediately if nothing was spawned + EXPECT_FALSE(*ec_wait); +} + +TEST(co_throttle, spawn_over_limit) +{ + constexpr size_t limit = 1; + + size_t count = 0; + size_t max_count = 0; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count)); + ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn2 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + EXPECT_FALSE(ec_spawn2); + + ctx.run_one(); // wait for spawn1's completion + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + EXPECT_FALSE(ec_wait); + + ctx.run(); // run to completion + + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); + + EXPECT_EQ(max_count, limit); // count never exceeds limit + EXPECT_EQ(count, 0); +} + +TEST(co_throttle, spawn_over_smaller_limit) +{ + constexpr size_t limit = 2; + constexpr size_t smaller_limit = 1; + + size_t count = 0; + size_t max_count = 0; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count)); + ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count), + smaller_limit); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn2 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + EXPECT_FALSE(ec_spawn2); + + ctx.run_one(); // wait for spawn1's completion + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + EXPECT_FALSE(ec_wait); + + ctx.run(); // run to completion + + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); + + EXPECT_EQ(max_count, smaller_limit); // count never exceeds smaller_limit + EXPECT_EQ(count, 0); +} + +TEST(co_throttle, spawn_cancel) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(lazy_worker()); + ec_spawn2 = co_await throttle.spawn(lazy_worker()); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn2 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + EXPECT_FALSE(ec_spawn2); + EXPECT_FALSE(ec_wait); + + throttle.cancel(); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_spawn2); + EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted); + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); // wait after cancel succeeds immediately +} + +TEST(co_throttle, wait_cancel) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn = co_await throttle.spawn(lazy_worker()); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn); + EXPECT_FALSE(*ec_spawn); + EXPECT_FALSE(ec_wait); + + throttle.cancel(); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_wait); + EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted); +} + +TEST(co_throttle, spawn_shutdown) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + + std::optional ec_spawn1; + std::optional ec_spawn2; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + auto throttle = co_throttle{ex, limit}; + ec_spawn1 = co_await throttle.spawn(lazy_worker()); + ec_spawn2 = co_await throttle.spawn(lazy_worker()); + }, rethrow); + + ctx.run_one(); // call spawn1 and spawn2 + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + EXPECT_FALSE(ec_spawn2); + + // shut down io_context before spawn2 unblocks +} + +TEST(co_throttle, wait_shutdown) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + + std::optional ec_spawn; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + auto throttle = co_throttle{ex, limit}; + ec_spawn = co_await throttle.spawn(lazy_worker()); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.run_one(); // call spawn and wait + + ASSERT_TRUE(ec_spawn); + EXPECT_FALSE(*ec_spawn); + EXPECT_FALSE(ec_wait); + + // shut down io_context before wait unblocks +} + +TEST(co_throttle, spawn_destroy) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + + std::optional ec_spawn1; + std::optional ec_spawn2; + + { + auto throttle = co_throttle{ex, limit}; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(lazy_worker()); + ec_spawn2 = co_await throttle.spawn(lazy_worker()); + }, rethrow); + + ctx.poll(); // run until spawn2 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + EXPECT_FALSE(ec_spawn2); + // throttle canceled/destroyed + } + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_spawn2); + EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted); +} + +TEST(co_throttle, wait_destroy) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + + std::optional ec_spawn; + std::optional ec_wait; + + { + auto throttle = co_throttle{ex, limit}; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn = co_await throttle.spawn(lazy_worker()); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn); + EXPECT_FALSE(*ec_spawn); + EXPECT_FALSE(ec_wait); + // throttle canceled/destroyed + } + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_wait); + EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted); +} + +TEST(co_throttle, spawn_error) +{ + constexpr size_t limit = 2; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_spawn3; + std::optional ec_wait; + bool spawn1_finished = false; + bool spawn3_finished = false; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); + auto ec = make_error_code(errc::invalid_argument); + ec_spawn2 = co_await throttle.spawn(worker(ec)); + ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn3 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + EXPECT_FALSE(ec_spawn3); + EXPECT_FALSE(spawn1_finished); + EXPECT_FALSE(spawn3_finished); + + ctx.run_one(); // wait for spawn2's completion + ctx.poll(); // run until wait() blocks + + ASSERT_TRUE(ec_spawn3); + EXPECT_EQ(*ec_spawn3, errc::invalid_argument); + EXPECT_FALSE(ec_wait); + + ctx.run(); // run to completion + + EXPECT_TRUE(spawn3_finished); // spawn3 isn't canceled by spawn2's error + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); +} + +TEST(co_throttle, wait_error) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + auto ec = make_error_code(errc::invalid_argument); + ec_spawn = co_await throttle.spawn(worker(ec)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn); + EXPECT_FALSE(*ec_spawn); + EXPECT_FALSE(ec_wait); + + ctx.run(); // run to completion + + ASSERT_TRUE(ec_wait); + EXPECT_EQ(*ec_wait, errc::invalid_argument); +} + +TEST(co_throttle, spawn_cancel_on_error_after) +{ + constexpr size_t limit = 2; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit, cancel_on_error::after}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_spawn3; + std::optional ec_spawn4; + std::optional ec_wait; + bool spawn1_finished = false; + bool spawn3_finished = false; + bool spawn4_finished = false; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); + auto ec = make_error_code(errc::invalid_argument); + ec_spawn2 = co_await throttle.spawn(worker(ec)); + // spawn3 expects invalid_argument error and cancellation + ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); + // spawn4 expects success + ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn3 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + EXPECT_FALSE(spawn1_finished); + + ctx.run_one(); // wait for spawn2's completion + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); + + ASSERT_TRUE(ec_spawn3); + EXPECT_EQ(*ec_spawn3, errc::invalid_argument); + ASSERT_TRUE(ec_spawn4); + EXPECT_FALSE(*ec_spawn4); + EXPECT_FALSE(spawn1_finished); + + ctx.run_one(); // wait for spawn1's completion + ctx.poll(); // run until wait blocks + + EXPECT_FALSE(ec_wait); + EXPECT_TRUE(spawn1_finished); // spawn1 not canceled + + ctx.run_one(); // wait for spawn4's completion + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); + EXPECT_FALSE(spawn3_finished); // spawn3 canceled + EXPECT_TRUE(spawn4_finished); // spawn4 not canceled +} + +TEST(co_throttle, spawn_cancel_on_error_all) +{ + constexpr size_t limit = 2; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit, cancel_on_error::all}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_spawn3; + std::optional ec_spawn4; + std::optional ec_wait; + bool spawn1_finished = false; + bool spawn3_finished = false; + bool spawn4_finished = false; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); + auto ec = make_error_code(errc::invalid_argument); + ec_spawn2 = co_await throttle.spawn(worker(ec)); + // spawn3 expects invalid_argument error and cancellation + ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); + // spawn3 expects success + ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until spawn3 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + EXPECT_FALSE(ec_spawn3); + EXPECT_FALSE(ec_spawn4); + + ctx.run_one(); // wait for spawn2's completion + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn3); + EXPECT_EQ(*ec_spawn3, errc::invalid_argument); + ASSERT_TRUE(ec_spawn4); + EXPECT_FALSE(*ec_spawn4); + EXPECT_FALSE(ec_wait); + EXPECT_FALSE(spawn1_finished); // spawn1 canceled + + ctx.run_one(); // wait for spawn4's completion + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + + ASSERT_TRUE(ec_wait); + EXPECT_FALSE(*ec_wait); + EXPECT_FALSE(spawn3_finished); // spawn3 canceled + EXPECT_TRUE(spawn4_finished); // spawn4 not canceled +} + +TEST(co_throttle, spawn_exception) +{ + constexpr size_t limit = 2; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn1; + std::optional ec_spawn2; + std::optional ec_spawn3; + bool spawn1_finished = false; + bool spawn3_finished = false; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); + auto eptr = std::make_exception_ptr(std::runtime_error{"oops"}); + ec_spawn2 = co_await throttle.spawn(worker(eptr)); + ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); + }, rethrow); + + ctx.poll(); // run until spawn3 blocks + + ASSERT_TRUE(ec_spawn1); + EXPECT_FALSE(*ec_spawn1); + ASSERT_TRUE(ec_spawn2); + EXPECT_FALSE(*ec_spawn2); + + EXPECT_THROW(ctx.run_one(), std::runtime_error); + + ASSERT_FALSE(ec_spawn3); + EXPECT_FALSE(spawn1_finished); + EXPECT_FALSE(spawn3_finished); +} + +TEST(co_throttle, wait_exception) +{ + constexpr size_t limit = 1; + + boost::asio::io_context ctx; + auto ex = ctx.get_executor(); + auto throttle = co_throttle{ex, limit}; + + std::optional ec_spawn; + std::optional ec_wait; + + boost::asio::co_spawn(ex, + [&] () -> awaitable { + auto eptr = std::make_exception_ptr(std::runtime_error{"oops"}); + ec_spawn = co_await throttle.spawn(worker(eptr)); + ec_wait = co_await throttle.wait(); + }, rethrow); + + ctx.poll(); // run until wait blocks + + ASSERT_TRUE(ec_spawn); + EXPECT_FALSE(*ec_spawn); + + EXPECT_THROW(ctx.run(), std::runtime_error); + + ASSERT_FALSE(ec_wait); +} + +} // namespace ceph::async -- 2.39.5