From 9217fcc55588e3ed62df4899c324568815d92c4a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 11 Jan 2023 10:45:49 -0500 Subject: [PATCH] common/async: add co_throttle for bounded concurrency with c++20 coroutines Signed-off-by: Casey Bodley --- src/common/async/co_throttle.h | 113 +++++ src/common/async/detail/co_throttle_impl.h | 222 +++++++++ src/test/common/CMakeLists.txt | 7 + src/test/common/test_async_co_throttle.cc | 548 +++++++++++++++++++++ 4 files changed, 890 insertions(+) create mode 100644 src/common/async/co_throttle.h create mode 100644 src/common/async/detail/co_throttle_impl.h create mode 100644 src/test/common/test_async_co_throttle.cc diff --git a/src/common/async/co_throttle.h b/src/common/async/co_throttle.h new file mode 100644 index 00000000000..880ffc96ce9 --- /dev/null +++ b/src/common/async/co_throttle.h @@ -0,0 +1,113 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/detail/co_throttle_impl.h" + +namespace ceph::async { + +/// A coroutine throttle that allows a parent coroutine to spawn and manage +/// multiple child coroutines, while enforcing an upper bound on concurrency. +/// +/// Child coroutines must be of type awaitable. Exceptions thrown by +/// children are rethrown to the parent on its next call to spawn() or wait(). +/// The cancel_on_error option controls whether these exceptions errors trigger +/// the cancellation of other children. +/// +/// All child coroutines are canceled by cancel() or co_throttle destruction. +/// This allows the parent coroutine to share memory with its child coroutines +/// without fear of dangling references. +/// +/// This class is not thread-safe, so a strand executor should be used in +/// multi-threaded contexts. +/// +/// Example: +/// \code +/// awaitable child(task& t); +/// +/// awaitable parent(std::span tasks) +/// { +/// // process all tasks, up to 10 at a time +/// auto ex = co_await boost::asio::this_coro::executor; +/// auto throttle = co_throttle{ex, 10}; +/// +/// for (auto& t : tasks) { +/// co_await throttle.spawn(child(t)); +/// } +/// co_await throttle.wait(); +/// } +/// \endcode +template +class co_throttle { + using impl_type = detail::co_throttle_impl; + boost::intrusive_ptr impl; + + public: + using executor_type = Executor; + executor_type get_executor() const noexcept { return impl->get_executor(); } + + static constexpr size_t max_limit = std::numeric_limits::max(); + + co_throttle(const executor_type& ex, size_t limit, + cancel_on_error on_error = cancel_on_error::none) + : impl(new impl_type(ex, limit, on_error)) + { + } + + ~co_throttle() + { + cancel(); + } + + co_throttle(const co_throttle&) = delete; + co_throttle& operator=(const co_throttle&) = delete; + + /// Try to spawn the given coroutine \ref cr. If this would exceed the + /// concurrency limit, wait for another coroutine to complete first. This + /// default limit can be overridden with the optional \ref smaller_limit + /// argument. + /// + /// If any spawned coroutines exit with an exception, the first exception is + /// rethrown by the next call to spawn() or wait(). If spawn() has an + /// exception to rethrow, it will spawn \cr first only in the case of + /// cancel_on_error::none. New coroutines can be spawned by later calls to + /// spawn() regardless of cancel_on_error. + auto spawn(boost::asio::awaitable cr, + size_t smaller_limit = max_limit) + -> boost::asio::awaitable + { + return impl->spawn(std::move(cr), smaller_limit); + } + + /// Wait for all associated coroutines to complete. If any of these coroutines + /// exit with an exception, the first of those exceptions is rethrown. + auto wait() + -> boost::asio::awaitable + { + return impl->wait(); + } + + /// Cancel all associated coroutines. + void cancel() + { + impl->cancel(); + } +}; + +} // namespace ceph::async diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h new file mode 100644 index 00000000000..f2f17a043ab --- /dev/null +++ b/src/common/async/detail/co_throttle_impl.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/cancel_on_error.h" +#include "common/async/co_waiter.h" +#include "common/async/service.h" +#include "include/ceph_assert.h" + +namespace ceph::async::detail { + +// Coroutine throttle implementation. This is reference-counted so the +// co_spawn() completion handlers can extend the implementation's lifetime. +// This is required for per-op cancellation because the cancellation_signals +// must outlive their coroutine frames. +template +class co_throttle_impl : + public boost::intrusive_ref_counter, + boost::thread_unsafe_counter>, + public service_list_base_hook +{ + public: + using executor_type = Executor; + executor_type get_executor() const { return ex; } + + co_throttle_impl(const executor_type& ex, size_t limit, + cancel_on_error on_error) + : svc(boost::asio::use_service>( + boost::asio::query(ex, boost::asio::execution::context))), + ex(ex), limit(limit), on_error(on_error), + children(new child[limit]) + { + // register for service_shutdown() notifications + svc.add(*this); + + // initialize the free list + for (size_t i = 0; i < limit; i++) { + free.push_back(children[i]); + } + } + ~co_throttle_impl() + { + svc.remove(*this); + } + + auto spawn(boost::asio::awaitable cr, + size_t smaller_limit) + -> boost::asio::awaitable + { + if (unreported_exception && on_error != cancel_on_error::none) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + + const size_t current_limit = std::min(smaller_limit, limit); + if (count >= current_limit) { + co_await wait_for(current_limit - 1); + if (unreported_exception && on_error != cancel_on_error::none) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + } + + ++count; + + // move a free child to the outstanding list + ceph_assert(!free.empty()); + child& c = free.front(); + free.pop_front(); + outstanding.push_back(c); + + // spawn the coroutine with its associated cancellation signal + c.signal.emplace(); + c.canceled = false; + + boost::asio::co_spawn(get_executor(), std::move(cr), + boost::asio::bind_cancellation_slot(c.signal->slot(), + child_completion{this, c})); + + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + } + + auto wait() + -> boost::asio::awaitable + { + if (count > 0) { + co_await wait_for(0); + } + if (unreported_exception) { + std::rethrow_exception(std::exchange(unreported_exception, nullptr)); + } + } + + void cancel() + { + while (!outstanding.empty()) { + child& c = outstanding.front(); + outstanding.pop_front(); + + c.canceled = true; + c.signal->emit(boost::asio::cancellation_type::terminal); + } + } + + void service_shutdown() + { + waiter.shutdown(); + } + + private: + service& svc; + executor_type ex; + const size_t limit; + const cancel_on_error on_error; + + size_t count = 0; + size_t wait_for_count = 0; + + std::exception_ptr unreported_exception; + + // track each spawned coroutine for cancellation. these are stored in an + // array, and recycled after each use via the free list + struct child : boost::intrusive::list_base_hook<> { + std::optional signal; + bool canceled = false; + }; + std::unique_ptr children; + + using child_list = boost::intrusive::list>; + child_list outstanding; + child_list free; + + co_waiter waiter; + + // return an awaitable that completes once count <= target_count + auto wait_for(size_t target_count) + -> boost::asio::awaitable + { + wait_for_count = target_count; + return waiter.get(); + } + + void on_complete(child& c, std::exception_ptr eptr) + { + --count; + + if (c.canceled) { + // if the child was canceled, it was already removed from outstanding + ceph_assert(!c.is_linked()); + c.canceled = false; + c.signal.reset(); + free.push_back(c); + } else { + // move back to the free list + ceph_assert(c.is_linked()); + auto next = outstanding.erase(outstanding.iterator_to(c)); + c.signal.reset(); + free.push_back(c); + + if (eptr) { + if (eptr && !unreported_exception) { + unreported_exception = eptr; + } + + // handle cancel_on_error. cancellation signals may recurse into + // on_complete(), so move the entries into a separate list first + child_list to_cancel; + if (on_error == cancel_on_error::after) { + to_cancel.splice(to_cancel.end(), outstanding, + next, outstanding.end()); + } else if (on_error == cancel_on_error::all) { + to_cancel = std::move(outstanding); + } + + for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) { + child& c = *i; + i = to_cancel.erase(i); + + c.canceled = true; + c.signal->emit(boost::asio::cancellation_type::terminal); + } + } + } + + // maybe wake the waiter + if (waiter.waiting() && count <= wait_for_count) { + waiter.complete(nullptr); + } + } + + struct child_completion { + boost::intrusive_ptr impl; + child& c; + + void operator()(std::exception_ptr eptr) { + impl->on_complete(c, eptr); + } + }; +}; + +} // namespace ceph::async::detail diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 11a7ea0e20c..88fb5142995 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -361,10 +361,17 @@ add_executable(unittest_async_completion test_async_completion.cc) add_ceph_unittest(unittest_async_completion) target_link_libraries(unittest_async_completion ceph-common Boost::system) + add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc) add_ceph_unittest(unittest_async_max_concurrent_for_each) target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context) +if(NOT WIN32) +add_executable(unittest_async_co_throttle test_async_co_throttle.cc) +add_ceph_unittest(unittest_async_co_throttle) +target_link_libraries(unittest_async_co_throttle ceph-common Boost::system) +endif(NOT WIN32) + add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc) add_ceph_unittest(unittest_async_shared_mutex) target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system) diff --git a/src/test/common/test_async_co_throttle.cc b/src/test/common/test_async_co_throttle.cc new file mode 100644 index 00000000000..31988c77bdd --- /dev/null +++ b/src/test/common/test_async_co_throttle.cc @@ -0,0 +1,548 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/co_throttle.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/co_waiter.h" + +namespace ceph::async { + +namespace asio = boost::asio; +namespace errc = boost::system::errc; +using boost::system::error_code; + +using executor_type = asio::any_io_executor; + +using void_waiter = co_waiter; + +auto capture(std::optional& eptr) +{ + return [&eptr] (std::exception_ptr e) { eptr = e; }; +} + +auto capture(asio::cancellation_signal& signal, + std::optional& eptr) +{ + return asio::bind_cancellation_slot(signal.slot(), capture(eptr)); +} + +asio::awaitable wait(void_waiter& waiter, bool& completed) +{ + co_await waiter.get(); + completed = true; +} + +TEST(co_throttle, wait_empty) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(co_throttle, spawn_over_limit) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + spawn2_completed = true; + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); + + waiter1.complete(nullptr); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + EXPECT_TRUE(spawn2_completed); + + waiter2.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(co_throttle, spawn_over_smaller_limit) +{ + constexpr size_t limit = 2; + constexpr size_t smaller_limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get(), smaller_limit); + spawn2_completed = true; + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); + + waiter1.complete(nullptr); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn2_completed); + + waiter2.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(co_throttle, spawn_cancel) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + bool spawn2_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + spawn2_completed = true; + co_await throttle.wait(); + }; + + asio::cancellation_signal signal; + std::optional result; + asio::co_spawn(ex, cr(), capture(signal, result)); + + ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(spawn2_completed); + + // cancel before spawn2 completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + EXPECT_FALSE(spawn2_completed); + ASSERT_TRUE(result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(co_throttle, wait_cancel) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter; + bool spawn_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + spawn_completed = true; + co_await throttle.wait(); + }; + + asio::cancellation_signal signal; + std::optional result; + asio::co_spawn(ex, cr(), capture(signal, result)); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn_completed); + EXPECT_FALSE(result); + + // cancel before wait completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); // poll runs to completion + ASSERT_TRUE(result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(co_throttle, spawn_shutdown) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + bool spawn1_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter1.get()); + spawn1_completed = true; + co_await throttle.spawn(waiter2.get()); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn2 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn1_completed); + EXPECT_FALSE(result); + // shut down before spawn2 completes +} + +TEST(co_throttle, wait_shutdown) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter; + bool spawn_completed = false; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + spawn_completed = true; + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_TRUE(spawn_completed); + EXPECT_FALSE(result); + // shut down before wait completes +} + +TEST(co_throttle, spawn_error) +{ + constexpr size_t limit = 2; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn3 blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(cr1_completed); + EXPECT_FALSE(cr2_completed); + EXPECT_FALSE(cr3_completed); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); + EXPECT_FALSE(result); + + waiter1.complete(nullptr); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); // wait still blocked + + waiter3.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_TRUE(cr1_completed); + EXPECT_FALSE(cr2_completed); + EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error +} + +TEST(co_throttle, wait_error) +{ + constexpr size_t limit = 1; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter; + + auto cr = [&] () -> asio::awaitable { + auto throttle = co_throttle{co_await asio::this_coro::executor, limit}; + co_await throttle.spawn(waiter.get()); + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(co_throttle, spawn_cancel_on_error_after) +{ + constexpr size_t limit = 2; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + void_waiter waiter4; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + bool cr4_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> asio::awaitable { + auto ex = co_await asio::this_coro::executor; + auto throttle = co_throttle{ex, limit, cancel_on_error::after}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.spawn(wait(waiter4, cr4_completed)); + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn3 blocks + ASSERT_FALSE(ctx.stopped()); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(cr1_completed); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); + + waiter1.complete(nullptr); + + ctx.poll(); + ASSERT_FALSE(ctx.stopped()); // wait still blocked + EXPECT_FALSE(result); + EXPECT_TRUE(cr1_completed); + EXPECT_FALSE(cr4_completed); + + waiter4.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_FALSE(cr2_completed); // exited by exception + EXPECT_FALSE(cr3_completed); // cr3 canceled + EXPECT_TRUE(cr4_completed); // cr4 not canceled +} + +TEST(co_throttle, spawn_cancel_on_error_all) +{ + constexpr size_t limit = 2; + asio::io_context ctx; + executor_type ex = ctx.get_executor(); + + void_waiter waiter1; + void_waiter waiter2; + void_waiter waiter3; + void_waiter waiter4; + bool cr1_completed = false; + bool cr2_completed = false; + bool cr3_completed = false; + bool cr4_completed = false; + std::exception_ptr spawn3_eptr; + + auto cr = [&] () -> asio::awaitable { + auto ex = co_await asio::this_coro::executor; + auto throttle = co_throttle{ex, limit, cancel_on_error::all}; + co_await throttle.spawn(wait(waiter1, cr1_completed)); + co_await throttle.spawn(wait(waiter2, cr2_completed)); + try { + co_await throttle.spawn(wait(waiter3, cr3_completed)); + } catch (const std::exception&) { + spawn3_eptr = std::current_exception(); + } + co_await throttle.spawn(wait(waiter4, cr4_completed)); + co_await throttle.wait(); + }; + + std::optional result; + asio::co_spawn(ex, cr(), capture(result)); + + ctx.poll(); // run until spawn3 blocks + ASSERT_FALSE(ctx.stopped()); + + waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run until wait blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_TRUE(spawn3_eptr); + EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error); + EXPECT_FALSE(cr4_completed); + + waiter4.complete(nullptr); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); + EXPECT_FALSE(cr1_completed); // cr1 canceled + EXPECT_FALSE(cr2_completed); // exited by exception + EXPECT_FALSE(cr3_completed); // cr3 canceled + EXPECT_TRUE(cr4_completed); // cr4 not canceled +} + +TEST(co_throttle, cross_thread_cancel) +{ + constexpr size_t limit = 1; + // run the coroutine in a background thread + asio::thread_pool ctx{1}; + executor_type ex = ctx.get_executor(); + + std::latch waiting{1}; + + auto cr = [ex, &waiting] () -> asio::awaitable { + auto throttle = co_throttle{ex, limit}; + co_waiter waiter; + co_await throttle.spawn(waiter.get()); + // decrement the latch after throttle.wait() suspends + asio::defer(ex, [&waiting] { waiting.count_down(); }); + co_await throttle.wait(); + }; + + asio::cancellation_signal signal; + std::optional result; + // without bind_executor(), tsan identifies a data race on signal.emit() + asio::co_spawn(ex, cr(), bind_executor(ex, capture(signal, result))); + + waiting.wait(); // wait until we've suspended in throttle.wait() + + signal.emit(asio::cancellation_type::terminal); + + ctx.join(); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception&) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +} // namespace ceph::async -- 2.39.5