// co_spawn() completion handlers can extend the implementation's lifetime.
// This is required for per-op cancellation because the cancellation_signals
// must outlive their coroutine frames.
-template <boost::asio::execution::executor Executor, typename SizeType>
+template <boost::asio::execution::executor Executor>
class co_throttle_impl :
- public boost::intrusive_ref_counter<co_throttle_impl<Executor, SizeType>,
+ public boost::intrusive_ref_counter<co_throttle_impl<Executor>,
boost::thread_unsafe_counter>,
public service_list_base_hook
{
public:
- using size_type = SizeType;
+ using size_type = uint16_t;
using executor_type = Executor;
executor_type get_executor() const { return ex; }
svc.remove(*this);
}
- template <typename T>
- using awaitable = boost::asio::awaitable<T, executor_type>;
-
- template <typename T> // where T=void or error_code
- auto spawn(awaitable<T> cr, size_type smaller_limit)
- -> awaitable<boost::system::error_code>
+ auto spawn(boost::asio::awaitable<void, executor_type> cr,
+ size_type smaller_limit)
+ -> boost::asio::awaitable<void, executor_type>
{
- if (unreported_exception) {
+ if (unreported_exception && on_error != cancel_on_error::none) {
std::rethrow_exception(std::exchange(unreported_exception, nullptr));
}
- if (unreported_error && on_error != cancel_on_error::none) {
- co_return std::exchange(unreported_error, {});
- }
const size_type current_limit = std::min(smaller_limit, limit);
if (count >= current_limit) {
- auto ec = co_await wait_for(current_limit - 1);
- if (ec) {
- unreported_error.clear();
- co_return ec;
- }
- if (unreported_error && on_error != cancel_on_error::none) {
- co_return std::exchange(unreported_error, {});
+ co_await wait_for(current_limit - 1);
+ if (unreported_exception && on_error != cancel_on_error::none) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
}
}
boost::asio::bind_cancellation_slot(c.signal->slot(),
child_completion{this, c}));
- co_return std::exchange(unreported_error, {});
+ if (unreported_exception) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+ }
}
- awaitable<boost::system::error_code> wait()
+ auto wait()
+ -> boost::asio::awaitable<void, executor_type>
{
if (count > 0) {
- auto ec = co_await wait_for(0);
- if (ec) {
- unreported_error.clear();
- co_return ec;
- }
+ co_await wait_for(0);
+ }
+ if (unreported_exception) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
}
- co_return std::exchange(unreported_error, {});
}
void cancel()
{
- for (child& c : outstanding) {
+ while (!outstanding.empty()) {
+ child& c = outstanding.front();
+ outstanding.pop_front();
+
c.canceled = true;
c.signal->emit(boost::asio::cancellation_type::terminal);
}
- if (waiter.waiting()) {
- auto eptr = std::exchange(unreported_exception, nullptr);
- auto ec = make_error_code(boost::asio::error::operation_aborted);
- waiter.complete(eptr, ec);
- }
}
void service_shutdown()
size_type count = 0;
size_type wait_for_count = 0;
- boost::system::error_code unreported_error;
std::exception_ptr unreported_exception;
// track each spawned coroutine for cancellation. these are stored in an
child_list outstanding;
child_list free;
- co_waiter<boost::system::error_code, executor_type> waiter;
+ co_waiter<void, executor_type> waiter;
// return an awaitable that completes once count <= target_count
auto wait_for(size_type target_count)
- -> awaitable<boost::system::error_code>
+ -> boost::asio::awaitable<void, executor_type>
{
wait_for_count = target_count;
return waiter.get();
}
- void on_complete(child& c, std::exception_ptr eptr,
- boost::system::error_code ec)
+ void on_complete(child& c, std::exception_ptr eptr)
{
--count;
if (c.canceled) {
- // don't report cancellation errors. cancellation was either requested
- // by the user, or triggered by another failure that is reported
- eptr = nullptr;
- ec = {};
- }
-
- if (eptr && !unreported_exception) {
- unreported_exception = eptr;
- }
- if (ec && !unreported_error) {
- unreported_error = ec;
- }
-
- // move back to the free list
- auto next = outstanding.erase(outstanding.iterator_to(c));
- c.signal.reset();
- free.push_back(c);
-
- // handle cancel_on_error
- if (eptr || ec) {
- auto cancel_begin = outstanding.end();
- if (on_error == cancel_on_error::after) {
- cancel_begin = next;
- } else if (on_error == cancel_on_error::all) {
- cancel_begin = outstanding.begin();
- }
- for (auto i = cancel_begin; i != outstanding.end(); ++i) {
- i->canceled = true;
- i->signal->emit(boost::asio::cancellation_type::terminal);
+ // if the child was canceled, it was already removed from outstanding
+ ceph_assert(!c.is_linked());
+ c.canceled = false;
+ c.signal.reset();
+ free.push_back(c);
+ } else {
+ // move back to the free list
+ ceph_assert(c.is_linked());
+ auto next = outstanding.erase(outstanding.iterator_to(c));
+ c.signal.reset();
+ free.push_back(c);
+
+ if (eptr) {
+ if (eptr && !unreported_exception) {
+ unreported_exception = eptr;
+ }
+
+ // handle cancel_on_error. cancellation signals may recurse into
+ // on_complete(), so move the entries into a separate list first
+ child_list to_cancel;
+ if (on_error == cancel_on_error::after) {
+ to_cancel.splice(to_cancel.end(), outstanding,
+ next, outstanding.end());
+ } else if (on_error == cancel_on_error::all) {
+ to_cancel = std::move(outstanding);
+ }
+
+ for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) {
+ child& c = *i;
+ i = to_cancel.erase(i);
+
+ c.canceled = true;
+ c.signal->emit(boost::asio::cancellation_type::terminal);
+ }
}
}
// maybe wake the waiter
if (waiter.waiting() && count <= wait_for_count) {
- auto eptr = std::exchange(unreported_exception, nullptr);
- waiter.complete(eptr, {});
+ waiter.complete(nullptr);
}
}
boost::intrusive_ptr<co_throttle_impl> impl;
child& c;
- void operator()(std::exception_ptr eptr,
- boost::system::error_code ec = {}) {
- impl->on_complete(c, eptr, ec);
+ void operator()(std::exception_ptr eptr) {
+ impl->on_complete(c, eptr);
}
};
};
*/
#include "common/async/co_throttle.h"
-#include <chrono>
-#include <boost/asio/basic_waitable_timer.hpp>
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
namespace ceph::async {
+namespace asio = boost::asio;
namespace errc = boost::system::errc;
using boost::system::error_code;
-using executor_type = boost::asio::io_context::executor_type;
+using executor_type = asio::io_context::executor_type;
template <typename T>
-using awaitable = boost::asio::awaitable<T, executor_type>;
-using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
+using awaitable = asio::awaitable<T, executor_type>;
+using use_awaitable_t = asio::use_awaitable_t<executor_type>;
static constexpr use_awaitable_t use_awaitable{};
-using clock_type = std::chrono::steady_clock;
-using timer_type = boost::asio::basic_waitable_timer<clock_type,
- boost::asio::wait_traits<clock_type>, executor_type>;
+using void_waiter = co_waiter<void, executor_type>;
-void rethrow(std::exception_ptr eptr)
+auto capture(std::optional<std::exception_ptr>& eptr)
{
- if (eptr) std::rethrow_exception(eptr);
+ return [&eptr] (std::exception_ptr e) { eptr = e; };
}
-using namespace std::chrono_literals;
-
-auto worker(std::chrono::milliseconds delay = 20ms)
- -> awaitable<void>
-{
- auto timer = timer_type{co_await boost::asio::this_coro::executor, delay};
- co_await timer.async_wait(use_awaitable);
-}
-
-auto worker(error_code ec, std::chrono::milliseconds delay = 10ms)
- -> awaitable<error_code>
-{
- co_await worker(delay);
- co_return ec;
-}
-
-auto worker(std::exception_ptr eptr, std::chrono::milliseconds delay = 10ms)
- -> awaitable<void>
-{
- co_await worker(delay);
- std::rethrow_exception(eptr);
-}
-
-auto worker(bool& finished, std::chrono::milliseconds delay = 20ms)
- -> awaitable<void>
-{
- co_await worker(delay);
- finished = true;
-}
-
-auto counting_worker(size_t& count, size_t& max_count)
- -> awaitable<void>
+auto capture(asio::cancellation_signal& signal,
+ std::optional<std::exception_ptr>& eptr)
{
- ++count;
- if (max_count < count) {
- max_count = count;
- }
- co_await worker();
- --count;
+ return asio::bind_cancellation_slot(signal.slot(), capture(eptr));
}
-// use a worker that never completes to test cancellation
-awaitable<void> lazy_worker()
+awaitable<void> wait(void_waiter& waiter, bool& completed)
{
- for (;;) {
- co_await worker();
- }
+ co_await waiter.get();
+ completed = true;
}
TEST(co_throttle, wait_empty)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
-
- std::optional<error_code> ec_wait;
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.wait();
+ };
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll();
- ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_wait); // wait returns immediately if nothing was spawned
- EXPECT_FALSE(*ec_wait);
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
}
TEST(co_throttle, spawn_over_limit)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- size_t count = 0;
- size_t max_count = 0;
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_wait;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
- ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count));
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- EXPECT_FALSE(ec_spawn2);
+ waiter1.complete(nullptr);
- ctx.run_one(); // wait for spawn1's completion
ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+ EXPECT_TRUE(spawn2_completed);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
- EXPECT_FALSE(ec_wait);
-
- ctx.run(); // run to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait);
+ waiter2.complete(nullptr);
- EXPECT_EQ(max_count, limit); // count never exceeds limit
- EXPECT_EQ(count, 0);
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
}
TEST(co_throttle, spawn_over_smaller_limit)
{
constexpr size_t limit = 2;
constexpr size_t smaller_limit = 1;
+ asio::io_context ctx;
- size_t count = 0;
- size_t max_count = 0;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_wait;
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get(), smaller_limit);
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(counting_worker(count, max_count));
- ec_spawn2 = co_await throttle.spawn(counting_worker(count, max_count),
- smaller_limit);
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- EXPECT_FALSE(ec_spawn2);
+ waiter1.complete(nullptr);
- ctx.run_one(); // wait for spawn1's completion
ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn2_completed);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
- EXPECT_FALSE(ec_wait);
-
- ctx.run(); // run to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait);
+ waiter2.complete(nullptr);
- EXPECT_EQ(max_count, smaller_limit); // count never exceeds smaller_limit
- EXPECT_EQ(count, 0);
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
}
TEST(co_throttle, spawn_cancel)
{
constexpr size_t limit = 1;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_wait;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(lazy_worker());
- ec_spawn2 = co_await throttle.spawn(lazy_worker());
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ asio::io_context ctx;
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
+
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(signal, result));
ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- EXPECT_FALSE(ec_spawn2);
- EXPECT_FALSE(ec_wait);
-
- throttle.cancel();
+ // cancel before spawn2 completes
+ signal.emit(asio::cancellation_type::terminal);
ctx.poll();
ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_spawn2);
- EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait); // wait after cancel succeeds immediately
+ EXPECT_FALSE(spawn2_completed);
+ ASSERT_TRUE(result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
}
TEST(co_throttle, wait_cancel)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
+ void_waiter waiter;
+ bool spawn_completed = false;
- std::optional<error_code> ec_spawn;
- std::optional<error_code> ec_wait;
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ spawn_completed = true;
+ co_await throttle.wait();
+ };
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn = co_await throttle.spawn(lazy_worker());
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(signal, result));
ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn_completed);
+ EXPECT_FALSE(result);
- ASSERT_TRUE(ec_spawn);
- EXPECT_FALSE(*ec_spawn);
- EXPECT_FALSE(ec_wait);
-
- throttle.cancel();
+ // cancel before wait completes
+ signal.emit(asio::cancellation_type::terminal);
ctx.poll();
ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+ ASSERT_TRUE(result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
}
TEST(co_throttle, spawn_shutdown)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ };
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- auto throttle = co_throttle{ex, limit};
- ec_spawn1 = co_await throttle.spawn(lazy_worker());
- ec_spawn2 = co_await throttle.spawn(lazy_worker());
- }, rethrow);
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
- ctx.run_one(); // call spawn1 and spawn2
-
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- EXPECT_FALSE(ec_spawn2);
-
- // shut down io_context before spawn2 unblocks
+ ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(result);
+ // shut down before spawn2 completes
}
TEST(co_throttle, wait_shutdown)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
-
- std::optional<error_code> ec_spawn;
- std::optional<error_code> ec_wait;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- auto throttle = co_throttle{ex, limit};
- ec_spawn = co_await throttle.spawn(lazy_worker());
- ec_wait = co_await throttle.wait();
- }, rethrow);
-
- ctx.run_one(); // call spawn and wait
-
- ASSERT_TRUE(ec_spawn);
- EXPECT_FALSE(*ec_spawn);
- EXPECT_FALSE(ec_wait);
-
- // shut down io_context before wait unblocks
-}
-
-TEST(co_throttle, spawn_destroy)
-{
- constexpr size_t limit = 1;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
-
- {
- auto throttle = co_throttle{ex, limit};
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(lazy_worker());
- ec_spawn2 = co_await throttle.spawn(lazy_worker());
- }, rethrow);
-
- ctx.poll(); // run until spawn2 blocks
-
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- EXPECT_FALSE(ec_spawn2);
- // throttle canceled/destroyed
- }
-
- ctx.poll();
- ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_spawn2);
- EXPECT_EQ(*ec_spawn2, boost::asio::error::operation_aborted);
-}
-
-TEST(co_throttle, wait_destroy)
-{
- constexpr size_t limit = 1;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
-
- std::optional<error_code> ec_spawn;
- std::optional<error_code> ec_wait;
-
- {
- auto throttle = co_throttle{ex, limit};
+ void_waiter waiter;
+ bool spawn_completed = false;
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn = co_await throttle.spawn(lazy_worker());
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ spawn_completed = true;
+ co_await throttle.wait();
+ };
- ctx.poll(); // run until wait blocks
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
- ASSERT_TRUE(ec_spawn);
- EXPECT_FALSE(*ec_spawn);
- EXPECT_FALSE(ec_wait);
- // throttle canceled/destroyed
- }
-
- ctx.poll();
- ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_EQ(*ec_wait, boost::asio::error::operation_aborted);
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn_completed);
+ EXPECT_FALSE(result);
+ // shut down before wait completes
}
TEST(co_throttle, spawn_error)
{
constexpr size_t limit = 2;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_spawn3;
- std::optional<error_code> ec_wait;
- bool spawn1_finished = false;
- bool spawn3_finished = false;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
- auto ec = make_error_code(errc::invalid_argument);
- ec_spawn2 = co_await throttle.spawn(worker(ec));
- ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ asio::io_context ctx;
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until spawn3 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(cr1_completed);
+ EXPECT_FALSE(cr2_completed);
+ EXPECT_FALSE(cr3_completed);
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
- EXPECT_FALSE(ec_spawn3);
- EXPECT_FALSE(spawn1_finished);
- EXPECT_FALSE(spawn3_finished);
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+ EXPECT_FALSE(result);
- ctx.run_one(); // wait for spawn2's completion
- ctx.poll(); // run until wait() blocks
+ waiter1.complete(nullptr);
- ASSERT_TRUE(ec_spawn3);
- EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
- EXPECT_FALSE(ec_wait);
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped()); // wait still blocked
- ctx.run(); // run to completion
+ waiter3.complete(nullptr);
- EXPECT_TRUE(spawn3_finished); // spawn3 isn't canceled by spawn2's error
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait);
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_TRUE(cr1_completed);
+ EXPECT_FALSE(cr2_completed);
+ EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error
}
TEST(co_throttle, wait_error)
{
constexpr size_t limit = 1;
+ asio::io_context ctx;
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
+ void_waiter waiter;
- std::optional<error_code> ec_spawn;
- std::optional<error_code> ec_wait;
+ auto cr = [&] () -> awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ co_await throttle.wait();
+ };
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- auto ec = make_error_code(errc::invalid_argument);
- ec_spawn = co_await throttle.spawn(worker(ec));
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
- ASSERT_TRUE(ec_spawn);
- EXPECT_FALSE(*ec_spawn);
- EXPECT_FALSE(ec_wait);
-
- ctx.run(); // run to completion
+ waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
- ASSERT_TRUE(ec_wait);
- EXPECT_EQ(*ec_wait, errc::invalid_argument);
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
}
TEST(co_throttle, spawn_cancel_on_error_after)
{
constexpr size_t limit = 2;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit, cancel_on_error::after};
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_spawn3;
- std::optional<error_code> ec_spawn4;
- std::optional<error_code> ec_wait;
- bool spawn1_finished = false;
- bool spawn3_finished = false;
- bool spawn4_finished = false;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
- auto ec = make_error_code(errc::invalid_argument);
- ec_spawn2 = co_await throttle.spawn(worker(ec));
- // spawn3 expects invalid_argument error and cancellation
- ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
- // spawn4 expects success
- ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ asio::io_context ctx;
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ void_waiter waiter4;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ bool cr4_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> awaitable<void> {
+ auto ex = co_await asio::this_coro::executor;
+ auto throttle = co_throttle{ex, limit, cancel_on_error::after};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.spawn(wait(waiter4, cr4_completed));
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until spawn3 blocks
-
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
- EXPECT_FALSE(spawn1_finished);
-
- ctx.run_one(); // wait for spawn2's completion
- ctx.poll();
ASSERT_FALSE(ctx.stopped());
- ASSERT_TRUE(ec_spawn3);
- EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
- ASSERT_TRUE(ec_spawn4);
- EXPECT_FALSE(*ec_spawn4);
- EXPECT_FALSE(spawn1_finished);
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
- ctx.run_one(); // wait for spawn1's completion
ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(cr1_completed);
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
- EXPECT_FALSE(ec_wait);
- EXPECT_TRUE(spawn1_finished); // spawn1 not canceled
+ waiter1.complete(nullptr);
- ctx.run_one(); // wait for spawn4's completion
ctx.poll();
- ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait);
- EXPECT_FALSE(spawn3_finished); // spawn3 canceled
- EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
+ ASSERT_FALSE(ctx.stopped()); // wait still blocked
+ EXPECT_FALSE(result);
+ EXPECT_TRUE(cr1_completed);
+ EXPECT_FALSE(cr4_completed);
+
+ waiter4.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_FALSE(cr2_completed); // exited by exception
+ EXPECT_FALSE(cr3_completed); // cr3 canceled
+ EXPECT_TRUE(cr4_completed); // cr4 not canceled
}
TEST(co_throttle, spawn_cancel_on_error_all)
{
constexpr size_t limit = 2;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit, cancel_on_error::all};
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_spawn3;
- std::optional<error_code> ec_spawn4;
- std::optional<error_code> ec_wait;
- bool spawn1_finished = false;
- bool spawn3_finished = false;
- bool spawn4_finished = false;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
- auto ec = make_error_code(errc::invalid_argument);
- ec_spawn2 = co_await throttle.spawn(worker(ec));
- // spawn3 expects invalid_argument error and cancellation
- ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
- // spawn3 expects success
- ec_spawn4 = co_await throttle.spawn(worker(spawn4_finished));
- ec_wait = co_await throttle.wait();
- }, rethrow);
-
- ctx.poll(); // run until spawn3 blocks
-
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
- EXPECT_FALSE(ec_spawn3);
- EXPECT_FALSE(ec_spawn4);
-
- ctx.run_one(); // wait for spawn2's completion
- ctx.poll(); // run until wait blocks
-
- ASSERT_TRUE(ec_spawn3);
- EXPECT_EQ(*ec_spawn3, errc::invalid_argument);
- ASSERT_TRUE(ec_spawn4);
- EXPECT_FALSE(*ec_spawn4);
- EXPECT_FALSE(ec_wait);
- EXPECT_FALSE(spawn1_finished); // spawn1 canceled
-
- ctx.run_one(); // wait for spawn4's completion
- ctx.poll();
- ASSERT_TRUE(ctx.stopped()); // poll runs to completion
-
- ASSERT_TRUE(ec_wait);
- EXPECT_FALSE(*ec_wait);
- EXPECT_FALSE(spawn3_finished); // spawn3 canceled
- EXPECT_TRUE(spawn4_finished); // spawn4 not canceled
-}
-
-TEST(co_throttle, spawn_exception)
-{
- constexpr size_t limit = 2;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
-
- std::optional<error_code> ec_spawn1;
- std::optional<error_code> ec_spawn2;
- std::optional<error_code> ec_spawn3;
- bool spawn1_finished = false;
- bool spawn3_finished = false;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- ec_spawn1 = co_await throttle.spawn(worker(spawn1_finished));
- auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
- ec_spawn2 = co_await throttle.spawn(worker(eptr));
- ec_spawn3 = co_await throttle.spawn(worker(spawn3_finished));
- }, rethrow);
+ asio::io_context ctx;
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ void_waiter waiter4;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ bool cr4_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> awaitable<void> {
+ auto ex = co_await asio::this_coro::executor;
+ auto throttle = co_throttle{ex, limit, cancel_on_error::all};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.spawn(wait(waiter4, cr4_completed));
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ctx, cr(), capture(result));
ctx.poll(); // run until spawn3 blocks
+ ASSERT_FALSE(ctx.stopped());
- ASSERT_TRUE(ec_spawn1);
- EXPECT_FALSE(*ec_spawn1);
- ASSERT_TRUE(ec_spawn2);
- EXPECT_FALSE(*ec_spawn2);
-
- EXPECT_THROW(ctx.run_one(), std::runtime_error);
-
- ASSERT_FALSE(ec_spawn3);
- EXPECT_FALSE(spawn1_finished);
- EXPECT_FALSE(spawn3_finished);
-}
-
-TEST(co_throttle, wait_exception)
-{
- constexpr size_t limit = 1;
-
- boost::asio::io_context ctx;
- auto ex = ctx.get_executor();
- auto throttle = co_throttle{ex, limit};
-
- std::optional<error_code> ec_spawn;
- std::optional<error_code> ec_wait;
-
- boost::asio::co_spawn(ex,
- [&] () -> awaitable<void> {
- auto eptr = std::make_exception_ptr(std::runtime_error{"oops"});
- ec_spawn = co_await throttle.spawn(worker(eptr));
- ec_wait = co_await throttle.wait();
- }, rethrow);
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
ctx.poll(); // run until wait blocks
-
- ASSERT_TRUE(ec_spawn);
- EXPECT_FALSE(*ec_spawn);
-
- EXPECT_THROW(ctx.run(), std::runtime_error);
-
- ASSERT_FALSE(ec_wait);
+ ASSERT_FALSE(ctx.stopped());
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+ EXPECT_FALSE(cr4_completed);
+
+ waiter4.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_FALSE(cr1_completed); // cr1 canceled
+ EXPECT_FALSE(cr2_completed); // exited by exception
+ EXPECT_FALSE(cr3_completed); // cr3 canceled
+ EXPECT_TRUE(cr4_completed); // cr4 not canceled
}
} // namespace ceph::async