From 6b84b7bcc3531a8aebfe164a4e0847b68c76f791 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 6 Feb 2023 17:05:12 -0500 Subject: [PATCH] common/async: remove error_codes from co_throttle interface in the initial design of co_throttle described in https://github.com/ceph/ceph/pull/49720, the cancel_on_error option only applied to errors from awaitable but not to exceptions from awaitable coroutines with the decision to use exceptions as the default method of error handling in rgw multisite, this design choice no longer makes sense. i've removed the error_code overloads entirely, and changed the exception handling logic to match the previous behavior for error codes the unit tests were rewritten with co_waiter instead of timers to make them deterministic and faster. co_waiter's cancellation behavior exposed some issues where the cancellation signal could cause the completions to recurse, so on_complete() was restructured to tolerate that Signed-off-by: Casey Bodley --- src/common/async/co_throttle.h | 65 +- src/common/async/detail/co_throttle_impl.h | 136 ++-- src/test/common/test_async_co_throttle.cc | 814 +++++++++------------ 3 files changed, 417 insertions(+), 598 deletions(-) diff --git a/src/common/async/co_throttle.h b/src/common/async/co_throttle.h index bf1e9685c6725..f8d399ed40ea5 100644 --- a/src/common/async/co_throttle.h +++ b/src/common/async/co_throttle.h @@ -25,10 +25,10 @@ namespace ceph::async { /// A coroutine throttle that allows a parent coroutine to spawn and manage /// multiple child coroutines, while enforcing an upper bound on concurrency. /// -/// Child coroutines can be of type awaitable or awaitable. -/// Error codes returned by children are reported to the parent on its next call -/// to spawn() or wait(). The cancel_on_error option controls whether these -/// errors trigger the cancellation of other children. +/// Child coroutines must be of type awaitable. Exceptions thrown by +/// children are rethrown to the parent on its next call to spawn() or wait(). +/// The cancel_on_error option controls whether these exceptions errors trigger +/// the cancellation of other children. /// /// All child coroutines are canceled by cancel() or co_throttle destruction. /// This allows the parent coroutine to share memory with its child coroutines @@ -38,7 +38,7 @@ namespace ceph::async { /// multi-threaded contexts. /// /// Example: -/// @code +/// \code /// awaitable child(task& t); /// /// awaitable parent(std::span tasks) @@ -52,14 +52,17 @@ namespace ceph::async { /// } /// co_await throttle.wait(); /// } -/// @endcode +/// \endcode template class co_throttle { + using impl_type = detail::co_throttle_impl; + boost::intrusive_ptr impl; + public: using executor_type = Executor; - executor_type get_executor() const { return impl->get_executor(); } + executor_type get_executor() const noexcept { return impl->get_executor(); } - using size_type = uint16_t; + using size_type = typename impl_type::size_type; static constexpr size_type max_limit = std::numeric_limits::max(); co_throttle(const executor_type& ex, size_type limit, @@ -76,52 +79,36 @@ class co_throttle { co_throttle(const co_throttle&) = delete; co_throttle& operator=(const co_throttle&) = delete; - template - using awaitable = boost::asio::awaitable; - - /// Try to spawn the given coroutine. If this would exceed the concurrency - /// limit, wait for another coroutine to complete first. This default - /// limit can be overridden with the optional `smaller_limit` argument. + /// Try to spawn the given coroutine \ref cr. If this would exceed the + /// concurrency limit, wait for another coroutine to complete first. This + /// default limit can be overridden with the optional \ref smaller_limit + /// argument. /// - /// If any spawned coroutines of type awaitable return a non-zero - /// error, the first such error is reported by the next call to spawn() or - /// wait(). When spawn() reports these errors, the given coroutine given will - /// only be spawned in the case of cancel_on_error::none. New coroutines can - /// be spawned by later calls to spawn() regardless of cancel_on_error. - /// - /// If a spawned coroutine exits by an uncaught exception, that exception is - /// rethrown by the next call to spawn() or wait(). - auto spawn(awaitable cr, + /// If any spawned coroutines exit with an exception, the first exception is + /// rethrown by the next call to spawn() or wait(). If spawn() has an + /// exception to rethrow, it will spawn \cr first only in the case of + /// cancel_on_error::none. New coroutines can be spawned by later calls to + /// spawn() regardless of cancel_on_error. + auto spawn(boost::asio::awaitable cr, size_type smaller_limit = max_limit) - -> awaitable - { - return impl->spawn(std::move(cr), smaller_limit); - } - - /// \overload - auto spawn(awaitable cr, size_type smaller_limit = max_limit) - -> awaitable + -> boost::asio::awaitable { return impl->spawn(std::move(cr), smaller_limit); } /// Wait for all associated coroutines to complete. If any of these coroutines - /// return a non-zero error_code, the first of those errors is returned. - awaitable wait() + /// exit with an exception, the first of those exceptions is rethrown. + auto wait() + -> boost::asio::awaitable { return impl->wait(); } - /// Cancel all associated coroutines. Callers waiting on spawn() or wait() - /// will fail with boost::asio::error::operation_aborted. + /// Cancel all associated coroutines. void cancel() { impl->cancel(); } - - private: - using impl_type = detail::co_throttle_impl; - boost::intrusive_ptr impl; }; } // namespace ceph::async diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h index d1a89db94c064..43f3691439d7e 100644 --- a/src/common/async/detail/co_throttle_impl.h +++ b/src/common/async/detail/co_throttle_impl.h @@ -33,14 +33,14 @@ namespace ceph::async::detail { // co_spawn() completion handlers can extend the implementation's lifetime. // This is required for per-op cancellation because the cancellation_signals // must outlive their coroutine frames. -template +template class co_throttle_impl : - public boost::intrusive_ref_counter, + public boost::intrusive_ref_counter, boost::thread_unsafe_counter>, public service_list_base_hook { public: - using size_type = SizeType; + using size_type = uint16_t; using executor_type = Executor; executor_type get_executor() const { return ex; } @@ -65,29 +65,19 @@ class co_throttle_impl : svc.remove(*this); } - template - using awaitable = boost::asio::awaitable; - - template // where T=void or error_code - auto spawn(awaitable cr, size_type smaller_limit) - -> awaitable + auto spawn(boost::asio::awaitable cr, + size_type smaller_limit) + -> boost::asio::awaitable { - if (unreported_exception) { + if (unreported_exception && on_error != cancel_on_error::none) { std::rethrow_exception(std::exchange(unreported_exception, nullptr)); } - if (unreported_error && on_error != cancel_on_error::none) { - co_return std::exchange(unreported_error, {}); - } const size_type current_limit = std::min(smaller_limit, limit); if (count >= current_limit) { - auto ec = co_await wait_for(current_limit - 1); - if (ec) { - unreported_error.clear(); - co_return ec; - } - if (unreported_error && on_error != cancel_on_error::none) { - co_return std::exchange(unreported_error, {}); + co_await wait_for(current_limit - 1); + if (unreported_exception && on_error != cancel_on_error::none) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); } } @@ -107,32 +97,31 @@ class co_throttle_impl : boost::asio::bind_cancellation_slot(c.signal->slot(), child_completion{this, c})); - co_return std::exchange(unreported_error, {}); + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } } - awaitable wait() + auto wait() + -> boost::asio::awaitable { if (count > 0) { - auto ec = co_await wait_for(0); - if (ec) { - unreported_error.clear(); - co_return ec; - } + co_await wait_for(0); + } + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); } - co_return std::exchange(unreported_error, {}); } void cancel() { - for (child& c : outstanding) { + while (!outstanding.empty()) { + child& c = outstanding.front(); + outstanding.pop_front(); + c.canceled = true; c.signal->emit(boost::asio::cancellation_type::terminal); } - if (waiter.waiting()) { - auto eptr = std::exchange(unreported_exception, nullptr); - auto ec = make_error_code(boost::asio::error::operation_aborted); - waiter.complete(eptr, ec); - } } void service_shutdown() @@ -149,7 +138,6 @@ class co_throttle_impl : size_type count = 0; size_type wait_for_count = 0; - boost::system::error_code unreported_error; std::exception_ptr unreported_exception; // track each spawned coroutine for cancellation. these are stored in an @@ -165,58 +153,61 @@ class co_throttle_impl : child_list outstanding; child_list free; - co_waiter waiter; + co_waiter waiter; // return an awaitable that completes once count <= target_count auto wait_for(size_type target_count) - -> awaitable + -> boost::asio::awaitable { wait_for_count = target_count; return waiter.get(); } - void on_complete(child& c, std::exception_ptr eptr, - boost::system::error_code ec) + void on_complete(child& c, std::exception_ptr eptr) { --count; if (c.canceled) { - // don't report cancellation errors. cancellation was either requested - // by the user, or triggered by another failure that is reported - eptr = nullptr; - ec = {}; - } - - if (eptr && !unreported_exception) { - unreported_exception = eptr; - } - if (ec && !unreported_error) { - unreported_error = ec; - } - - // move back to the free list - auto next = outstanding.erase(outstanding.iterator_to(c)); - c.signal.reset(); - free.push_back(c); - - // handle cancel_on_error - if (eptr || ec) { - auto cancel_begin = outstanding.end(); - if (on_error == cancel_on_error::after) { - cancel_begin = next; - } else if (on_error == cancel_on_error::all) { - cancel_begin = outstanding.begin(); - } - for (auto i = cancel_begin; i != outstanding.end(); ++i) { - i->canceled = true; - i->signal->emit(boost::asio::cancellation_type::terminal); + // if the child was canceled, it was already removed from outstanding + ceph_assert(!c.is_linked()); + c.canceled = false; + c.signal.reset(); + free.push_back(c); + } else { + // move back to the free list + ceph_assert(c.is_linked()); + auto next = outstanding.erase(outstanding.iterator_to(c)); + c.signal.reset(); + free.push_back(c); + + if (eptr) { + if (eptr && !unreported_exception) { + unreported_exception = eptr; + } + + // handle cancel_on_error. cancellation signals may recurse into + // on_complete(), so move the entries into a separate list first + child_list to_cancel; + if (on_error == cancel_on_error::after) { + to_cancel.splice(to_cancel.end(), outstanding, + next, outstanding.end()); + } else if (on_error == cancel_on_error::all) { + to_cancel = std::move(outstanding); + } + + for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) { + child& c = *i; + i = to_cancel.erase(i); + + c.canceled = true; + c.signal->emit(boost::asio::cancellation_type::terminal); + } } } // maybe wake the waiter if (waiter.waiting() && count <= wait_for_count) { - auto eptr = std::exchange(unreported_exception, nullptr); - waiter.complete(eptr, {}); + waiter.complete(nullptr); } } @@ -224,9 +215,8 @@ class co_throttle_impl : boost::intrusive_ptr impl; child& c; - void operator()(std::exception_ptr eptr, - boost::system::error_code ec = {}) { - impl->on_complete(c, eptr, ec); + void operator()(std::exception_ptr eptr) { + impl->on_complete(c, eptr); } }; }; diff --git a/src/test/common/test_async_co_throttle.cc b/src/test/common/test_async_co_throttle.cc index 7dec81cfc7ed1..6ecdd9b248f98 100644 --- a/src/test/common/test_async_co_throttle.cc +++ b/src/test/common/test_async_co_throttle.cc @@ -13,643 +13,485 @@ */ #include "common/async/co_throttle.h" -#include -#include +#include +#include +#include +#include #include #include +#include "common/async/co_waiter.h" namespace ceph::async { +namespace asio = boost::asio; namespace errc = boost::system::errc; using boost::system::error_code; -using executor_type = boost::asio::io_context::executor_type; +using executor_type = asio::io_context::executor_type; template -using awaitable = boost::asio::awaitable; -using use_awaitable_t = boost::asio::use_awaitable_t; +using awaitable = asio::awaitable; +using use_awaitable_t = asio::use_awaitable_t; static constexpr use_awaitable_t use_awaitable{}; -using clock_type = std::chrono::steady_clock; -using timer_type = boost::asio::basic_waitable_timer, executor_type>; +using void_waiter = co_waiter; -void rethrow(std::exception_ptr eptr) +auto capture(std::optional& eptr) { - if (eptr) std::rethrow_exception(eptr); + return [&eptr] (std::exception_ptr e) { eptr = e; }; } -using namespace std::chrono_literals; - -auto worker(std::chrono::milliseconds delay = 20ms) - -> awaitable -{ - auto timer = timer_type{co_await boost::asio::this_coro::executor, delay}; - co_await timer.async_wait(use_awaitable); -} - -auto worker(error_code ec, std::chrono::milliseconds delay = 10ms) - -> awaitable -{ - co_await worker(delay); - co_return ec; -} - -auto worker(std::exception_ptr eptr, std::chrono::milliseconds delay = 10ms) - -> awaitable -{ - co_await worker(delay); - std::rethrow_exception(eptr); -} - -auto worker(bool& finished, std::chrono::milliseconds delay = 20ms) - -> awaitable -{ - co_await worker(delay); - finished = true; -} - -auto counting_worker(size_t& count, size_t& max_count) - -> awaitable +auto capture(asio::cancellation_signal& signal, + std::optional& eptr) { - ++count; - if (max_count < count) { - max_count = count; - } - co_await worker(); - --count; + return asio::bind_cancellation_slot(signal.slot(), capture(eptr)); } -// use a worker that never completes to test cancellation -awaitable lazy_worker() +awaitable wait(void_waiter& waiter, bool& completed) { - for (;;) { - co_await worker(); - } + co_await waiter.get(); + completed = true; } TEST(co_throttle, wait_empty) { constexpr size_t limit = 1; + asio::io_context ctx; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; - - std::optional ec_wait; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.wait(); + }; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_wait = co_await throttle.wait(); - }, rethrow); + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); - ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_wait); // wait returns immediately if nothing was spawned - EXPECT_FALSE(*ec_wait); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); } TEST(co_throttle, spawn_over_limit) { constexpr size_t limit = 1; + asio::io_context ctx; - size_t count = 0; - size_t max_count = 0; + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + spawn2_completed = true; + co_await throttle.wait(); + }; - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_wait; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count)); - ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count)); - ec_wait = co_await throttle.wait(); - }, rethrow); + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - EXPECT_FALSE(ec_spawn2); + waiter1.complete(nullptr); - ctx.run_one(); // wait for spawn1's completion ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + EXPECT_TRUE(spawn2_completed); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - EXPECT_FALSE(ec_wait); - - ctx.run(); // run to completion - - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); + waiter2.complete(nullptr); - EXPECT_EQ(max_count, limit); // count never exceeds limit - EXPECT_EQ(count, 0); + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); } TEST(co_throttle, spawn_over_smaller_limit) { constexpr size_t limit = 2; constexpr size_t smaller_limit = 1; + asio::io_context ctx; - size_t count = 0; - size_t max_count = 0; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_wait; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get(), smaller_limit); + spawn2_completed = true; + co_await throttle.wait(); + }; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count)); - ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count), - smaller_limit); - ec_wait = co_await throttle.wait(); - }, rethrow); + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - EXPECT_FALSE(ec_spawn2); + waiter1.complete(nullptr); - ctx.run_one(); // wait for spawn1's completion ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn2_completed); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - EXPECT_FALSE(ec_wait); - - ctx.run(); // run to completion - - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); + waiter2.complete(nullptr); - EXPECT_EQ(max_count, smaller_limit); // count never exceeds smaller_limit - EXPECT_EQ(count, 0); + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); } TEST(co_throttle, spawn_cancel) { constexpr size_t limit = 1; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; - - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_wait; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(lazy_worker()); - ec_spawn2 = co_await throttle.spawn(lazy_worker()); - ec_wait = co_await throttle.wait(); - }, rethrow); + asio::io_context ctx; + + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; + + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + spawn2_completed = true; + co_await throttle.wait(); + }; + + asio::cancellation_signal signal; + std::optional result; + asio::co_spawn(ctx, cr(), capture(signal, result)); ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - EXPECT_FALSE(ec_spawn2); - EXPECT_FALSE(ec_wait); - - throttle.cancel(); + // cancel before spawn2 completes + signal.emit(asio::cancellation_type::terminal); ctx.poll(); ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_spawn2); - EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted); - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); // wait after cancel succeeds immediately + EXPECT_FALSE(spawn2_completed); + ASSERT_TRUE(result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } } TEST(co_throttle, wait_cancel) { constexpr size_t limit = 1; + asio::io_context ctx; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; + void_waiter waiter; + bool spawn_completed = false; - std::optional ec_spawn; - std::optional ec_wait; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + spawn_completed = true; + co_await throttle.wait(); + }; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn = co_await throttle.spawn(lazy_worker()); - ec_wait = co_await throttle.wait(); - }, rethrow); + asio::cancellation_signal signal; + std::optional result; + asio::co_spawn(ctx, cr(), capture(signal, result)); ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn_completed); + EXPECT_FALSE(result); - ASSERT_TRUE(ec_spawn); - EXPECT_FALSE(*ec_spawn); - EXPECT_FALSE(ec_wait); - - throttle.cancel(); + // cancel before wait completes + signal.emit(asio::cancellation_type::terminal); ctx.poll(); ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_wait); - EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted); + ASSERT_TRUE(result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } } TEST(co_throttle, spawn_shutdown) { constexpr size_t limit = 1; + asio::io_context ctx; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; - std::optional ec_spawn1; - std::optional ec_spawn2; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + }; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - auto throttle = co_throttle{ex, limit}; - ec_spawn1 = co_await throttle.spawn(lazy_worker()); - ec_spawn2 = co_await throttle.spawn(lazy_worker()); - }, rethrow); + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); - ctx.run_one(); // call spawn1 and spawn2 - - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - EXPECT_FALSE(ec_spawn2); - - // shut down io_context before spawn2 unblocks + ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(result); + // shut down before spawn2 completes } TEST(co_throttle, wait_shutdown) { constexpr size_t limit = 1; + asio::io_context ctx; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - - std::optional ec_spawn; - std::optional ec_wait; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - auto throttle = co_throttle{ex, limit}; - ec_spawn = co_await throttle.spawn(lazy_worker()); - ec_wait = co_await throttle.wait(); - }, rethrow); - - ctx.run_one(); // call spawn and wait - - ASSERT_TRUE(ec_spawn); - EXPECT_FALSE(*ec_spawn); - EXPECT_FALSE(ec_wait); - - // shut down io_context before wait unblocks -} - -TEST(co_throttle, spawn_destroy) -{ - constexpr size_t limit = 1; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - - std::optional ec_spawn1; - std::optional ec_spawn2; - - { - auto throttle = co_throttle{ex, limit}; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(lazy_worker()); - ec_spawn2 = co_await throttle.spawn(lazy_worker()); - }, rethrow); - - ctx.poll(); // run until spawn2 blocks - - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - EXPECT_FALSE(ec_spawn2); - // throttle canceled/destroyed - } - - ctx.poll(); - ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_spawn2); - EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted); -} - -TEST(co_throttle, wait_destroy) -{ - constexpr size_t limit = 1; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - - std::optional ec_spawn; - std::optional ec_wait; - - { - auto throttle = co_throttle{ex, limit}; + void_waiter waiter; + bool spawn_completed = false; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn = co_await throttle.spawn(lazy_worker()); - ec_wait = co_await throttle.wait(); - }, rethrow); + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + spawn_completed = true; + co_await throttle.wait(); + }; - ctx.poll(); // run until wait blocks + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); - ASSERT_TRUE(ec_spawn); - EXPECT_FALSE(*ec_spawn); - EXPECT_FALSE(ec_wait); - // throttle canceled/destroyed - } - - ctx.poll(); - ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_wait); - EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted); + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn_completed); + EXPECT_FALSE(result); + // shut down before wait completes } TEST(co_throttle, spawn_error) { constexpr size_t limit = 2; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; - - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_spawn3; - std::optional ec_wait; - bool spawn1_finished = false; - bool spawn3_finished = false; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); - auto ec = make_error_code(errc::invalid_argument); - ec_spawn2 = co_await throttle.spawn(worker(ec)); - ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); - ec_wait = co_await throttle.wait(); - }, rethrow); + asio::io_context ctx; + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until spawn3 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(cr1_completed); + EXPECT_FALSE(cr2_completed); + EXPECT_FALSE(cr3_completed); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - EXPECT_FALSE(ec_spawn3); - EXPECT_FALSE(spawn1_finished); - EXPECT_FALSE(spawn3_finished); + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); + EXPECT_FALSE(result); - ctx.run_one(); // wait for spawn2's completion - ctx.poll(); // run until wait() blocks + waiter1.complete(nullptr); - ASSERT_TRUE(ec_spawn3); - EXPECT_EQ(*ec_spawn3, errc::invalid_argument); - EXPECT_FALSE(ec_wait); + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); // wait still blocked - ctx.run(); // run to completion + waiter3.complete(nullptr); - EXPECT_TRUE(spawn3_finished); // spawn3 isn't canceled by spawn2's error - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_TRUE(cr1_completed); + EXPECT_FALSE(cr2_completed); + EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error } TEST(co_throttle, wait_error) { constexpr size_t limit = 1; + asio::io_context ctx; - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; + void_waiter waiter; - std::optional ec_spawn; - std::optional ec_wait; + auto cr = [&] () -> awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + co_await throttle.wait(); + }; - boost::asio::co_spawn(ex, - [&] () -> awaitable { - auto ec = make_error_code(errc::invalid_argument); - ec_spawn = co_await throttle.spawn(worker(ec)); - ec_wait = co_await throttle.wait(); - }, rethrow); + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); - ASSERT_TRUE(ec_spawn); - EXPECT_FALSE(*ec_spawn); - EXPECT_FALSE(ec_wait); - - ctx.run(); // run to completion + waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"})); - ASSERT_TRUE(ec_wait); - EXPECT_EQ(*ec_wait, errc::invalid_argument); + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); } TEST(co_throttle, spawn_cancel_on_error_after) { constexpr size_t limit = 2; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit, cancel_on_error::after}; - - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_spawn3; - std::optional ec_spawn4; - std::optional ec_wait; - bool spawn1_finished = false; - bool spawn3_finished = false; - bool spawn4_finished = false; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); - auto ec = make_error_code(errc::invalid_argument); - ec_spawn2 = co_await throttle.spawn(worker(ec)); - // spawn3 expects invalid_argument error and cancellation - ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); - // spawn4 expects success - ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished)); - ec_wait = co_await throttle.wait(); - }, rethrow); + asio::io_context ctx; + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + void_waiter waiter4; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + bool cr4_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> awaitable { + auto ex = co_await asio::this_coro::executor; + auto throttle = co_throttle{ex, limit, cancel_on_error::after}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.spawn(wait(waiter4, cr4_completed)); + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until spawn3 blocks - - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - EXPECT_FALSE(spawn1_finished); - - ctx.run_one(); // wait for spawn2's completion - ctx.poll(); ASSERT_FALSE(ctx.stopped()); - ASSERT_TRUE(ec_spawn3); - EXPECT_EQ(*ec_spawn3, errc::invalid_argument); - ASSERT_TRUE(ec_spawn4); - EXPECT_FALSE(*ec_spawn4); - EXPECT_FALSE(spawn1_finished); + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); - ctx.run_one(); // wait for spawn1's completion ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(cr1_completed); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); - EXPECT_FALSE(ec_wait); - EXPECT_TRUE(spawn1_finished); // spawn1 not canceled + waiter1.complete(nullptr); - ctx.run_one(); // wait for spawn4's completion ctx.poll(); - ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); - EXPECT_FALSE(spawn3_finished); // spawn3 canceled - EXPECT_TRUE(spawn4_finished); // spawn4 not canceled + ASSERT_FALSE(ctx.stopped()); // wait still blocked + EXPECT_FALSE(result); + EXPECT_TRUE(cr1_completed); + EXPECT_FALSE(cr4_completed); + + waiter4.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_FALSE(cr2_completed); // exited by exception + EXPECT_FALSE(cr3_completed); // cr3 canceled + EXPECT_TRUE(cr4_completed); // cr4 not canceled } TEST(co_throttle, spawn_cancel_on_error_all) { constexpr size_t limit = 2; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit, cancel_on_error::all}; - - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_spawn3; - std::optional ec_spawn4; - std::optional ec_wait; - bool spawn1_finished = false; - bool spawn3_finished = false; - bool spawn4_finished = false; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); - auto ec = make_error_code(errc::invalid_argument); - ec_spawn2 = co_await throttle.spawn(worker(ec)); - // spawn3 expects invalid_argument error and cancellation - ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); - // spawn3 expects success - ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished)); - ec_wait = co_await throttle.wait(); - }, rethrow); - - ctx.poll(); // run until spawn3 blocks - - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - EXPECT_FALSE(ec_spawn3); - EXPECT_FALSE(ec_spawn4); - - ctx.run_one(); // wait for spawn2's completion - ctx.poll(); // run until wait blocks - - ASSERT_TRUE(ec_spawn3); - EXPECT_EQ(*ec_spawn3, errc::invalid_argument); - ASSERT_TRUE(ec_spawn4); - EXPECT_FALSE(*ec_spawn4); - EXPECT_FALSE(ec_wait); - EXPECT_FALSE(spawn1_finished); // spawn1 canceled - - ctx.run_one(); // wait for spawn4's completion - ctx.poll(); - ASSERT_TRUE(ctx.stopped()); // poll runs to completion - - ASSERT_TRUE(ec_wait); - EXPECT_FALSE(*ec_wait); - EXPECT_FALSE(spawn3_finished); // spawn3 canceled - EXPECT_TRUE(spawn4_finished); // spawn4 not canceled -} - -TEST(co_throttle, spawn_exception) -{ - constexpr size_t limit = 2; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; - - std::optional ec_spawn1; - std::optional ec_spawn2; - std::optional ec_spawn3; - bool spawn1_finished = false; - bool spawn3_finished = false; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished)); - auto eptr = std::make_exception_ptr(std::runtime_error{"oops"}); - ec_spawn2 = co_await throttle.spawn(worker(eptr)); - ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished)); - }, rethrow); + asio::io_context ctx; + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + void_waiter waiter4; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + bool cr4_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> awaitable { + auto ex = co_await asio::this_coro::executor; + auto throttle = co_throttle{ex, limit, cancel_on_error::all}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.spawn(wait(waiter4, cr4_completed)); + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ctx, cr(), capture(result)); ctx.poll(); // run until spawn3 blocks + ASSERT_FALSE(ctx.stopped()); - ASSERT_TRUE(ec_spawn1); - EXPECT_FALSE(*ec_spawn1); - ASSERT_TRUE(ec_spawn2); - EXPECT_FALSE(*ec_spawn2); - - EXPECT_THROW(ctx.run_one(), std::runtime_error); - - ASSERT_FALSE(ec_spawn3); - EXPECT_FALSE(spawn1_finished); - EXPECT_FALSE(spawn3_finished); -} - -TEST(co_throttle, wait_exception) -{ - constexpr size_t limit = 1; - - boost::asio::io_context ctx; - auto ex = ctx.get_executor(); - auto throttle = co_throttle{ex, limit}; - - std::optional ec_spawn; - std::optional ec_wait; - - boost::asio::co_spawn(ex, - [&] () -> awaitable { - auto eptr = std::make_exception_ptr(std::runtime_error{"oops"}); - ec_spawn = co_await throttle.spawn(worker(eptr)); - ec_wait = co_await throttle.wait(); - }, rethrow); + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); ctx.poll(); // run until wait blocks - - ASSERT_TRUE(ec_spawn); - EXPECT_FALSE(*ec_spawn); - - EXPECT_THROW(ctx.run(), std::runtime_error); - - ASSERT_FALSE(ec_wait); + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); + EXPECT_FALSE(cr4_completed); + + waiter4.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_FALSE(cr1_completed); // cr1 canceled + EXPECT_FALSE(cr2_completed); // exited by exception + EXPECT_FALSE(cr3_completed); // cr3 canceled + EXPECT_TRUE(cr4_completed); // cr4 not canceled } } // namespace ceph::async -- 2.39.5